You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/05/07 01:36:13 UTC

git commit: kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel Koshy and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 44c39c4ea -> 9b6bf4078


kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel Koshy and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b6bf407
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b6bf407
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b6bf407

Branch: refs/heads/trunk
Commit: 9b6bf407874ef0fda12d8b2cc7f8331ce4aebeea
Parents: 44c39c4
Author: Timothy Chen <tn...@gmail.com>
Authored: Tue May 6 16:36:09 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue May 6 16:36:09 2014 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      |  7 ++-
 core/src/main/scala/kafka/log/LogManager.scala  |  6 +-
 .../main/scala/kafka/server/KafkaServer.scala   | 26 ++++++--
 .../kafka/server/KafkaServerStartable.scala     |  8 +++
 .../scala/unit/kafka/log/LogManagerTest.scala   | 64 +++++++++++++++++---
 .../server/HighwatermarkPersistenceTest.scala   |  1 +
 .../unit/kafka/server/ReplicaManagerTest.scala  |  1 +
 7 files changed, 97 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 401bf1e..2fa1341 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,7 +28,6 @@ import kafka.cluster.Broker
 import kafka.common._
 import kafka.log.LogConfig
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
 import kafka.utils._
 import kafka.utils.Utils._
@@ -37,6 +36,8 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantLock
+import scala.None
+import kafka.server._
 import scala.Some
 import kafka.common.TopicAndPartition
 
@@ -154,7 +155,7 @@ object KafkaController extends Logging {
   }
 }
 
-class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
+class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -316,6 +317,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
       Utils.registerMBean(this, KafkaController.MBeanName)
       info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
+      brokerState.newState(RunningAsController)
       maybeTriggerPartitionReassignment()
       maybeTriggerPreferredReplicaElection()
       /* send partition leadership info to all live brokers */
@@ -351,6 +353,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
       }
+      brokerState.newState(RunningAsBroker)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index ab72cff..1946c94 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import kafka.utils._
 import scala.collection._
 import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.server.OffsetCheckpoint
