You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2015/01/13 00:45:37 UTC

kafka git commit: KAFKA-1070 Auto assign broker id; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk e52a6181b -> b1b80860a


KAFKA-1070 Auto assign broker id; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1b80860
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1b80860
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1b80860

Branch: refs/heads/trunk
Commit: b1b80860a01cc378cfada3549a3480f0773c3ff8
Parents: e52a618
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Mon Jan 12 15:45:13 2015 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Jan 12 15:45:29 2015 -0800

----------------------------------------------------------------------
 .../common/GenerateBrokerIdException.scala      |  27 +++
 .../common/InconsistentBrokerIdException.scala  |  27 +++
 .../kafka/log/LogCleanerManager.scala.orig      | 203 +++++++++++++++++++
 .../kafka/server/BrokerMetadataCheckpoint.scala |  83 ++++++++
 .../main/scala/kafka/server/KafkaConfig.scala   |  20 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  87 ++++++--
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  38 +++-
 .../server/ServerGenerateBrokerIdTest.scala     | 127 ++++++++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala |   9 +-
 9 files changed, 596 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala b/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
new file mode 100644
index 0000000..13784fe
--- /dev/null
+++ b/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
@@ -0,0 +1,27 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.common
+
+/**
+  * Thrown when there is a failure to generate a zookeeper sequenceId to use as brokerId
+  */
+class GenerateBrokerIdException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this(cause: Throwable) = this(null, cause)
+  def this() = this(null, null)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala b/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
new file mode 100644
index 0000000..0c0d1cd
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
@@ -0,0 +1,27 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.common
+
+/**
+  * Indicates the brokerId stored in logDirs is not consistent across logDirs.
+  */
+class InconsistentBrokerIdException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this(cause: Throwable) = this(null, cause)
+  def this() = this(null, null)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/log/LogCleanerManager.scala.orig
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala.orig b/core/src/main/scala/kafka/log/LogCleanerManager.scala.orig
new file mode 100644
index 0000000..e8ced6a
--- /dev/null
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala.orig
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
+import kafka.utils.{Logging, Pool}
+import kafka.server.OffsetCheckpoint
+import collection.mutable
+import java.util.concurrent.locks.ReentrantLock
+import kafka.utils.Utils._
+import java.util.concurrent.TimeUnit
+import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
+
+private[log] sealed trait LogCleaningState
+private[log] case object LogCleaningInProgress extends LogCleaningState
+private[log] case object LogCleaningAborted extends LogCleaningState
+private[log] case object LogCleaningPaused extends LogCleaningState
+
+/**
+ *  Manage the state of each partition being cleaned.
+ *  If a partition is to be cleaned, it enters the LogCleaningInProgress state.
+ *  While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters
+ *  the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state.
+ *  While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
+ *  requested to be resumed.
+ */
+private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
+  
+  override val loggerName = classOf[LogCleaner].getName
+  
+  /* the offset checkpoints holding the last cleaned point for each log */
+  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
+
+  /* the set of logs currently being cleaned */
+  private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
+
+  /* a global lock used to control all access to the in-progress set and the offset checkpoints */
+  private val lock = new ReentrantLock
+  
+  /* for coordinating the pausing and the cleaning of a partition */
+  private val pausedCleaningCond = lock.newCondition()
+  
+  /* a gauge for tracking the cleanable ratio of the dirtiest log */
+  @volatile private var dirtiestLogCleanableRatio = 0.0
+  newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
+
+  /**
+   * @return the position processed for all logs.
+   */
+  def allCleanerCheckpoints(): Map[TopicAndPartition, Long] =
+    checkpoints.values.flatMap(_.read()).toMap
+
+   /**
+    * Choose the log to clean next and add it to the in-progress set. We recompute this
+    * every time off the full set of logs to allow logs to be dynamically added to the pool of logs
+    * the log manager maintains.
+    */
+  def grabFilthiestLog(): Option[LogToClean] = {
+    inLock(lock) {
+      val lastClean = allCleanerCheckpoints()
+      val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
+                          .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
+                          .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
+                                               lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
+                          .filter(l => l.totalBytes > 0)             // skip any empty logs
+      this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
+      val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
+      if(cleanableLogs.isEmpty) {
+        None
+      } else {
+        val filthiest = cleanableLogs.max
+        inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
+        Some(filthiest)
+      }
+    }
+  }
+
+  /**
+   *  Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
+   *  the partition is aborted.
+   *  This is implemented by first abortAndPausing and then resuming the cleaning of the partition.
+   */
+  def abortCleaning(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      abortAndPauseCleaning(topicAndPartition)
+      resumeCleaning(topicAndPartition)
+      info("The cleaning for partition %s is aborted".format(topicAndPartition))
+    }
+  }
+
+  /**
+   *  Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
+   *  This call blocks until the cleaning of the partition is aborted and paused.
+   *  1. If the partition is not in progress, mark it as paused.
+   *  2. Otherwise, first mark the state of the partition as aborted.
+   *  3. The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it
+   *     throws a LogCleaningAbortedException to stop the cleaning task.
+   *  4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
+   *  5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
+   */
+  def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      inProgress.get(topicAndPartition) match {
+        case None =>
+          inProgress.put(topicAndPartition, LogCleaningPaused)
+        case Some(state) =>
+          state match {
+            case LogCleaningInProgress =>
+              inProgress.put(topicAndPartition, LogCleaningAborted)
+            case s =>
+              throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state."
+                                              .format(topicAndPartition, s))
+          }
+      }
+      while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
+        pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
+      info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
+    }
+  }
+
+  /**
+   *  Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
+   */
+  def resumeCleaning(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      inProgress.get(topicAndPartition) match {
+        case None =>
+          throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused."
+                                          .format(topicAndPartition))
+        case Some(state) =>
+          state match {
+            case LogCleaningPaused =>
+              inProgress.remove(topicAndPartition)
+            case s =>
+              throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state."
+                                              .format(topicAndPartition, s))
+          }
+      }
+    }
+    info("Compaction for partition %s is resumed".format(topicAndPartition))
+  }
+
+  /**
+   *  Check if the cleaning for a partition is in a particular state. The caller is expected to hold lock while making the call.
+   */
+  def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean = {
+    inProgress.get(topicAndPartition) match {
+      case None => return false
+      case Some(state) =>
+        if (state == expectedState)
+          return true
+        else
+          return false
+    }
+  }
+
+  /**
+   *  Check if the cleaning for a partition is aborted. If so, throw an exception.
+   */
+  def checkCleaningAborted(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      if (isCleaningInState(topicAndPartition, LogCleaningAborted))
+        throw new LogCleaningAbortedException()
+    }
+  }
+
+  /**
+   * Save out the endOffset and remove the given log from the in-progress set, if not aborted.
+   */
+  def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
+    inLock(lock) {
+      inProgress(topicAndPartition) match {
+        case LogCleaningInProgress =>
+          val checkpoint = checkpoints(dataDir)
+          val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
+          checkpoint.write(offsets)
+          inProgress.remove(topicAndPartition)
+        case LogCleaningAborted =>
+          inProgress.put(topicAndPartition, LogCleaningPaused)
+          pausedCleaningCond.signalAll()
+        case s =>
+          throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, s))
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
new file mode 100644
index 0000000..0e542ff
--- /dev/null
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -0,0 +1,83 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.io._
+import java.util.Properties
+import kafka.utils._
+
+
+case class BrokerMetadata(brokerId: Int)
+
+/**
+  * This class saves the broker's metadata to a file.
+  */
+class BrokerMetadataCheckpoint(val file: File) extends Logging {
+  private val lock = new Object()
+  new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness
+
+  def write(brokerMetadata: BrokerMetadata) = {
+    lock synchronized {
+      try {
+        val brokerMetaProps = new Properties()
+        brokerMetaProps.setProperty("version", 0.toString)
+        brokerMetaProps.setProperty("broker.id", brokerMetadata.brokerId.toString)
+        val temp = new File(file.getAbsolutePath + ".tmp")
+        val fileOutputStream = new FileOutputStream(temp)
+        brokerMetaProps.store(fileOutputStream,"")
+        fileOutputStream.flush()
+        fileOutputStream.getFD().sync()
+        fileOutputStream.close()
+        // swap new BrokerMetadata file with previous one
+        if(!temp.renameTo(file)) {
+          // renameTo() fails on windows if destination file exists.
+          file.delete()
+          if(!temp.renameTo(file))
+            throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath(), file.getAbsolutePath()))
+        }
+      } catch {
+        case ie: IOException =>
+          error("Failed to write meta.properties due to ",ie)
+          throw ie
+      }
+    }
+  }
+
+  def read(): Option[BrokerMetadata] = {
+    lock synchronized {
+      try {
+        val brokerMetaProps = new VerifiableProperties(Utils.loadProps(file.getAbsolutePath()))
+        val version = brokerMetaProps.getIntInRange("version", (0, Int.MaxValue))
+        version match {
+          case 0 =>
+            val brokerId = brokerMetaProps.getIntInRange("broker.id", (0, Int.MaxValue))
+            return Some(BrokerMetadata(brokerId))
+          case _ =>
+            throw new IOException("Unrecognized version of the server meta.properties file: " + version)
+        }
+      } catch {
+        case e: FileNotFoundException =>
+          warn("No meta.properties file under dir %s".format(file.getAbsolutePath(), e.getMessage))
+          None
+        case e1: Exception =>
+          error("Failed to read meta.properties file under dir %s due to %s".format(file.getAbsolutePath(), e1.getMessage))
+          throw e1
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6e26c54..bbd3fd7 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -35,13 +35,13 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   private def getLogRetentionTimeMillis(): Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
-    
+
     if(props.containsKey("log.retention.ms")){
        props.getIntInRange("log.retention.ms", (1, Int.MaxValue))
     }
     else if(props.containsKey("log.retention.minutes")){
        millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
-    } 
+    }
     else {
        millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
     }
@@ -49,7 +49,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   private def getLogRollTimeMillis(): Long = {
     val millisInHour = 60L * 60L * 1000L
-    
+
     if(props.containsKey("log.roll.ms")){
        props.getIntInRange("log.roll.ms", (1, Int.MaxValue))
     }
@@ -71,8 +71,14 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /*********** General Configuration ***********/
 
-  /* the broker id for this server */
-  val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
+  /* Max number that can be used for a broker.id  */
+  val MaxReservedBrokerId = props.getIntInRange("reserved.broker.max.id", 1000, (0, Int.MaxValue))
+
+  /* The broker id for this server.
+   * To avoid conflicts between a zookeeper-generated brokerId and the user's config.brokerId,
+   * MaxReservedBrokerId was added, and the zookeeper sequence starts from MaxReservedBrokerId + 1.
+   */
+  var brokerId: Int = if (props.containsKey("broker.id")) props.getIntInRange("broker.id", (0, MaxReservedBrokerId)) else -1
 
   /* the maximum size of message that the server can receive */
   val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
@@ -117,10 +123,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /* the maximum number of bytes in a socket request */
   val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue))
-  
+
   /* the maximum number of connections we allow from each ip address */
   val maxConnectionsPerIp: Int = props.getIntInRange("max.connections.per.ip", Int.MaxValue, (1, Int.MaxValue))
-  
+
   /* per-ip or hostname overrides to the default maximum number of connections */
   val maxConnectionsPerIpOverrides = props.getMap("max.connections.per.ip.overrides").map(entry => (entry._1, entry._2.toInt))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1691ad7..a069eb9 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,11 +25,12 @@ import kafka.utils._
 import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
 import java.io.File
+import collection.mutable
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.Broker
 import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
-import kafka.common.ErrorMapping
+import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
 import kafka.network.{Receive, BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
@@ -39,10 +40,11 @@ import com.yammer.metrics.core.Gauge
  * to start up and shutdown a single Kafka node.
  */
 class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
-  this.logIdent = "[Kafka Server " + config.brokerId + "], "
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
   private var startupComplete = new AtomicBoolean(false)
+  private var brokerId: Int = -1
+
   val brokerState: BrokerState = new BrokerState
   val correlationId: AtomicInteger = new AtomicInteger(0)
   var socketServer: SocketServer = null
@@ -56,6 +58,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   var kafkaController: KafkaController = null
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
   var zkClient: ZkClient = null
+  val brokerMetaPropsFile = "meta.properties"
+  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
 
   newGauge(
     "BrokerState",
@@ -77,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
       /* start scheduler */
       kafkaScheduler.startup()
-    
+
       /* setup zookeeper */
       zkClient = initZk()
 
@@ -85,6 +89,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       logManager = createLogManager(zkClient, brokerState)
       logManager.startup()
 
+      /* generate brokerId */
+      config.brokerId =  getBrokerId
+      this.logIdent = "[Kafka Server " + config.brokerId + "], "
+
       socketServer = new SocketServer(config.brokerId,
                                       config.hostName,
                                       config.port,
@@ -104,26 +112,25 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       offsetManager = createOffsetManager()
 
       kafkaController = new KafkaController(config, zkClient, brokerState)
-    
+
       /* start processing requests */
       apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
       requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
       brokerState.newState(RunningAsBroker)
-   
+
       Mx4jLoader.maybeLoad()
 
       replicaManager.startup()
 
       kafkaController.startup()
-    
+
       topicConfigManager = new TopicConfigManager(zkClient, logManager)
       topicConfigManager.startup()
-    
+
       /* tell everyone we are alive */
       kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
       kafkaHealthcheck.startup()
 
-    
       registerStats()
       startupComplete.set(true)
       info("started")
@@ -181,10 +188,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       info("Starting controlled shutdown")
       var channel : BlockingChannel = null
       var prevController : Broker = null
-      var shutdownSuceeded : Boolean = false
+      var shutdownSucceeded : Boolean = false
       try {
         brokerState.newState(PendingControlledShutdown)
-        while (!shutdownSuceeded && remainingRetries > 0) {
+        while (!shutdownSucceeded && remainingRetries > 0) {
           remainingRetries = remainingRetries - 1
 
           // 1. Find the controller and establish a connection to it.
@@ -223,7 +230,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
               val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
               if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
                   shutdownResponse.partitionsRemaining.size == 0) {
-                shutdownSuceeded = true
+                shutdownSucceeded = true
                 info ("Controlled shutdown succeeded")
               }
               else {
@@ -239,7 +246,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                 // ignore and try again
             }
           }
-          if (!shutdownSuceeded) {
+          if (!shutdownSucceeded) {
             Thread.sleep(config.controlledShutdownRetryBackoffMs)
             warn("Retrying controlled shutdown after the previous attempt failed...")
           }
@@ -251,7 +258,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
           channel = null
         }
       }
-      if (!shutdownSuceeded) {
+      if (!shutdownSucceeded) {
         warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
       }
     }
@@ -307,7 +314,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def awaitShutdown(): Unit = shutdownLatch.await()
 
   def getLogManager(): LogManager = logManager
-  
+
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
     val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
                                      segmentMs = config.logRollTimeMillis,
@@ -359,5 +366,55 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler)
   }
 
-}
+  /**
+    * Generates a new brokerId or reads it from meta.properties, based on the following conditions:
+    * <ol>
+    * <li> config has no broker.id provided: generates a broker.id based on Zookeeper's sequence
+    * <li> stored broker.id in meta.properties doesn't match across all the log.dirs: throws InconsistentBrokerIdException
+    * <li> config has broker.id and meta.properties contains broker.id: if they don't match, throws InconsistentBrokerIdException
+    * <li> config has broker.id and there is no meta.properties file: creates a new meta.properties and stores broker.id
+    * </ol>
+    * @return A brokerId.
+    */
+  private def getBrokerId: Int =  {
+    var brokerId = config.brokerId
+    var logDirsWithoutMetaProps: List[String] = List()
+    val brokerIdSet = mutable.HashSet[Int]()
+
+    for (logDir <- config.logDirs) {
+      val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
+      brokerMetadataOpt match {
+        case Some(brokerMetadata: BrokerMetadata) =>
+          brokerIdSet.add(brokerMetadata.brokerId)
+        case None =>
+          logDirsWithoutMetaProps ++= List(logDir)
+      }
+    }
+
+    if(brokerIdSet.size > 1)
+      throw new InconsistentBrokerIdException("Failed to match brokerId across logDirs")
+    else if(brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId)
+      throw new InconsistentBrokerIdException("Configured brokerId %s doesn't match stored brokerId %s in meta.properties".format(brokerId, brokerIdSet.last))
+    else if(brokerIdSet.size == 0 && brokerId < 0)  // generate a new brokerId from Zookeeper
+      brokerId = generateBrokerId
+    else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
+      brokerId = brokerIdSet.last
+
+    for(logDir <- logDirsWithoutMetaProps) {
+      val checkpoint = brokerMetadataCheckpoints(logDir)
+      checkpoint.write(new BrokerMetadata(brokerId))
+    }
+
+    return brokerId
+  }
 
+  private def generateBrokerId: Int = {
+    try {
+      ZkUtils.getBrokerSequenceId(zkClient, config.MaxReservedBrokerId)
+    } catch {
+      case e: Exception =>
+        error("Failed to generate broker.id due to ", e)
+        throw new GenerateBrokerIdException("Failed to generate broker.id", e)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 56e3e88..c14bd45 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -46,6 +46,7 @@ object ZkUtils extends Logging {
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val DeleteTopicsPath = "/admin/delete_topics"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
+  val BrokerSequenceIdPath = "/brokers/seqid"
 
   def getTopicPath(topic: String): String = {
     BrokerTopicsPath + "/" + topic
@@ -87,7 +88,8 @@ object ZkUtils extends Logging {
   }
 
   def setupCommonPaths(zkClient: ZkClient) {
-    for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath))
+    for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
+      DeleteTopicsPath, BrokerSequenceIdPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
@@ -122,6 +124,14 @@ object ZkUtils extends Logging {
     }
   }
 
+  /** Returns a broker id derived from a sequence id generated by updating BrokerSequenceIdPath in Zk.
+    * Users can also provide a brokerId in the config; in order to avoid conflicts between the zk
+    * generated seqId and config.brokerId, the zk seqId is offset by the given MaxReservedBrokerId.
+    */
+  def getBrokerSequenceId(zkClient: ZkClient, MaxReservedBrokerId: Int): Int = {
+    getSequenceId(zkClient, BrokerSequenceIdPath) + MaxReservedBrokerId
+  }
+
   /**
    * Gets the in-sync replicas (ISR) for a specific topic and partition
    */
@@ -696,6 +706,32 @@ object ZkUtils extends Logging {
     }
   }
 
+  /**
+    * Produces a sequence number by creating / updating the given path in zookeeper.
+    * Every update of the path increments stat.version, which is returned as the sequence
+    * number; 0 is returned when this call had to create the path itself.
+    */
+  def getSequenceId(client: ZkClient, path: String): Int = {
+    try {
+      val stat = client.writeDataReturnStat(path, "", -1)
+      stat.getVersion
+    } catch {
+      case e: ZkNoNodeException => {
+        // the node is missing: create the path that was asked for, not a hard-coded one
+        createParentPath(client, path)
+        try {
+          client.createPersistent(path, "")
+          0
+        } catch {
+          // the node appeared in the meantime (e.g. created by another client): update it instead
+          case nodeExists: ZkNodeExistsException =>
+            val stat = client.writeDataReturnStat(path, "", -1)
+            stat.getVersion
+        }
+      }
+    }
+  }
+
   def getAllTopics(zkClient: ZkClient): Seq[String] = {
     val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
     if(topics == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
new file mode 100644
index 0000000..cf2dd94
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -0,0 +1,127 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestUtils, Utils}
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import junit.framework.Assert._
+import java.io.File
+
+/**
+  * Tests the automatic generation of broker.id and its persistence in the meta.properties
+  * file of every log dir, including the interaction with a user configured broker.id.
+  */
+class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
+  // config1 has no broker.id (createBrokerConfig skips it for negative ids), config2 uses broker.id 0
+  var props1 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
+  var config1 = new KafkaConfig(props1)
+  var props2 = TestUtils.createBrokerConfig(0, TestUtils.choosePort)
+  var config2 = new KafkaConfig(props2)
+  val brokerMetaPropsFile = "meta.properties"
+
+
+  @Test
+  def testAutoGenerateBrokerId() {
+    var server1 = new KafkaServer(config1)
+    server1.startup()
+    server1.shutdown()
+    assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
+    // restart the server and check that it uses the brokerId generated previously
+    server1 = new KafkaServer(config1)
+    server1.startup()
+    assertEquals(1001, server1.config.brokerId)
+    server1.shutdown()
+    Utils.rm(server1.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus
+  }
+
+  @Test
+  def testUserConfigAndGeneratedBrokerId() {
+    // mix servers with a generated broker.id and a server with broker.id as part of its config
+    val server1 = new KafkaServer(config1)
+    val server2 = new KafkaServer(config2)
+    val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
+    val config3 = new KafkaConfig(props3)
+    val server3 = new KafkaServer(config3)
+    server1.startup()
+    assertEquals(1001, server1.config.brokerId)
+    server2.startup()
+    assertEquals(0, server2.config.brokerId)
+    server3.startup()
+    assertEquals(1002, server3.config.brokerId)
+    server1.shutdown()
+    server2.shutdown()
+    server3.shutdown()
+    assertTrue(verifyBrokerMetadata(server1.config.logDirs, 1001))
+    assertTrue(verifyBrokerMetadata(server2.config.logDirs, 0))
+    assertTrue(verifyBrokerMetadata(server3.config.logDirs, 1002))
+    Utils.rm(server1.config.logDirs)
+    Utils.rm(server2.config.logDirs)
+    Utils.rm(server3.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus
+  }
+
+  @Test
+  def testMultipleLogDirsMetaProps() {
+    // add multiple logDirs and check if the generated brokerId is stored in all of them
+    val logDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath +
+    "," + TestUtils.tempDir().getAbsolutePath
+    props1.setProperty("log.dir", logDirs)
+    config1 = new KafkaConfig(props1)
+    var server1 = new KafkaServer(config1)
+    server1.startup()
+    server1.shutdown()
+    assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
+    // a log dir added after the broker.id was generated from zk should get the same id copied over
+    val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath
+    props1.setProperty("log.dir", newLogDirs)
+    config1 = new KafkaConfig(props1)
+    server1 = new KafkaServer(config1)
+    server1.startup()
+    server1.shutdown()
+    assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
+    Utils.rm(server1.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus
+  }
+
+  @Test
+  def testConsistentBrokerIdFromUserConfigAndMetaProps() {
+    // a configured brokerId that differs from the stored brokerId must raise InconsistentBrokerIdException
+    var server1 = new KafkaServer(config1) // auto generate broker id
+    server1.startup()
+    server1.shutdown()
+    // createBrokerConfig assigns a fresh temp log.dir to every config, so the user specified
+    // broker.id has to be pointed at the log dir that already holds the generated meta.properties
+    val conflictingProps = TestUtils.createBrokerConfig(0, TestUtils.choosePort)
+    conflictingProps.setProperty("log.dir", props1.getProperty("log.dir"))
+    server1 = new KafkaServer(new KafkaConfig(conflictingProps)) // user specified broker id
+    try {
+      server1.startup()
+      fail("Expected InconsistentBrokerIdException for a broker.id that differs from meta.properties")
+    } catch {
+      case e: kafka.common.InconsistentBrokerIdException => // success
+    }
+    server1.shutdown()
+    Utils.rm(server1.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus
+  }
+
+  /**
+    * Returns true only if every given log dir holds a readable meta.properties whose
+    * brokerId equals the expected one.
+    */
+  def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
+    logDirs.forall { logDir =>
+      val checkpoint = new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile))
+      checkpoint.read() match {
+        case Some(brokerMetadata: BrokerMetadata) => brokerMetadata.brokerId == brokerId
+        case _ => false
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1b80860/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c9e8ba2..ac15d34 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -94,7 +94,7 @@ object TestUtils extends Logging {
         Utils.rm(f)
       }
     })
-    
+
     f
   }
 
@@ -154,7 +154,7 @@ object TestUtils extends Logging {
   def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
     enableControlledShutdown: Boolean = true): Properties = {
     val props = new Properties
-    props.put("broker.id", nodeId.toString)
+    if (nodeId >= 0) props.put("broker.id", nodeId.toString)
     props.put("host.name", "localhost")
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
@@ -700,6 +700,11 @@ object TestUtils extends Logging {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
   }
 
+  /**
+   * Asserts that no live non-daemon thread whose class name starts with "kafka" is left running.
+   * The class is identified with getName, which is never null; getCanonicalName returns null for
+   * anonymous and local classes and would make this check fail with a NullPointerException.
+   */
+  def verifyNonDaemonThreadsStatus() {
+    val lingeringKafkaThreads = Thread.getAllStackTraces.keySet().toArray
+      .map(_.asInstanceOf[Thread])
+      .count(t => !t.isDaemon && t.isAlive && t.getClass.getName.toLowerCase.startsWith("kafka"))
+    assertEquals(0, lingeringKafkaThreads)
+  }
 
   /**
    * Create new LogManager instance with default configuration for testing