You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/05/07 01:36:13 UTC
git commit: kafka-1384; Log Broker state; patched by Timothy Chen;
reviewed by Joel Koshy and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 44c39c4ea -> 9b6bf4078
kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel Koshy and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b6bf407
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b6bf407
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b6bf407
Branch: refs/heads/trunk
Commit: 9b6bf407874ef0fda12d8b2cc7f8331ce4aebeea
Parents: 44c39c4
Author: Timothy Chen <tn...@gmail.com>
Authored: Tue May 6 16:36:09 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue May 6 16:36:09 2014 -0700
----------------------------------------------------------------------
.../kafka/controller/KafkaController.scala | 7 ++-
core/src/main/scala/kafka/log/LogManager.scala | 6 +-
.../main/scala/kafka/server/KafkaServer.scala | 26 ++++++--
.../kafka/server/KafkaServerStartable.scala | 8 +++
.../scala/unit/kafka/log/LogManagerTest.scala | 64 +++++++++++++++++---
.../server/HighwatermarkPersistenceTest.scala | 1 +
.../unit/kafka/server/ReplicaManagerTest.scala | 1 +
7 files changed, 97 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 401bf1e..2fa1341 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,7 +28,6 @@ import kafka.cluster.Broker
import kafka.common._
import kafka.log.LogConfig
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
-import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
import kafka.utils.ZkUtils._
import kafka.utils._
import kafka.utils.Utils._
@@ -37,6 +36,8 @@ import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
+import scala.None
+import kafka.server._
import scala.Some
import kafka.common.TopicAndPartition
@@ -154,7 +155,7 @@ object KafkaController extends Logging {
}
}
-class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
+class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = KafkaController.stateChangeLogger
@@ -316,6 +317,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
Utils.registerMBean(this, KafkaController.MBeanName)
info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
+ brokerState.newState(RunningAsController)
maybeTriggerPartitionReassignment()
maybeTriggerPreferredReplicaElection()
/* send partition leadership info to all live brokers */
@@ -351,6 +353,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.controllerChannelManager.shutdown()
controllerContext.controllerChannelManager = null
}
+ brokerState.newState(RunningAsBroker)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index ab72cff..1946c94 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import kafka.utils._
import scala.collection._
import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.server.OffsetCheckpoint
+import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint}
/**
* The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -43,6 +43,7 @@ class LogManager(val logDirs: Array[File],
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler,
+ val brokerState: BrokerState,
private val time: Time) extends Logging {
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LockFile = ".lock"
@@ -109,6 +110,9 @@ class LogManager(val logDirs: Array[File],
val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
if(cleanShutDownFile.exists())
info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
+ else
+ brokerState.newState(RecoveringFromUncleanShutdown)
+
for(dir <- subDirs) {
if(dir.isDirectory) {
info("Loading log '" + dir.getName + "'")
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c208f83..c22e51e 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -31,16 +31,19 @@ import kafka.cluster.Broker
import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
import kafka.common.ErrorMapping
import kafka.network.{Receive, BlockingChannel, SocketServer}
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
* to start up and shutdown a single Kafka node.
*/
-class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
+class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Kafka Server " + config.brokerId + "], "
private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
private var startupComplete = new AtomicBoolean(false)
+ val brokerState: BrokerState = new BrokerState
val correlationId: AtomicInteger = new AtomicInteger(0)
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
@@ -54,12 +57,20 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
var zkClient: ZkClient = null
+ newGauge(
+ "BrokerState",
+ new Gauge[Int] {
+ def value = brokerState.currentState
+ }
+ )
+
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup() {
info("starting")
+ brokerState.newState(Starting)
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
@@ -70,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
zkClient = initZk()
/* start log manager */
- logManager = createLogManager(zkClient)
+ logManager = createLogManager(zkClient, brokerState)
logManager.startup()
socketServer = new SocketServer(config.brokerId,
@@ -88,11 +99,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
/* start offset manager */
offsetManager = createOffsetManager()
- kafkaController = new KafkaController(config, zkClient)
+ kafkaController = new KafkaController(config, zkClient, brokerState)
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+ brokerState.newState(RunningAsBroker)
Mx4jLoader.maybeLoad()
@@ -143,6 +155,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var prevController : Broker = null
var shutdownSuceeded : Boolean = false
try {
+ brokerState.newState(PendingControlledShutdown)
while (!shutdownSuceeded && remainingRetries > 0) {
remainingRetries = remainingRetries - 1
@@ -177,7 +190,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
// send the controlled shutdown request
val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
channel.send(request)
+
response = channel.receive()
+
val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
shutdownResponse.partitionsRemaining.size == 0) {
@@ -223,6 +238,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
val canShutdown = isShuttingDown.compareAndSet(false, true)
if (canShutdown) {
Utils.swallow(controlledShutdown())
+ brokerState.newState(BrokerShuttingDown)
if(kafkaHealthcheck != null)
Utils.swallow(kafkaHealthcheck.shutdown())
if(socketServer != null)
@@ -243,6 +259,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
if(zkClient != null)
Utils.swallow(zkClient.close())
+ brokerState.newState(NotRunning)
shutdownLatch.countDown()
startupComplete.set(false)
info("shut down completed")
@@ -256,7 +273,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def getLogManager(): LogManager = logManager
- private def createLogManager(zkClient: ZkClient): LogManager = {
+ private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
segmentMs = 60L * 60L * 1000L * config.logRollHours,
flushInterval = config.logFlushIntervalMessages,
@@ -289,6 +306,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
scheduler = kafkaScheduler,
+ brokerState = brokerState,
time = time)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index acda52b..cef3b84 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -52,6 +52,14 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
}
}
+ /**
+ * Allow setting broker state from the startable.
+ * This is needed when a custom kafka server startable wants to emit new states that it introduces.
+ */
+ def setServerState(newState: Byte) {
+ server.brokerState.newState(newState)
+ }
+
def awaitShutdown() =
server.awaitShutdown
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index be1a1ee..d03d4c4 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -21,10 +21,9 @@ import java.io._
import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
-import kafka.server.KafkaConfig
+import kafka.server.{BrokerState, OffsetCheckpoint}
import kafka.common._
import kafka.utils._
-import kafka.server.OffsetCheckpoint
class LogManagerTest extends JUnit3Suite {
@@ -49,7 +48,8 @@ class LogManagerTest extends JUnit3Suite {
flushCheckpointMs = 100000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
- time = time)
+ time = time,
+ brokerState = new BrokerState())
logManager.startup
logDir = logManager.logDirs(0)
}
@@ -125,7 +125,18 @@ class LogManagerTest extends JUnit3Suite {
logManager.shutdown()
val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
- logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time)
+ logManager = new LogManager(
+ logDirs = Array(logDir),
+ topicConfigs = Map(),
+ defaultConfig = config,
+ cleanerConfig = cleanerConfig,
+ flushCheckMs = 1000L,
+ flushCheckpointMs = 100000L,
+ retentionCheckMs = 1000L,
+ scheduler = time.scheduler,
+ brokerState = new BrokerState(),
+ time = time
+ )
logManager.startup
// create a log
@@ -165,7 +176,18 @@ class LogManagerTest extends JUnit3Suite {
def testTimeBasedFlush() {
logManager.shutdown()
val config = logConfig.copy(flushMs = 1000)
- logManager = new LogManager(Array(logDir), Map(), config, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
+ logManager = new LogManager(
+ logDirs = Array(logDir),
+ topicConfigs = Map(),
+ defaultConfig = config,
+ cleanerConfig = cleanerConfig,
+ flushCheckMs = 1000L,
+ flushCheckpointMs = 10000L,
+ retentionCheckMs = 1000L,
+ scheduler = time.scheduler,
+ brokerState = new BrokerState(),
+ time = time
+ )
logManager.startup
val log = logManager.createLog(TopicAndPartition(name, 0), config)
val lastFlush = log.lastFlushTime
@@ -187,7 +209,18 @@ class LogManagerTest extends JUnit3Suite {
TestUtils.tempDir(),
TestUtils.tempDir())
logManager.shutdown()
- logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
+ logManager = new LogManager(
+ logDirs = dirs,
+ topicConfigs = Map(),
+ defaultConfig = logConfig,
+ cleanerConfig = cleanerConfig,
+ flushCheckMs = 1000L,
+ flushCheckpointMs = 10000L,
+ retentionCheckMs = 1000L,
+ scheduler = time.scheduler,
+ brokerState = new BrokerState(),
+ time = time
+ )
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
@@ -204,7 +237,18 @@ class LogManagerTest extends JUnit3Suite {
@Test
def testTwoLogManagersUsingSameDirFails() {
try {
- new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
+ new LogManager(
+ logDirs = Array(logDir),
+ topicConfigs = Map(),
+ defaultConfig = logConfig,
+ cleanerConfig = cleanerConfig,
+ flushCheckMs = 1000L,
+ flushCheckpointMs = 10000L,
+ retentionCheckMs = 1000L,
+ scheduler = time.scheduler,
+ brokerState = new BrokerState(),
+ time = time
+ )
fail("Should not be able to create a second log manager instance with the same data directory")
} catch {
case e: KafkaException => // this is good
@@ -234,7 +278,8 @@ class LogManagerTest extends JUnit3Suite {
flushCheckpointMs = 100000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
- time = time)
+ time = time,
+ brokerState = new BrokerState())
logManager.startup
verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
}
@@ -256,7 +301,8 @@ class LogManagerTest extends JUnit3Suite {
flushCheckpointMs = 100000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
- time = time)
+ time = time,
+ brokerState = new BrokerState())
logManager.startup
verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index a78f7cf..558a5d6 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -40,6 +40,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
flushCheckpointMs = 10000L,
retentionCheckMs = 30000,
scheduler = new KafkaScheduler(1),
+ brokerState = new BrokerState(),
time = new MockTime))
@After
http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 41ebc7a..518d416 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -69,6 +69,7 @@ class ReplicaManagerTest extends JUnit3Suite {
flushCheckpointMs = 100000L,
retentionCheckMs = 1000L,
scheduler = time.scheduler,
+ brokerState = new BrokerState(),
time = time)
}
Re: git commit: kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel Koshy and Jun Rao
Posted by Neha Narkhede <ne...@gmail.com>.
I think we forgot to commit the new BrokerState file.
On Tue, May 6, 2014 at 4:36 PM, <ju...@apache.org> wrote:
> Repository: kafka
> Updated Branches:
> refs/heads/trunk 44c39c4ea -> 9b6bf4078
>
>
> kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel
> Koshy and Jun Rao
>
>
> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
> Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b6bf407
> Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b6bf407
> Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b6bf407
>
> Branch: refs/heads/trunk
> Commit: 9b6bf407874ef0fda12d8b2cc7f8331ce4aebeea
> Parents: 44c39c4
> Author: Timothy Chen <tn...@gmail.com>
> Authored: Tue May 6 16:36:09 2014 -0700
> Committer: Jun Rao <ju...@gmail.com>
> Committed: Tue May 6 16:36:09 2014 -0700
>
> ----------------------------------------------------------------------
> .../kafka/controller/KafkaController.scala | 7 ++-
> core/src/main/scala/kafka/log/LogManager.scala | 6 +-
> .../main/scala/kafka/server/KafkaServer.scala | 26 ++++++--
> .../kafka/server/KafkaServerStartable.scala | 8 +++
> .../scala/unit/kafka/log/LogManagerTest.scala | 64 +++++++++++++++++---
> .../server/HighwatermarkPersistenceTest.scala | 1 +
> .../unit/kafka/server/ReplicaManagerTest.scala | 1 +
> 7 files changed, 97 insertions(+), 16 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/controller/KafkaController.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
> b/core/src/main/scala/kafka/controller/KafkaController.scala
> index 401bf1e..2fa1341 100644
> --- a/core/src/main/scala/kafka/controller/KafkaController.scala
> +++ b/core/src/main/scala/kafka/controller/KafkaController.scala
> @@ -28,7 +28,6 @@ import kafka.cluster.Broker
> import kafka.common._
> import kafka.log.LogConfig
> import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
> -import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
> import kafka.utils.ZkUtils._
> import kafka.utils._
> import kafka.utils.Utils._
> @@ -37,6 +36,8 @@ import org.I0Itec.zkclient.{IZkDataListener,
> IZkStateListener, ZkClient}
> import org.I0Itec.zkclient.exception.{ZkNodeExistsException,
> ZkNoNodeException}
> import java.util.concurrent.atomic.AtomicInteger
> import java.util.concurrent.locks.ReentrantLock
> +import scala.None
> +import kafka.server._
> import scala.Some
> import kafka.common.TopicAndPartition
>
> @@ -154,7 +155,7 @@ object KafkaController extends Logging {
> }
> }
>
> -class KafkaController(val config : KafkaConfig, zkClient: ZkClient)
> extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
> +class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
> brokerState: BrokerState) extends Logging with KafkaMetricsGroup with
> KafkaControllerMBean {
> this.logIdent = "[Controller " + config.brokerId + "]: "
> private var isRunning = true
> private val stateChangeLogger = KafkaController.stateChangeLogger
> @@ -316,6 +317,7 @@ class KafkaController(val config : KafkaConfig,
> zkClient: ZkClient) extends Logg
> controllerContext.allTopics.foreach(topic =>
> partitionStateMachine.registerPartitionChangeListener(topic))
> Utils.registerMBean(this, KafkaController.MBeanName)
> info("Broker %d is ready to serve as the new controller with epoch
> %d".format(config.brokerId, epoch))
> + brokerState.newState(RunningAsController)
> maybeTriggerPartitionReassignment()
> maybeTriggerPreferredReplicaElection()
> /* send partition leadership info to all live brokers */
> @@ -351,6 +353,7 @@ class KafkaController(val config : KafkaConfig,
> zkClient: ZkClient) extends Logg
> controllerContext.controllerChannelManager.shutdown()
> controllerContext.controllerChannelManager = null
> }
> + brokerState.newState(RunningAsBroker)
> }
> }
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/log/LogManager.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/log/LogManager.scala
> b/core/src/main/scala/kafka/log/LogManager.scala
> index ab72cff..1946c94 100644
> --- a/core/src/main/scala/kafka/log/LogManager.scala
> +++ b/core/src/main/scala/kafka/log/LogManager.scala
> @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
> import kafka.utils._
> import scala.collection._
> import kafka.common.{TopicAndPartition, KafkaException}
> -import kafka.server.OffsetCheckpoint
> +import kafka.server.{RecoveringFromUncleanShutdown, BrokerState,
> OffsetCheckpoint}
>
> /**
> * The entry point to the kafka log management subsystem. The log manager
> is responsible for log creation, retrieval, and cleaning.
> @@ -43,6 +43,7 @@ class LogManager(val logDirs: Array[File],
> val flushCheckpointMs: Long,
> val retentionCheckMs: Long,
> scheduler: Scheduler,
> + val brokerState: BrokerState,
> private val time: Time) extends Logging {
> val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
> val LockFile = ".lock"
> @@ -109,6 +110,9 @@ class LogManager(val logDirs: Array[File],
> val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
> if(cleanShutDownFile.exists())
> info("Found clean shutdown file. Skipping recovery for all logs
> in data directory '%s'".format(dir.getAbsolutePath))
> + else
> + brokerState.newState(RecoveringFromUncleanShutdown)
> +
> for(dir <- subDirs) {
> if(dir.isDirectory) {
> info("Loading log '" + dir.getName + "'")
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServer.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
> b/core/src/main/scala/kafka/server/KafkaServer.scala
> index c208f83..c22e51e 100644
> --- a/core/src/main/scala/kafka/server/KafkaServer.scala
> +++ b/core/src/main/scala/kafka/server/KafkaServer.scala
> @@ -31,16 +31,19 @@ import kafka.cluster.Broker
> import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
> import kafka.common.ErrorMapping
> import kafka.network.{Receive, BlockingChannel, SocketServer}
> +import kafka.metrics.KafkaMetricsGroup
> +import com.yammer.metrics.core.Gauge
>
> /**
> * Represents the lifecycle of a single Kafka broker. Handles all
> functionality required
> * to start up and shutdown a single Kafka node.
> */
> -class KafkaServer(val config: KafkaConfig, time: Time = SystemTime)
> extends Logging {
> +class KafkaServer(val config: KafkaConfig, time: Time = SystemTime)
> extends Logging with KafkaMetricsGroup {
> this.logIdent = "[Kafka Server " + config.brokerId + "], "
> private var isShuttingDown = new AtomicBoolean(false)
> private var shutdownLatch = new CountDownLatch(1)
> private var startupComplete = new AtomicBoolean(false)
> + val brokerState: BrokerState = new BrokerState
> val correlationId: AtomicInteger = new AtomicInteger(0)
> var socketServer: SocketServer = null
> var requestHandlerPool: KafkaRequestHandlerPool = null
> @@ -54,12 +57,20 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
> var zkClient: ZkClient = null
>
> + newGauge(
> + "BrokerState",
> + new Gauge[Int] {
> + def value = brokerState.currentState
> + }
> + )
> +
> /**
> * Start up API for bringing up a single instance of the Kafka server.
> * Instantiates the LogManager, the SocketServer and the request
> handlers - KafkaRequestHandlers
> */
> def startup() {
> info("starting")
> + brokerState.newState(Starting)
> isShuttingDown = new AtomicBoolean(false)
> shutdownLatch = new CountDownLatch(1)
>
> @@ -70,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
> SystemTime) extends Logg
> zkClient = initZk()
>
> /* start log manager */
> - logManager = createLogManager(zkClient)
> + logManager = createLogManager(zkClient, brokerState)
> logManager.startup()
>
> socketServer = new SocketServer(config.brokerId,
> @@ -88,11 +99,12 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> /* start offset manager */
> offsetManager = createOffsetManager()
>
> - kafkaController = new KafkaController(config, zkClient)
> + kafkaController = new KafkaController(config, zkClient, brokerState)
>
> /* start processing requests */
> apis = new KafkaApis(socketServer.requestChannel, replicaManager,
> offsetManager, zkClient, config.brokerId, config, kafkaController)
> requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,
> socketServer.requestChannel, apis, config.numIoThreads)
> + brokerState.newState(RunningAsBroker)
>
> Mx4jLoader.maybeLoad()
>
> @@ -143,6 +155,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> var prevController : Broker = null
> var shutdownSuceeded : Boolean = false
> try {
> + brokerState.newState(PendingControlledShutdown)
> while (!shutdownSuceeded && remainingRetries > 0) {
> remainingRetries = remainingRetries - 1
>
> @@ -177,7 +190,9 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> // send the controlled shutdown request
> val request = new
> ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
> channel.send(request)
> +
> response = channel.receive()
> +
> val shutdownResponse =
> ControlledShutdownResponse.readFrom(response.buffer)
> if (shutdownResponse.errorCode == ErrorMapping.NoError &&
> shutdownResponse.partitionsRemaining != null &&
> shutdownResponse.partitionsRemaining.size == 0) {
> @@ -223,6 +238,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> val canShutdown = isShuttingDown.compareAndSet(false, true)
> if (canShutdown) {
> Utils.swallow(controlledShutdown())
> + brokerState.newState(BrokerShuttingDown)
> if(kafkaHealthcheck != null)
> Utils.swallow(kafkaHealthcheck.shutdown())
> if(socketServer != null)
> @@ -243,6 +259,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> if(zkClient != null)
> Utils.swallow(zkClient.close())
>
> + brokerState.newState(NotRunning)
> shutdownLatch.countDown()
> startupComplete.set(false)
> info("shut down completed")
> @@ -256,7 +273,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>
> def getLogManager(): LogManager = logManager
>
> - private def createLogManager(zkClient: ZkClient): LogManager = {
> + private def createLogManager(zkClient: ZkClient, brokerState:
> BrokerState): LogManager = {
> val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
> segmentMs = 60L * 60L * 1000L *
> config.logRollHours,
> flushInterval =
> config.logFlushIntervalMessages,
> @@ -289,6 +306,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> flushCheckpointMs =
> config.logFlushOffsetCheckpointIntervalMs,
> retentionCheckMs = config.logCleanupIntervalMs,
> scheduler = kafkaScheduler,
> + brokerState = brokerState,
> time = time)
> }
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> index acda52b..cef3b84 100644
> --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> @@ -52,6 +52,14 @@ class KafkaServerStartable(val serverConfig:
> KafkaConfig) extends Logging {
> }
> }
>
> + /**
> + * Allow setting broker state from the startable.
> + * This is needed when a custom kafka server startable wants to emit new
> states that it introduces.
> + */
> + def setServerState(newState: Byte) {
> + server.brokerState.newState(newState)
> + }
> +
> def awaitShutdown() =
> server.awaitShutdown
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> index be1a1ee..d03d4c4 100644
> --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> @@ -21,10 +21,9 @@ import java.io._
> import junit.framework.Assert._
> import org.junit.Test
> import org.scalatest.junit.JUnit3Suite
> -import kafka.server.KafkaConfig
> +import kafka.server.{BrokerState, OffsetCheckpoint}
> import kafka.common._
> import kafka.utils._
> -import kafka.server.OffsetCheckpoint
>
> class LogManagerTest extends JUnit3Suite {
>
> @@ -49,7 +48,8 @@ class LogManagerTest extends JUnit3Suite {
> flushCheckpointMs = 100000L,
> retentionCheckMs = 1000L,
> scheduler = time.scheduler,
> - time = time)
> + time = time,
> + brokerState = new BrokerState())
> logManager.startup
> logDir = logManager.logDirs(0)
> }
> @@ -125,7 +125,18 @@ class LogManagerTest extends JUnit3Suite {
> logManager.shutdown()
>
> val config = logConfig.copy(segmentSize = 10 * (setSize - 1),
> retentionSize = 5L * 10L * setSize + 10L)
> - logManager = new LogManager(Array(logDir), Map(), config,
> cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time)
> + logManager = new LogManager(
> + logDirs = Array(logDir),
> + topicConfigs = Map(),
> + defaultConfig = config,
> + cleanerConfig = cleanerConfig,
> + flushCheckMs = 1000L,
> + flushCheckpointMs = 100000L,
> + retentionCheckMs = 1000L,
> + scheduler = time.scheduler,
> + brokerState = new BrokerState(),
> + time = time
> + )
> logManager.startup
>
> // create a log
> @@ -165,7 +176,18 @@ class LogManagerTest extends JUnit3Suite {
> def testTimeBasedFlush() {
> logManager.shutdown()
> val config = logConfig.copy(flushMs = 1000)
> - logManager = new LogManager(Array(logDir), Map(), config,
> cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
> + logManager = new LogManager(
> + logDirs = Array(logDir),
> + topicConfigs = Map(),
> + defaultConfig = config,
> + cleanerConfig = cleanerConfig,
> + flushCheckMs = 1000L,
> + flushCheckpointMs = 10000L,
> + retentionCheckMs = 1000L,
> + scheduler = time.scheduler,
> + brokerState = new BrokerState(),
> + time = time
> + )
> logManager.startup
> val log = logManager.createLog(TopicAndPartition(name, 0), config)
> val lastFlush = log.lastFlushTime
> @@ -187,7 +209,18 @@ class LogManagerTest extends JUnit3Suite {
> TestUtils.tempDir(),
> TestUtils.tempDir())
> logManager.shutdown()
> - logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
> + logManager = new LogManager(
> + logDirs = dirs,
> + topicConfigs = Map(),
> + defaultConfig = logConfig,
> + cleanerConfig = cleanerConfig,
> + flushCheckMs = 1000L,
> + flushCheckpointMs = 10000L,
> + retentionCheckMs = 1000L,
> + scheduler = time.scheduler,
> + brokerState = new BrokerState(),
> + time = time
> + )
>
> // verify that logs are always assigned to the least loaded partition
> for(partition <- 0 until 20) {
> @@ -204,7 +237,18 @@ class LogManagerTest extends JUnit3Suite {
> @Test
> def testTwoLogManagersUsingSameDirFails() {
> try {
> - new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
> + new LogManager(
> + logDirs = Array(logDir),
> + topicConfigs = Map(),
> + defaultConfig = logConfig,
> + cleanerConfig = cleanerConfig,
> + flushCheckMs = 1000L,
> + flushCheckpointMs = 10000L,
> + retentionCheckMs = 1000L,
> + scheduler = time.scheduler,
> + brokerState = new BrokerState(),
> + time = time
> + )
> fail("Should not be able to create a second log manager instance with the same data directory")
> } catch {
> case e: KafkaException => // this is good
> @@ -234,7 +278,8 @@ class LogManagerTest extends JUnit3Suite {
> flushCheckpointMs = 100000L,
> retentionCheckMs = 1000L,
> scheduler = time.scheduler,
> - time = time)
> + time = time,
> + brokerState = new BrokerState())
> logManager.startup
> verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
> }
> @@ -256,7 +301,8 @@ class LogManagerTest extends JUnit3Suite {
> flushCheckpointMs = 100000L,
> retentionCheckMs = 1000L,
> scheduler = time.scheduler,
> - time = time)
> + time = time,
> + brokerState = new BrokerState())
> logManager.startup
> verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
> }
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> index a78f7cf..558a5d6 100644
> --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> @@ -40,6 +40,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
>
> flushCheckpointMs = 10000L,
> retentionCheckMs = 30000,
> scheduler = new KafkaScheduler(1),
> + brokerState = new BrokerState(),
> time = new MockTime))
>
> @After
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> index 41ebc7a..518d416 100644
> --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> @@ -69,6 +69,7 @@ class ReplicaManagerTest extends JUnit3Suite {
> flushCheckpointMs = 100000L,
> retentionCheckMs = 1000L,
> scheduler = time.scheduler,
> + brokerState = new BrokerState(),
> time = time)
> }
>
>
>