You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Neha Narkhede <ne...@gmail.com> on 2014/05/07 03:02:05 UTC

Re: git commit: kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel Koshy and Jun Rao

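I think we forgot to commit the new BrokerState file: the patch references BrokerState and its states (Starting, RunningAsBroker, RunningAsController, RecoveringFromUncleanShutdown, PendingControlledShutdown, BrokerShuttingDown, NotRunning), but no BrokerState source file appears among the 7 changed files listed in the commit below.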


On Tue, May 6, 2014 at 4:36 PM, <ju...@apache.org> wrote:

> Repository: kafka
> Updated Branches:
>   refs/heads/trunk 44c39c4ea -> 9b6bf4078
>
>
> kafka-1384; Log Broker state; patched by Timothy Chen; reviewed by Joel
> Koshy and Jun Rao
>
>
> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
> Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9b6bf407
> Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9b6bf407
> Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9b6bf407
>
> Branch: refs/heads/trunk
> Commit: 9b6bf407874ef0fda12d8b2cc7f8331ce4aebeea
> Parents: 44c39c4
> Author: Timothy Chen <tn...@gmail.com>
> Authored: Tue May 6 16:36:09 2014 -0700
> Committer: Jun Rao <ju...@gmail.com>
> Committed: Tue May 6 16:36:09 2014 -0700
>
> ----------------------------------------------------------------------
>  .../kafka/controller/KafkaController.scala      |  7 ++-
>  core/src/main/scala/kafka/log/LogManager.scala  |  6 +-
>  .../main/scala/kafka/server/KafkaServer.scala   | 26 ++++++--
>  .../kafka/server/KafkaServerStartable.scala     |  8 +++
>  .../scala/unit/kafka/log/LogManagerTest.scala   | 64 +++++++++++++++++---
>  .../server/HighwatermarkPersistenceTest.scala   |  1 +
>  .../unit/kafka/server/ReplicaManagerTest.scala  |  1 +
>  7 files changed, 97 insertions(+), 16 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/controller/KafkaController.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
> b/core/src/main/scala/kafka/controller/KafkaController.scala
> index 401bf1e..2fa1341 100644
> --- a/core/src/main/scala/kafka/controller/KafkaController.scala
> +++ b/core/src/main/scala/kafka/controller/KafkaController.scala
> @@ -28,7 +28,6 @@ import kafka.cluster.Broker
>  import kafka.common._
>  import kafka.log.LogConfig
>  import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
> -import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
>  import kafka.utils.ZkUtils._
>  import kafka.utils._
>  import kafka.utils.Utils._
> @@ -37,6 +36,8 @@ import org.I0Itec.zkclient.{IZkDataListener,
> IZkStateListener, ZkClient}
>  import org.I0Itec.zkclient.exception.{ZkNodeExistsException,
> ZkNoNodeException}
>  import java.util.concurrent.atomic.AtomicInteger
>  import java.util.concurrent.locks.ReentrantLock
> +import scala.None
> +import kafka.server._
>  import scala.Some
>  import kafka.common.TopicAndPartition
>
> @@ -154,7 +155,7 @@ object KafkaController extends Logging {
>    }
>  }
>
> -class KafkaController(val config : KafkaConfig, zkClient: ZkClient)
> extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
> +class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val
> brokerState: BrokerState) extends Logging with KafkaMetricsGroup with
> KafkaControllerMBean {
>    this.logIdent = "[Controller " + config.brokerId + "]: "
>    private var isRunning = true
>    private val stateChangeLogger = KafkaController.stateChangeLogger
> @@ -316,6 +317,7 @@ class KafkaController(val config : KafkaConfig,
> zkClient: ZkClient) extends Logg
>        controllerContext.allTopics.foreach(topic =>
> partitionStateMachine.registerPartitionChangeListener(topic))
>        Utils.registerMBean(this, KafkaController.MBeanName)
>        info("Broker %d is ready to serve as the new controller with epoch
> %d".format(config.brokerId, epoch))
> +      brokerState.newState(RunningAsController)
>        maybeTriggerPartitionReassignment()
>        maybeTriggerPreferredReplicaElection()
>        /* send partition leadership info to all live brokers */
> @@ -351,6 +353,7 @@ class KafkaController(val config : KafkaConfig,
> zkClient: ZkClient) extends Logg
>          controllerContext.controllerChannelManager.shutdown()
>          controllerContext.controllerChannelManager = null
>        }
> +      brokerState.newState(RunningAsBroker)
>      }
>    }
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/log/LogManager.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/log/LogManager.scala
> b/core/src/main/scala/kafka/log/LogManager.scala
> index ab72cff..1946c94 100644
> --- a/core/src/main/scala/kafka/log/LogManager.scala
> +++ b/core/src/main/scala/kafka/log/LogManager.scala
> @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
>  import kafka.utils._
>  import scala.collection._
>  import kafka.common.{TopicAndPartition, KafkaException}
> -import kafka.server.OffsetCheckpoint
> +import kafka.server.{RecoveringFromUncleanShutdown, BrokerState,
> OffsetCheckpoint}
>
>  /**
>   * The entry point to the kafka log management subsystem. The log manager
> is responsible for log creation, retrieval, and cleaning.
> @@ -43,6 +43,7 @@ class LogManager(val logDirs: Array[File],
>                   val flushCheckpointMs: Long,
>                   val retentionCheckMs: Long,
>                   scheduler: Scheduler,
> +                 val brokerState: BrokerState,
>                   private val time: Time) extends Logging {
>    val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
>    val LockFile = ".lock"
> @@ -109,6 +110,9 @@ class LogManager(val logDirs: Array[File],
>          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>          if(cleanShutDownFile.exists())
>            info("Found clean shutdown file. Skipping recovery for all logs
> in data directory '%s'".format(dir.getAbsolutePath))
> +        else
> +          brokerState.newState(RecoveringFromUncleanShutdown)
> +
>          for(dir <- subDirs) {
>            if(dir.isDirectory) {
>              info("Loading log '" + dir.getName + "'")
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServer.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
> b/core/src/main/scala/kafka/server/KafkaServer.scala
> index c208f83..c22e51e 100644
> --- a/core/src/main/scala/kafka/server/KafkaServer.scala
> +++ b/core/src/main/scala/kafka/server/KafkaServer.scala
> @@ -31,16 +31,19 @@ import kafka.cluster.Broker
>  import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
>  import kafka.common.ErrorMapping
>  import kafka.network.{Receive, BlockingChannel, SocketServer}
> +import kafka.metrics.KafkaMetricsGroup
> +import com.yammer.metrics.core.Gauge
>
>  /**
>   * Represents the lifecycle of a single Kafka broker. Handles all
> functionality required
>   * to start up and shutdown a single Kafka node.
>   */
> -class KafkaServer(val config: KafkaConfig, time: Time = SystemTime)
> extends Logging {
> +class KafkaServer(val config: KafkaConfig, time: Time = SystemTime)
> extends Logging with KafkaMetricsGroup {
>    this.logIdent = "[Kafka Server " + config.brokerId + "], "
>    private var isShuttingDown = new AtomicBoolean(false)
>    private var shutdownLatch = new CountDownLatch(1)
>    private var startupComplete = new AtomicBoolean(false)
> +  val brokerState: BrokerState = new BrokerState
>    val correlationId: AtomicInteger = new AtomicInteger(0)
>    var socketServer: SocketServer = null
>    var requestHandlerPool: KafkaRequestHandlerPool = null
> @@ -54,12 +57,20 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>    val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
>    var zkClient: ZkClient = null
>
> +  newGauge(
> +    "BrokerState",
> +    new Gauge[Int] {
> +      def value = brokerState.currentState
> +    }
> +  )
> +
>    /**
>     * Start up API for bringing up a single instance of the Kafka server.
>     * Instantiates the LogManager, the SocketServer and the request
> handlers - KafkaRequestHandlers
>     */
>    def startup() {
>      info("starting")
> +    brokerState.newState(Starting)
>      isShuttingDown = new AtomicBoolean(false)
>      shutdownLatch = new CountDownLatch(1)
>
> @@ -70,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time =
> SystemTime) extends Logg
>      zkClient = initZk()
>
>      /* start log manager */
> -    logManager = createLogManager(zkClient)
> +    logManager = createLogManager(zkClient, brokerState)
>      logManager.startup()
>
>      socketServer = new SocketServer(config.brokerId,
> @@ -88,11 +99,12 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>      /* start offset manager */
>      offsetManager = createOffsetManager()
>
> -    kafkaController = new KafkaController(config, zkClient)
> +    kafkaController = new KafkaController(config, zkClient, brokerState)
>
>      /* start processing requests */
>      apis = new KafkaApis(socketServer.requestChannel, replicaManager,
> offsetManager, zkClient, config.brokerId, config, kafkaController)
>      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,
> socketServer.requestChannel, apis, config.numIoThreads)
> +    brokerState.newState(RunningAsBroker)
>
>      Mx4jLoader.maybeLoad()
>
> @@ -143,6 +155,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>        var prevController : Broker = null
>        var shutdownSuceeded : Boolean = false
>        try {
> +        brokerState.newState(PendingControlledShutdown)
>          while (!shutdownSuceeded && remainingRetries > 0) {
>            remainingRetries = remainingRetries - 1
>
> @@ -177,7 +190,9 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>                // send the controlled shutdown request
>                val request = new
> ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
>                channel.send(request)
> +
>                response = channel.receive()
> +
>                val shutdownResponse =
> ControlledShutdownResponse.readFrom(response.buffer)
>                if (shutdownResponse.errorCode == ErrorMapping.NoError &&
> shutdownResponse.partitionsRemaining != null &&
>                    shutdownResponse.partitionsRemaining.size == 0) {
> @@ -223,6 +238,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>      val canShutdown = isShuttingDown.compareAndSet(false, true)
>      if (canShutdown) {
>        Utils.swallow(controlledShutdown())
> +      brokerState.newState(BrokerShuttingDown)
>        if(kafkaHealthcheck != null)
>          Utils.swallow(kafkaHealthcheck.shutdown())
>        if(socketServer != null)
> @@ -243,6 +259,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>        if(zkClient != null)
>          Utils.swallow(zkClient.close())
>
> +      brokerState.newState(NotRunning)
>        shutdownLatch.countDown()
>        startupComplete.set(false)
>        info("shut down completed")
> @@ -256,7 +273,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>
>    def getLogManager(): LogManager = logManager
>
> -  private def createLogManager(zkClient: ZkClient): LogManager = {
> +  private def createLogManager(zkClient: ZkClient, brokerState:
> BrokerState): LogManager = {
>      val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
>                                       segmentMs = 60L * 60L * 1000L *
> config.logRollHours,
>                                       flushInterval =
> config.logFlushIntervalMessages,
> @@ -289,6 +306,7 @@ class KafkaServer(val config: KafkaConfig, time: Time
> = SystemTime) extends Logg
>                     flushCheckpointMs =
> config.logFlushOffsetCheckpointIntervalMs,
>                     retentionCheckMs = config.logCleanupIntervalMs,
>                     scheduler = kafkaScheduler,
> +                   brokerState = brokerState,
>                     time = time)
>    }
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> index acda52b..cef3b84 100644
> --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
> @@ -52,6 +52,14 @@ class KafkaServerStartable(val serverConfig:
> KafkaConfig) extends Logging {
>      }
>    }
>
> +  /**
> +   * Allow setting broker state from the startable.
> +   * This is needed when a custom kafka server startable want to emit new
> states that it introduces.
> +   */
> +  def setServerState(newState: Byte) {
> +    server.brokerState.newState(newState)
> +  }
> +
>    def awaitShutdown() =
>      server.awaitShutdown
>
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> ----------------------------------------------------------------------
> diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> index be1a1ee..d03d4c4 100644
> --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
> @@ -21,10 +21,9 @@ import java.io._
>  import junit.framework.Assert._
>  import org.junit.Test
>  import org.scalatest.junit.JUnit3Suite
> -import kafka.server.KafkaConfig
> +import kafka.server.{BrokerState, OffsetCheckpoint}
>  import kafka.common._
>  import kafka.utils._
> -import kafka.server.OffsetCheckpoint
>
>  class LogManagerTest extends JUnit3Suite {
>
> @@ -49,7 +48,8 @@ class LogManagerTest extends JUnit3Suite {
>                                  flushCheckpointMs = 100000L,
>                                  retentionCheckMs = 1000L,
>                                  scheduler = time.scheduler,
> -                                time = time)
> +                                time = time,
> +                                brokerState = new BrokerState())
>      logManager.startup
>      logDir = logManager.logDirs(0)
>    }
> @@ -125,7 +125,18 @@ class LogManagerTest extends JUnit3Suite {
>      logManager.shutdown()
>
>      val config = logConfig.copy(segmentSize = 10 * (setSize - 1),
> retentionSize = 5L * 10L * setSize + 10L)
> -    logManager = new LogManager(Array(logDir), Map(), config,
> cleanerConfig, 1000L, 100000L, 1000L, time.scheduler, time)
> +    logManager = new LogManager(
> +      logDirs = Array(logDir),
> +      topicConfigs = Map(),
> +      defaultConfig = config,
> +      cleanerConfig = cleanerConfig,
> +      flushCheckMs = 1000L,
> +      flushCheckpointMs = 100000L,
> +      retentionCheckMs = 1000L,
> +      scheduler = time.scheduler,
> +      brokerState = new BrokerState(),
> +      time = time
> +    )
>      logManager.startup
>
>      // create a log
> @@ -165,7 +176,18 @@ class LogManagerTest extends JUnit3Suite {
>    def testTimeBasedFlush() {
>      logManager.shutdown()
>      val config = logConfig.copy(flushMs = 1000)
> -    logManager = new LogManager(Array(logDir), Map(), config,
> cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
> +    logManager = new LogManager(
> +      logDirs = Array(logDir),
> +      topicConfigs = Map(),
> +      defaultConfig = config,
> +      cleanerConfig = cleanerConfig,
> +      flushCheckMs = 1000L,
> +      flushCheckpointMs = 10000L,
> +      retentionCheckMs = 1000L,
> +      scheduler = time.scheduler,
> +      brokerState = new BrokerState(),
> +      time = time
> +    )
>      logManager.startup
>      val log = logManager.createLog(TopicAndPartition(name, 0), config)
>      val lastFlush = log.lastFlushTime
> @@ -187,7 +209,18 @@ class LogManagerTest extends JUnit3Suite {
>                       TestUtils.tempDir(),
>                       TestUtils.tempDir())
>      logManager.shutdown()
> -    logManager = new LogManager(dirs, Map(), logConfig, cleanerConfig,
> 1000L, 10000L, 1000L, time.scheduler, time)
> +    logManager = new LogManager(
> +      logDirs = dirs,
> +      topicConfigs = Map(),
> +      defaultConfig = logConfig,
> +      cleanerConfig = cleanerConfig,
> +      flushCheckMs = 1000L,
> +      flushCheckpointMs = 10000L,
> +      retentionCheckMs = 1000L,
> +      scheduler = time.scheduler,
> +      brokerState = new BrokerState(),
> +      time = time
> +    )
>
>      // verify that logs are always assigned to the least loaded partition
>      for(partition <- 0 until 20) {
> @@ -204,7 +237,18 @@ class LogManagerTest extends JUnit3Suite {
>    @Test
>    def testTwoLogManagersUsingSameDirFails() {
>      try {
> -      new LogManager(Array(logDir), Map(), logConfig, cleanerConfig,
> 1000L, 10000L, 1000L, time.scheduler, time)
> +      new LogManager(
> +        logDirs = Array(logDir),
> +        topicConfigs = Map(),
> +        defaultConfig = logConfig,
> +        cleanerConfig = cleanerConfig,
> +        flushCheckMs = 1000L,
> +        flushCheckpointMs = 10000L,
> +        retentionCheckMs = 1000L,
> +        scheduler = time.scheduler,
> +        brokerState = new BrokerState(),
> +        time = time
> +      )
>        fail("Should not be able to create a second log manager instance
> with the same data directory")
>      } catch {
>        case e: KafkaException => // this is good
> @@ -234,7 +278,8 @@ class LogManagerTest extends JUnit3Suite {
>        flushCheckpointMs = 100000L,
>        retentionCheckMs = 1000L,
>        scheduler = time.scheduler,
> -      time = time)
> +      time = time,
> +      brokerState = new BrokerState())
>      logManager.startup
>      verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)),
> logManager)
>    }
> @@ -256,7 +301,8 @@ class LogManagerTest extends JUnit3Suite {
>        flushCheckpointMs = 100000L,
>        retentionCheckMs = 1000L,
>        scheduler = time.scheduler,
> -      time = time)
> +      time = time,
> +      brokerState = new BrokerState())
>      logManager.startup
>      verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)),
> logManager)
>    }
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> ----------------------------------------------------------------------
> diff --git
> a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> index a78f7cf..558a5d6 100644
> ---
> a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> +++
> b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
> @@ -40,6 +40,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
>
> flushCheckpointMs = 10000L,
>                                                           retentionCheckMs
> = 30000,
>                                                           scheduler = new
> KafkaScheduler(1),
> +                                                         brokerState =
> new BrokerState(),
>                                                           time = new
> MockTime))
>
>    @After
>
>
> http://git-wip-us.apache.org/repos/asf/kafka/blob/9b6bf407/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> ----------------------------------------------------------------------
> diff --git
> a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> index 41ebc7a..518d416 100644
> --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
> @@ -69,6 +69,7 @@ class ReplicaManagerTest extends JUnit3Suite {
>        flushCheckpointMs = 100000L,
>        retentionCheckMs = 1000L,
>        scheduler = time.scheduler,
> +      brokerState = new BrokerState(),
>        time = time)
>    }
>
>
>