+import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint}
 
 /**
  * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -43,6 +43,7 @@ class LogManager(val logDirs: Array[File],
                  val flushCheckpointMs: Long,
                  val retentionCheckMs: Long,
                  scheduler: Scheduler,
+                 val brokerState: BrokerState,
                  private val time: Time) extends Logging {
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LockFile = ".lock"
@@ -109,6 +110,9 @@ class LogManager(val logDirs: Array[File],
         val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
         if(cleanShutDownFile.exists())
           info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
+        else
+          brokerState.newState(RecoveringFromUncleanShutdown)
+
         for(dir <- subDirs) {
           if(dir.isDirectory) {
             info("Loading log '" + dir.getName + "'")

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c208f83..c22e51e 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -31,16 +31,19 @@ import kafka.cluster.Broker
 import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
 import kafka.common.ErrorMapping
 import kafka.network.{Receive, BlockingChannel, SocketServer}
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
+class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Kafka Server " + config.brokerId + "], "
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
   private var startupComplete = new AtomicBoolean(false)
+  val brokerState: BrokerState = new BrokerState
   val correlationId: AtomicInteger = new AtomicInteger(0)
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
@@ -54,12 +57,20 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
   var zkClient: ZkClient = null
 
+  newGauge(
+    "BrokerState",
+    new Gauge[Int] {
+      def value = brokerState.currentState
+    }
+  )
+
   /**
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
    */
   def startup() {
     info("starting")
+    brokerState.newState(Starting)
     isShuttingDown = new AtomicBoolean(false)
     shutdownLatch = new CountDownLatch(1)
 
@@ -70,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     zkClient = initZk()
 
     /* start log manager */
-    logManager = createLogManager(zkClient)
+    logManager = createLogManager(zkClient, brokerState)
     logManager.startup()
 
     socketServer = new SocketServer(config.brokerId,
@@ -88,11 +99,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     /* start offset manager */
     offsetManager = createOffsetManager()
 
-    kafkaController = new KafkaController(config, zkClient)
+    kafkaController = new KafkaController(config, zkClient, brokerState)
     
     /* start processing requests */
     apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+    brokerState.newState(RunningAsBroker)
    
     Mx4jLoader.maybeLoad()
 
@@ -143,6 +155,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       var prevController : Broker = null
       var shutdownSuceeded : Boolean = false
       try {
+        brokerState.newState(PendingControlledShutdown)
         while (!shutdownSuceeded && remainingRetries > 0) {
           remainingRetries = remainingRetries - 1
 
@@ -177,7 +190,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
               // send the controlled shutdown request
               val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
               channel.send(request)
+
               response = channel.receive()
+
               val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
               if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
                   shutdownResponse.partitionsRemaining.size == 0) {
@@ -223,6 +238,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     val canShutdown = isShuttingDown.compareAndSet(false, true)
     if (canShutdown) {
       Utils.swallow(controlledShutdown())
+      brokerState.newState(BrokerShuttingDown)
       if(kafkaHealthcheck != null)
         Utils.swallow(kafkaHealthcheck.shutdown())
       if(socketServer != null)
@@ -243,6 +259,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       if(zkClient != null)
         Utils.swallow(zkClient.close())
 
+      brokerState.newState(NotRunning)
       shutdownLatch.countDown()
       startupComplete.set(false)
       info("shut down completed")
@@ -256,7 +273,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
   def getLogManager(): LogManager = logManager
   
-  private def createLogManager(zkClient: ZkClient): LogManager = {
+  private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
     val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, 
                                      segmentMs = 60L * 60L * 1000L * config.logRollHours,
                                      flushInterval = config.logFlushIntervalMessages,
@@ -289,6 +306,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                    flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
                    retentionCheckMs = config.logCleanupIntervalMs,
                    scheduler = kafkaScheduler,
+                   brokerState = brokerState,
                    time = time)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index acda52b..cef3b84 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -52,6 +52,14 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
     }
   }
 
+  /**
+   * Allow setting broker state from the startable.
+   * This is needed when a custom kafka server startable want to emit new states that it introduces.
+   */
+  def setServerState(newState: Byte) {
+    server.brokerState.newState(newState)
+  }
+
   def awaitShutdown() = 
     server.awaitShutdown
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index be1a1ee..d03d4c4 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -21,10 +21,9 @@ import java.io._
 import junit.framework.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
-import kafka.server.KafkaConfig
+import kafka.server.{BrokerState, OffsetCheckpoint}
 import kafka.common._
 import kafka.utils._
-import kafka.server.OffsetCheckpoint
 
 class LogManagerTest extends JUnit3Suite {
 
@@ -49,7 +48,8 @@ class LogManagerTest extends JUnit3Suite {
                                 flushCheckpointMs = 100000L, 
                                 retentionCheckMs = 1000L, 
                                 scheduler = time.scheduler, 
-                                time = time)
+                                time = time,
+                                brokerState = new BrokerState())
     logManager.startup
     logDir = logManager.logDirs(0)
   }
@@ -125,7 +125,18 @@ class LogManagerTest extends JUnit3Suite {
     logManager.shutdown()
 
     val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
-    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time)
+    logManager = new LogManager(
+      logDirs = Array(logDir),
+      topicConfigs = Map(),
+      defaultConfig = config,
+      cleanerConfig = cleanerConfig,
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 100000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      brokerState = new BrokerState(),
+      time = time
+    )
     logManager.startup
 
     // create a log
@@ -165,7 +176,18 @@ class LogManagerTest extends JUnit3Suite {
   def testTimeBasedFlush() {
     logManager.shutdown()
     val config = logConfig.copy(flushMs = 1000)
-    logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
+    logManager = new LogManager(
+      logDirs = Array(logDir),
+      topicConfigs = Map(),
+      defaultConfig = config,
+      cleanerConfig = cleanerConfig,
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 10000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      brokerState = new BrokerState(),
+      time = time
+    )
     logManager.startup
     val log = logManager.createLog(TopicAndPartition(name, 0), config)
     val lastFlush = log.lastFlushTime
@@ -187,7 +209,18 @@ class LogManagerTest extends JUnit3Suite {
                      TestUtils.tempDir(), 
                      TestUtils.tempDir())
     logManager.shutdown()
-    logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
+    logManager = new LogManager(
+      logDirs = dirs,
+      topicConfigs = Map(),
+      defaultConfig = logConfig,
+      cleanerConfig = cleanerConfig,
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 10000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      brokerState = new BrokerState(),
+      time = time
+    )
     
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
@@ -204,7 +237,18 @@ class LogManagerTest extends JUnit3Suite {
   @Test
   def testTwoLogManagersUsingSameDirFails() {
     try {
-      new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
+      new LogManager(
+        logDirs = Array(logDir),
+        topicConfigs = Map(),
+        defaultConfig = logConfig,
+        cleanerConfig = cleanerConfig,
+        flushCheckMs = 1000L,
+        flushCheckpointMs = 10000L,
+        retentionCheckMs = 1000L,
+        scheduler = time.scheduler,
+        brokerState = new BrokerState(),
+        time = time
+      )
       fail("Should not be able to create a second log manager instance with the same data directory")
     } catch {
       case e: KafkaException => // this is good 
@@ -234,7 +278,8 @@ class LogManagerTest extends JUnit3Suite {
       flushCheckpointMs = 100000L,
       retentionCheckMs = 1000L,
       scheduler = time.scheduler,
-      time = time)
+      time = time,
+      brokerState = new BrokerState())
     logManager.startup
     verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
   }
@@ -256,7 +301,8 @@ class LogManagerTest extends JUnit3Suite {
       flushCheckpointMs = 100000L,
       retentionCheckMs = 1000L,
       scheduler = time.scheduler,
-      time = time)
+      time = time,
+      brokerState = new BrokerState())
     logManager.startup
     verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index a78f7cf..558a5d6 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -40,6 +40,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
                                                          flushCheckpointMs = 10000L,
                                                          retentionCheckMs = 30000,
                                                          scheduler = new KafkaScheduler(1),
+                                                         brokerState = new BrokerState(),
                                                          time = new MockTime))
     
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 41ebc7a..518d416 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -69,6 +69,7 @@ class ReplicaManagerTest extends JUnit3Suite {
       flushCheckpointMs = 100000L,
       retentionCheckMs = 1000L,
       scheduler = time.scheduler,
+      brokerState = new BrokerState(),
       time = time)
   }
 


Re: git commit: kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel Koshy and Jun Rao

Posted by Neha Narkhede <ne...@gmail.com>.
I think we forgot to commit the new BrokerState file.


On Tue, May 6, 2014 at 4:36 PM, <ju...@apache.org> wrote:

> Repository: kafka
> Updated Branches:
>   refs/heads/trunk 44c39c4ea -> 9b6bf4078
>
>
> kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel
> Koshy and Jun Rao
>
>
> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
> Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b6bf407
> Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b6bf407
> Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b6bf407
>
> Branch: refs/heads/trunk
> Commit: 9b6bf407874ef0fda12d8b2cc7f8331ce4aebeea
> Parents: 44c39c4
> Author: Timothy Chen <tn...@gmail.com>
> Authored: Tue May 6 16:36:09 2014 -0700
> Committer: Jun Rao <ju...@gmail.com>
> Committed: Tue May 6 16:36:09 2014 -0700
>
> ----------------------------------------------------------------------
>  .../kafka/controller/KafkaController.scala      |  7 ++-
>  core/src/main/scala/kafka/log/LogManager.scala  |  6 +-
>  .../main/scala/kafka/server/KafkaServer.scala   | 26 ++++++--
>  .../kafka/server/KafkaServerStartable.scala     |  8 +++
>  .../scala/unit/kafka/log/LogManagerTest.scala   | 64 +++++++++++++++++---
>  .../server/HighwatermarkPersistenceTest.scala   |  1 +
>  .../unit/kafka/server/ReplicaManagerTest.scala  |  1 +
>  7 files changed, 97 insertions(+), 16 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/controller/KafkaController.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
> b/core/src/main/scala/kafka/controller/KafkaController.scala
> index 401bf1e..2fa1341 100644
> --- a/core/src/main/scala/kafka/controller/KafkaController.scala
> +++ b/core/src/main/scala/kafka/controller/KafkaController.scala
> @@ -28,7 +28,6 @@ import kafka.cluster.Broker
>  import kafka.common._
>  import kafka.log.LogConfig
>  import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
> -import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
>  import kafka.utils.ZkUtils._
>  import kafka.utils._
>  import kafka.utils.Utils._
> @@ -37,6 +36,8 @@ import org.I0Itec.zkclient.{IZkDataListener,
> IZkStateListener, ZkClient}
>  import org.I0Itec.zkclient.exception.{ZkNodeExistsException,
> ZkNoNodeException}
>  import java.util.concurrent.atomic.AtomicInteger
>  import java.util.concurrent.locks.ReentrantLock
> +import scala.None
> +import kafka.server._
>  import scala.Some
>  import kafka.common.TopicAndPartition
>
> @@ -154,7 +155,7 @@ object KafkaController extends Logging {
>    }
>  }
>
> -class KafkaController(val config : KafkaConfig, zkClient: ZkClient)
> extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
> +class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
> brokerState: BrokerState) extends Logging with KafkaMetricsGroup with
> KafkaControllerMBean {
>    this.logIdent = "[Controller " + config.brokerId + "]: "
>    private var isRunning = true
>    private val stateChangeLogger = KafkaController.stateChangeLogger
> @@ -316,6 +317,7 @@ class KafkaController(val config : KafkaConfig,
> zkClient: ZkClient) extends Logg
>        controllerContext.allTopics.foreach(topic =>
> partitionStateMachine.registerPartitionChangeListener(topic))
>        Utils.registerMBean(this, KafkaController.MBeanName)
>        info("Broker %d is ready to serve as the new controller with epoch
> %d".format(config.brokerId, epoch))
> +      brokerState.newState(RunningAsController)
>        maybeTriggerPartitionReassignment()
>        maybeTriggerPreferredReplicaElection()
>        /* send partition leadership info to all live brokers */
> @@ -351,6 +353,7 @@ class KafkaController(val config : KafkaConfig,
> zkClient: ZkClient) extends Logg
>          controllerContext.controllerChannelManager.shutdown()
>          controllerContext.controllerChannelManager = null
>        }
> +      brokerState.newState(RunningAsBroker)
>      }
>    }
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/log/LogManager.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/log/LogManager.scala
> b/core/src/main/scala/kafka/log/LogManager.scala
> index ab72cff..1946c94 100644
> --- a/core/src/main/scala/kafka/log/LogManager.scala
> +++ b/core/src/main/scala/kafka/log/LogManager.scala
> @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
>  import kafka.utils._
>  import scala.collection._
>  import kafka.common.{TopicAndPartition, KafkaException}
> -import kafka.server.OffsetCheckpoint
> +import kafka.server.{RecoveringFromUncleanShutdown, BrokerState,
> OffsetCheckpoint}
>
>  /**
>   * The entry point to the kafka log management subsystem. The log manager
> is responsible for log creation, retrieval, and cleaning.
> @@ -43,6 +43,7 @@ class LogManager(val logDirs: Array[File],
>                   val flushCheckpointMs: Long,
>                   val retentionCheckMs: Long,
>                   scheduler: Scheduler,
> +                 val brokerState: BrokerState,
>                   private val time: Time) extends Logging {
>    val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
>    val LockFile = ".lock"
> @@ -109,6 +110,9 @@ class LogManager(val logDirs: Array[File],
>          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>          if(cleanShutDownFile.exists())
>            info("Found clean shutdown file. Skipping recovery for all logs
> in data directory '%s'".format(dir.getAbsolutePath))
> +        else
> +          brokerState.newState(RecoveringFromUncleanShutdown)
> +
>          for(dir <- subDirs) {
>            if(dir.isDirectory) {
>              info("Loading log '" + dir.getName + "'")
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServer.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
> b/core/src/main/scala/kafka/server/KafkaServer.scala
> index c208f83..c22e51e 100644
> --- a/core/src/main/scala/kafka/server/KafkaServer.scala
> +++ b/core/src/main/scala/kafka/server/KafkaServer.scala
> @@ -31,16 +31,19 @@ import kafka.cluster.Broker
>  import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
>  import kafka.common.ErrorMapping
>  import kafka.network.{Receive, BlockingChannel, SocketServer}
> +import kafka.metrics.KafkaMetricsGroup
> +import com.yammer.metrics.core.Gauge
>
>  /**
>   * Represents the lifecycle of a single Kafka broker. Handles all
> functionality required
>   * to start up and shutdown a single Kafka node.
>   */
> -class KafkaServer(val config: KafkaConfig, time: Time = SystemTime)
> extends Logging {
> +class KafkaServer(val config: KafkaConfig, time: Time = SystemTime)
> extends Logging with KafkaMetricsGroup {
>    this.logIdent = "[Kafka Server " + config.brokerId + "], "
>    private var isShuttingDown = new AtomicBoolean(false)
>    private var shutdownLatch = new CountDownLatch(1)
>    private var startupComplete = new AtomicBoolean(false)
> +  val brokerState: BrokerState = new BrokerState
>    val correlationId: AtomicInteger = new AtomicInteger(0)
>    var socketServer: SocketServer = null
>    var requestHandlerPool: KafkaRequestHandlerPool = null
> @@ -54,12 +57,20 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>    val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
>    var zkClient: ZkClient = null
>
> +  newGauge(
> +    "BrokerState",
> +    new Gauge[Int] {
> +      def value = brokerState.currentState
> +    }
> +  )
> +
>    /**
>     * Start up API for bringing up a single instance of the Kafka server.
>     * Instantiates the LogManager, the SocketServer and the request
> handlers - KafkaRequestHandlers
>     */
>    def startup() {
>      info("starting")
> +    brokerState.newState(Starting)
>      isShuttingDown = new AtomicBoolean(false)
>      shutdownLatch = new CountDownLatch(1)
>
> @@ -70,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
> SystemTime) extends Logg
>      zkClient = initZk()
>
>      /* start log manager */
> -    logManager = createLogManager(zkClient)
> +    logManager = createLogManager(zkClient, brokerState)
>      logManager.startup()
>
>      socketServer = new SocketServer(config.brokerId,
> @@ -88,11 +99,12 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>      /* start offset manager */
>      offsetManager = createOffsetManager()
>
> -    kafkaController = new KafkaController(config, zkClient)
> +    kafkaController = new KafkaController(config, zkClient, brokerState)
>
>      /* start processing requests */
>      apis = new KafkaApis(socketServer.requestChannel, replicaManager,
> offsetManager, zkClient, config.brokerId, config, kafkaController)
>      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,
> socketServer.requestChannel, apis, config.numIoThreads)
> +    brokerState.newState(RunningAsBroker)
>
>      Mx4jLoader.maybeLoad()
>
> @@ -143,6 +155,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>        var prevController : Broker = null
>        var shutdownSuceeded : Boolean = false
>        try {
> +        brokerState.newState(PendingControlledShutdown)
>          while (!shutdownSuceeded && remainingRetries > 0) {
>            remainingRetries = remainingRetries - 1
>
> @@ -177,7 +190,9 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>                // send the controlled shutdown request
>                val request = new
> ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
>                channel.send(request)
> +
>                response = channel.receive()
> +
>                val shutdownResponse =
> ControlledShutdownResponse.readFrom(response.buffer)
>                if (shutdownResponse.errorCode == ErrorMapping.NoError &&
> shutdownResponse.partitionsRemaining != null &&
>                    shutdownResponse.partitionsRemaining.size == 0) {
> @@ -223,6 +238,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>      val canShutdown = isShuttingDown.compareAndSet(false, true)
>      if (canShutdown) {
>        Utils.swallow(controlledShutdown())
> +      brokerState.newState(BrokerShuttingDown)
>        if(kafkaHealthcheck != null)
>          Utils.swallow(kafkaHealthcheck.shutdown())
>        if(socketServer != null)
> @@ -243,6 +259,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>        if(zkClient != null)
>          Utils.swallow(zkClient.close())
>
> +      brokerState.newState(NotRunning)
>        shutdownLatch.countDown()
>        startupComplete.set(false)
>        info("shut down completed")
> @@ -256,7 +273,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>
>    def getLogManager(): LogManager = logManager
>
> -  private def createLogManager(zkClient: ZkClient): LogManager = {
> +  private def createLogManager(zkClient: ZkClient, brokerState:
> BrokerState): LogManager = {
>      val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
>                                       segmentMs = 60L * 60L * 1000L *
> config.logRollHours,
>                                       flushInterval =
> config.logFlushIntervalMessages,
> @@ -289,6 +306,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>                     flushCheckpointMs =
> config.logFlushOffsetCheckpointIntervalMs,
>                     retentionCheckMs = config.logCleanupIntervalMs,
>                     scheduler = kafkaScheduler,
> +                   brokerState = brokerState,
>                     time = time)
>    }
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> index acda52b..cef3b84 100644
> --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> @@ -52,6 +52,14 @@ class KafkaServerStartable(val serverConfig:
> KafkaConfig) extends Logging {
>      }
>    }
>
> +  /**
> +   * Allow setting broker state from the startable.
> +   * This is needed when a custom kafka server startable wants to emit new
> states that it introduces.
> +   */
> +  def setServerState(newState: Byte) {
> +    server.brokerState.newState(newState)
> +  }
> +
>    def awaitShutdown() =
>      server.awaitShutdown
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> index be1a1ee..d03d4c4 100644
> --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> @@ -21,10 +21,9 @@ import java.io._
>  import junit.framework.Assert._
>  import org.junit.Test
>  import org.scalatest.junit.JUnit3Suite
> -import kafka.server.KafkaConfig
> +import kafka.server.{BrokerState, OffsetCheckpoint}
>  import kafka.common._
>  import kafka.utils._
> -import kafka.server.OffsetCheckpoint
>
>  class LogManagerTest extends JUnit3Suite {
>
> @@ -49,7 +48,8 @@ class LogManagerTest extends JUnit3Suite {
>                                  flushCheckpointMs = 100000L,
>                                  retentionCheckMs = 1000L,
>                                  scheduler = time.scheduler,
> -                                time = time)
> +                                time = time,
> +                                brokerState = new BrokerState())
>      logManager.startup
>      logDir = logManager.logDirs(0)
>    }
> @@ -125,7 +125,18 @@ class LogManagerTest extends JUnit3Suite {
>      logManager.shutdown()
>
>      val config = logConfig.copy(segmentSize = 10 * (setSize - 1),
> retentionSize = 5L * 10L * setSize + 10L)
> -    logManager = new LogManager(Array(logDir), Map(), config,
> cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time)
> +    logManager = new LogManager(
> +      logDirs = Array(logDir),
> +      topicConfigs = Map(),
> +      defaultConfig = config,
> +      cleanerConfig = cleanerConfig,
> +      flushCheckMs = 1000L,
> +      flushCheckpointMs = 100000L,
> +      retentionCheckMs = 1000L,
> +      scheduler = time.scheduler,
> +      brokerState = new BrokerState(),
> +      time = time
> +    )
>      logManager.startup
>
>      // create a log
> @@ -165,7 +176,18 @@ class LogManagerTest extends JUnit3Suite {
>    def testTimeBasedFlush() {
>      logManager.shutdown()
>      val config = logConfig.copy(flushMs = 1000)
> -    logManager = new LogManager(Array(logDir), Map(), config,
> cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
> +    logManager = new LogManager(
> +      logDirs = Array(logDir),
> +      topicConfigs = Map(),
> +      defaultConfig = config,
> +      cleanerConfig = cleanerConfig,
> +      flushCheckMs = 1000L,
> +      flushCheckpointMs = 10000L,
> +      retentionCheckMs = 1000L,
> +      scheduler = time.scheduler,
> +      brokerState = new BrokerState(),
> +      time = time
> +    )
>      logManager.startup
>      val log = logManager.createLog(TopicAndPartition(name, 0), config)
>      val lastFlush = log.lastFlushTime
> @@ -187,7 +209,18 @@ class LogManagerTest extends JUnit3Suite {
>                       TestUtils.tempDir(),
>                       TestUtils.tempDir())
>      logManager.shutdown()
> -    logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig,
> 1000L, 10000L, 1000L, time.scheduler, time)
> +    logManager = new LogManager(
> +      logDirs = dirs,
> +      topicConfigs = Map(),
> +      defaultConfig = logConfig,
> +      cleanerConfig = cleanerConfig,
> +      flushCheckMs = 1000L,
> +      flushCheckpointMs = 10000L,
> +      retentionCheckMs = 1000L,
> +      scheduler = time.scheduler,
> +      brokerState = new BrokerState(),
> +      time = time
> +    )
>
>      // verify that logs are always assigned to the least loaded partition
>      for(partition <- 0 until 20) {
> @@ -204,7 +237,18 @@ class LogManagerTest extends JUnit3Suite {
>    @Test
>    def testTwoLogManagersUsingSameDirFails() {
>      try {
> -      new LogManager(Array(logDir), Map(), logConfig, cleanerConfig,
> 1000L, 10000L, 1000L, time.scheduler, time)
> +      new LogManager(
> +        logDirs = Array(logDir),
> +        topicConfigs = Map(),
> +        defaultConfig = logConfig,
> +        cleanerConfig = cleanerConfig,
> +        flushCheckMs = 1000L,
> +        flushCheckpointMs = 10000L,
> +        retentionCheckMs = 1000L,
> +        scheduler = time.scheduler,
> +        brokerState = new BrokerState(),
> +        time = time
> +      )
>        fail("Should not be able to create a second log manager instance
> with the same data directory")
>      } catch {
>        case e: KafkaException => // this is good
> @@ -234,7 +278,8 @@ class LogManagerTest extends JUnit3Suite {
>        flushCheckpointMs = 100000L,
>        retentionCheckMs = 1000L,
>        scheduler = time.scheduler,
> -      time = time)
> +      time = time,
> +      brokerState = new BrokerState())
>      logManager.startup
>      verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)),
> logManager)
>    }
> @@ -256,7 +301,8 @@ class LogManagerTest extends JUnit3Suite {
>        flushCheckpointMs = 100000L,
>        retentionCheckMs = 1000L,
>        scheduler = time.scheduler,
> -      time = time)
> +      time = time,
> +      brokerState = new BrokerState())
>      logManager.startup
>      verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)),
> logManager)
>    }
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> ----------------------------------------------------------------------
> diff --git
> a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> index a78f7cf..558a5d6 100644
> ---
> a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> +++
> b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> @@ -40,6 +40,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
>
> flushCheckpointMs = 10000L,
>                                                           retentionCheckMs
> = 30000,
>                                                           scheduler = new
> KafkaScheduler(1),
> +                                                         brokerState =
> new BrokerState(),
>                                                           time = new
> MockTime))
>
>    @After
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> ----------------------------------------------------------------------
> diff --git
> a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> index 41ebc7a..518d416 100644
> --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> @@ -69,6 +69,7 @@ class ReplicaManagerTest extends JUnit3Suite {
>        flushCheckpointMs = 100000L,
>        retentionCheckMs = 1000L,
>        scheduler = time.scheduler,
> +      brokerState = new BrokerState(),
>        time = time)
>    }
>
>
>