You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Neha Narkhede <ne...@gmail.com> on 2014/05/07 03:02:05 UTC
Re: git commit: kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel Koshy and Jun Rao
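I think we forgot to commit the new BrokerState file: the diff below uses `BrokerState` and its states (e.g. `RunningAsBroker`, `RecoveringFromUncleanShutdown`), but the file defining them is not among the 7 changed files listed in the diffstat.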
On Tue, May 6, 2014 at 4:36 PM, <ju...@apache.org> wrote:
> Repository: kafka
> Updated Branches:
> refs/heads/trunk 44c39c4ea -> 9b6bf4078
>
>
> kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel
> Koshy and Jun Rao
>
>
> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
> Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b6bf407
> Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b6bf407
> Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b6bf407
>
> Branch: refs/heads/trunk
> Commit: 9b6bf407874ef0fda12d8b2cc7f8331ce4aebeea
> Parents: 44c39c4
> Author: Timothy Chen <tn...@gmail.com>
> Authored: Tue May 6 16:36:09 2014 -0700
> Committer: Jun Rao <ju...@gmail.com>
> Committed: Tue May 6 16:36:09 2014 -0700
>
> ----------------------------------------------------------------------
> .../kafka/controller/KafkaController.scala | 7 ++-
> core/src/main/scala/kafka/log/LogManager.scala | 6 +-
> .../main/scala/kafka/server/KafkaServer.scala | 26 ++++++--
> .../kafka/server/KafkaServerStartable.scala | 8 +++
> .../scala/unit/kafka/log/LogManagerTest.scala | 64 +++++++++++++++++---
> .../server/HighwatermarkPersistenceTest.scala | 1 +
> .../unit/kafka/server/ReplicaManagerTest.scala | 1 +
> 7 files changed, 97 insertions(+), 16 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/controller/KafkaController.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
> b/core/src/main/scala/kafka/controller/KafkaController.scala
> index 401bf1e..2fa1341 100644
> --- a/core/src/main/scala/kafka/controller/KafkaController.scala
> +++ b/core/src/main/scala/kafka/controller/KafkaController.scala
> @@ -28,7 +28,6 @@ import kafka.cluster.Broker
> import kafka.common._
> import kafka.log.LogConfig
> import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
> -import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
> import kafka.utils.ZkUtils._
> import kafka.utils._
> import kafka.utils.Utils._
> @@ -37,6 +36,8 @@ import org.I0Itec.zkclient.{IZkDataListener,
> IZkStateListener, ZkClient}
> import org.I0Itec.zkclient.exception.{ZkNodeExistsException,
> ZkNoNodeException}
> import java.util.concurrent.atomic.AtomicInteger
> import java.util.concurrent.locks.ReentrantLock
> +import scala.None
> +import kafka.server._
> import scala.Some
> import kafka.common.TopicAndPartition
>
> @@ -154,7 +155,7 @@ object KafkaController extends Logging {
> }
> }
>
> -class KafkaController(val config : KafkaConfig, zkClient: ZkClient)
> extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
> +class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
> brokerState: BrokerState) extends Logging with KafkaMetricsGroup with
> KafkaControllerMBean {
> this.logIdent = "[Controller " + config.brokerId + "]: "
> private var isRunning = true
> private val stateChangeLogger = KafkaController.stateChangeLogger
> @@ -316,6 +317,7 @@ class KafkaController(val config : KafkaConfig,
> zkClient: ZkClient) extends Logg
> controllerContext.allTopics.foreach(topic =>
> partitionStateMachine.registerPartitionChangeListener(topic))
> Utils.registerMBean(this, KafkaController.MBeanName)
> info("Broker %d is ready to serve as the new controller with epoch
> %d".format(config.brokerId, epoch))
> + brokerState.newState(RunningAsController)
> maybeTriggerPartitionReassignment()
> maybeTriggerPreferredReplicaElection()
> /* send partition leadership info to all live brokers */
> @@ -351,6 +353,7 @@ class KafkaController(val config : KafkaConfig,
> zkClient: ZkClient) extends Logg
> controllerContext.controllerChannelManager.shutdown()
> controllerContext.controllerChannelManager = null
> }
> + brokerState.newState(RunningAsBroker)
> }
> }
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/log/LogManager.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/log/LogManager.scala
> b/core/src/main/scala/kafka/log/LogManager.scala
> index ab72cff..1946c94 100644
> --- a/core/src/main/scala/kafka/log/LogManager.scala
> +++ b/core/src/main/scala/kafka/log/LogManager.scala
> @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
> import kafka.utils._
> import scala.collection._
> import kafka.common.{TopicAndPartition, KafkaException}
> -import kafka.server.OffsetCheckpoint
> +import kafka.server.{RecoveringFromUncleanShutdown, BrokerState,
> OffsetCheckpoint}
>
> /**
> * The entry point to the kafka log management subsystem. The log manager
> is responsible for log creation, retrieval, and cleaning.
> @@ -43,6 +43,7 @@ class LogManager(val logDirs: Array[File],
> val flushCheckpointMs: Long,
> val retentionCheckMs: Long,
> scheduler: Scheduler,
> + val brokerState: BrokerState,
> private val time: Time) extends Logging {
> val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
> val LockFile = ".lock"
> @@ -109,6 +110,9 @@ class LogManager(val logDirs: Array[File],
> val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
> if(cleanShutDownFile.exists())
> info("Found clean shutdown file. Skipping recovery for all logs
> in data directory '%s'".format(dir.getAbsolutePath))
> + else
> + brokerState.newState(RecoveringFromUncleanShutdown)
> +
> for(dir <- subDirs) {
> if(dir.isDirectory) {
> info("Loading log '" + dir.getName + "'")
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServer.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
> b/core/src/main/scala/kafka/server/KafkaServer.scala
> index c208f83..c22e51e 100644
> --- a/core/src/main/scala/kafka/server/KafkaServer.scala
> +++ b/core/src/main/scala/kafka/server/KafkaServer.scala
> @@ -31,16 +31,19 @@ import kafka.cluster.Broker
> import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
> import kafka.common.ErrorMapping
> import kafka.network.{Receive, BlockingChannel, SocketServer}
> +import kafka.metrics.KafkaMetricsGroup
> +import com.yammer.metrics.core.Gauge
>
> /**
> * Represents the lifecycle of a single Kafka broker. Handles all
> functionality required
> * to start up and shutdown a single Kafka node.
> */
> -class KafkaServer(val config: KafkaConfig, time: Time = SystemTime)
> extends Logging {
> +class KafkaServer(val config: KafkaConfig, time: Time = SystemTime)
> extends Logging with KafkaMetricsGroup {
> this.logIdent = "[Kafka Server " + config.brokerId + "], "
> private var isShuttingDown = new AtomicBoolean(false)
> private var shutdownLatch = new CountDownLatch(1)
> private var startupComplete = new AtomicBoolean(false)
> + val brokerState: BrokerState = new BrokerState
> val correlationId: AtomicInteger = new AtomicInteger(0)
> var socketServer: SocketServer = null
> var requestHandlerPool: KafkaRequestHandlerPool = null
> @@ -54,12 +57,20 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
> var zkClient: ZkClient = null
>
> + newGauge(
> + "BrokerState",
> + new Gauge[Int] {
> + def value = brokerState.currentState
> + }
> + )
> +
> /**
> * Start up API for bringing up a single instance of the Kafka server.
> * Instantiates the LogManager, the SocketServer and the request
> handlers - KafkaRequestHandlers
> */
> def startup() {
> info("starting")
> + brokerState.newState(Starting)
> isShuttingDown = new AtomicBoolean(false)
> shutdownLatch = new CountDownLatch(1)
>
> @@ -70,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
> SystemTime) extends Logg
> zkClient = initZk()
>
> /* start log manager */
> - logManager = createLogManager(zkClient)
> + logManager = createLogManager(zkClient, brokerState)
> logManager.startup()
>
> socketServer = new SocketServer(config.brokerId,
> @@ -88,11 +99,12 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> /* start offset manager */
> offsetManager = createOffsetManager()
>
> - kafkaController = new KafkaController(config, zkClient)
> + kafkaController = new KafkaController(config, zkClient, brokerState)
>
> /* start processing requests */
> apis = new KafkaApis(socketServer.requestChannel, replicaManager,
> offsetManager, zkClient, config.brokerId, config, kafkaController)
> requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,
> socketServer.requestChannel, apis, config.numIoThreads)
> + brokerState.newState(RunningAsBroker)
>
> Mx4jLoader.maybeLoad()
>
> @@ -143,6 +155,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> var prevController : Broker = null
> var shutdownSuceeded : Boolean = false
> try {
> + brokerState.newState(PendingControlledShutdown)
> while (!shutdownSuceeded && remainingRetries > 0) {
> remainingRetries = remainingRetries - 1
>
> @@ -177,7 +190,9 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> // send the controlled shutdown request
> val request = new
> ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
> channel.send(request)
> +
> response = channel.receive()
> +
> val shutdownResponse =
> ControlledShutdownResponse.readFrom(response.buffer)
> if (shutdownResponse.errorCode == ErrorMapping.NoError &&
> shutdownResponse.partitionsRemaining != null &&
> shutdownResponse.partitionsRemaining.size == 0) {
> @@ -223,6 +238,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> val canShutdown = isShuttingDown.compareAndSet(false, true)
> if (canShutdown) {
> Utils.swallow(controlledShutdown())
> + brokerState.newState(BrokerShuttingDown)
> if(kafkaHealthcheck != null)
> Utils.swallow(kafkaHealthcheck.shutdown())
> if(socketServer != null)
> @@ -243,6 +259,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> if(zkClient != null)
> Utils.swallow(zkClient.close())
>
> + brokerState.newState(NotRunning)
> shutdownLatch.countDown()
> startupComplete.set(false)
> info("shut down completed")
> @@ -256,7 +273,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>
> def getLogManager(): LogManager = logManager
>
> - private def createLogManager(zkClient: ZkClient): LogManager = {
> + private def createLogManager(zkClient: ZkClient, brokerState:
> BrokerState): LogManager = {
> val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
> segmentMs = 60L * 60L * 1000L *
> config.logRollHours,
> flushInterval =
> config.logFlushIntervalMessages,
> @@ -289,6 +306,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
> flushCheckpointMs =
> config.logFlushOffsetCheckpointIntervalMs,
> retentionCheckMs = config.logCleanupIntervalMs,
> scheduler = kafkaScheduler,
> + brokerState = brokerState,
> time = time)
> }
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> index acda52b..cef3b84 100644
> --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> @@ -52,6 +52,14 @@ class KafkaServerStartable(val serverConfig:
> KafkaConfig) extends Logging {
> }
> }
>
> + /**
> + * Allow setting broker state from the startable.
> + * This is needed when a custom kafka server startable wants to emit new
> states that it introduces.
> + */
> + def setServerState(newState: Byte) {
> + server.brokerState.newState(newState)
> + }
> +
> def awaitShutdown() =
> server.awaitShutdown
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> index be1a1ee..d03d4c4 100644
> --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> @@ -21,10 +21,9 @@ import java.io._
> import junit.framework.Assert._
> import org.junit.Test
> import org.scalatest.junit.JUnit3Suite
> -import kafka.server.KafkaConfig
> +import kafka.server.{BrokerState, OffsetCheckpoint}
> import kafka.common._
> import kafka.utils._
> -import kafka.server.OffsetCheckpoint
>
> class LogManagerTest extends JUnit3Suite {
>
> @@ -49,7 +48,8 @@ class LogManagerTest extends JUnit3Suite {
> flushCheckpointMs = 100000L,
> retentionCheckMs = 1000L,
> scheduler = time.scheduler,
> - time = time)
> + time = time,
> + brokerState = new BrokerState())
> logManager.startup
> logDir = logManager.logDirs(0)
> }
> @@ -125,7 +125,18 @@ class LogManagerTest extends JUnit3Suite {
> logManager.shutdown()
>
> val config = logConfig.copy(segmentSize = 10 * (setSize - 1),
> retentionSize = 5L * 10L * setSize + 10L)
> - logManager = new LogManager(Array(logDir), Map(), config,
> cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time)
> + logManager = new LogManager(
> + logDirs = Array(logDir),
> + topicConfigs = Map(),
> + defaultConfig = config,
> + cleanerConfig = cleanerConfig,
> + flushCheckMs = 1000L,
> + flushCheckpointMs = 100000L,
> + retentionCheckMs = 1000L,
> + scheduler = time.scheduler,
> + brokerState = new BrokerState(),
> + time = time
> + )
> logManager.startup
>
> // create a log
> @@ -165,7 +176,18 @@ class LogManagerTest extends JUnit3Suite {
> def testTimeBasedFlush() {
> logManager.shutdown()
> val config = logConfig.copy(flushMs = 1000)
> - logManager = new LogManager(Array(logDir), Map(), config,
> cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
> + logManager = new LogManager(
> + logDirs = Array(logDir),
> + topicConfigs = Map(),
> + defaultConfig = config,
> + cleanerConfig = cleanerConfig,
> + flushCheckMs = 1000L,
> + flushCheckpointMs = 10000L,
> + retentionCheckMs = 1000L,
> + scheduler = time.scheduler,
> + brokerState = new BrokerState(),
> + time = time
> + )
> logManager.startup
> val log = logManager.createLog(TopicAndPartition(name, 0), config)
> val lastFlush = log.lastFlushTime
> @@ -187,7 +209,18 @@ class LogManagerTest extends JUnit3Suite {
> TestUtils.tempDir(),
> TestUtils.tempDir())
> logManager.shutdown()
> - logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig,
> 1000L, 10000L, 1000L, time.scheduler, time)
> + logManager = new LogManager(
> + logDirs = dirs,
> + topicConfigs = Map(),
> + defaultConfig = logConfig,
> + cleanerConfig = cleanerConfig,
> + flushCheckMs = 1000L,
> + flushCheckpointMs = 10000L,
> + retentionCheckMs = 1000L,
> + scheduler = time.scheduler,
> + brokerState = new BrokerState(),
> + time = time
> + )
>
> // verify that logs are always assigned to the least loaded partition
> for(partition <- 0 until 20) {
> @@ -204,7 +237,18 @@ class LogManagerTest extends JUnit3Suite {
> @Test
> def testTwoLogManagersUsingSameDirFails() {
> try {
> - new LogManager(Array(logDir), Map(), logConfig, cleanerConfig,
> 1000L, 10000L, 1000L, time.scheduler, time)
> + new LogManager(
> + logDirs = Array(logDir),
> + topicConfigs = Map(),
> + defaultConfig = logConfig,
> + cleanerConfig = cleanerConfig,
> + flushCheckMs = 1000L,
> + flushCheckpointMs = 10000L,
> + retentionCheckMs = 1000L,
> + scheduler = time.scheduler,
> + brokerState = new BrokerState(),
> + time = time
> + )
> fail("Should not be able to create a second log manager instance
> with the same data directory")
> } catch {
> case e: KafkaException => // this is good
> @@ -234,7 +278,8 @@ class LogManagerTest extends JUnit3Suite {
> flushCheckpointMs = 100000L,
> retentionCheckMs = 1000L,
> scheduler = time.scheduler,
> - time = time)
> + time = time,
> + brokerState = new BrokerState())
> logManager.startup
> verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)),
> logManager)
> }
> @@ -256,7 +301,8 @@ class LogManagerTest extends JUnit3Suite {
> flushCheckpointMs = 100000L,
> retentionCheckMs = 1000L,
> scheduler = time.scheduler,
> - time = time)
> + time = time,
> + brokerState = new BrokerState())
> logManager.startup
> verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)),
> logManager)
> }
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> ----------------------------------------------------------------------
> diff --git
> a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> index a78f7cf..558a5d6 100644
> ---
> a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> +++
> b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> @@ -40,6 +40,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
>
> flushCheckpointMs = 10000L,
> retentionCheckMs
> = 30000,
> scheduler = new
> KafkaScheduler(1),
> + brokerState =
> new BrokerState(),
> time = new
> MockTime))
>
> @After
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> ----------------------------------------------------------------------
> diff --git
> a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> index 41ebc7a..518d416 100644
> --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> @@ -69,6 +69,7 @@ class ReplicaManagerTest extends JUnit3Suite {
> flushCheckpointMs = 100000L,
> retentionCheckMs = 1000L,
> scheduler = time.scheduler,
> + brokerState = new BrokerState(),
> time = time)
> }
>
>
>