You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/01/19 03:40:03 UTC

kafka git commit: KAFKA-3012: Avoid reserved.broker.max.id collisions on upgrade

Repository: kafka
Updated Branches:
  refs/heads/trunk c06542ede -> 5c337d759


KAFKA-3012: Avoid reserved.broker.max.id collisions on upgrade

Provides a configuration to opt out of broker id generation.

Author: Grant Henke <gr...@gmail.com>

Reviewers: Gwen Shapira

Closes #762 from granthenke/id-generation


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c337d75
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c337d75
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c337d75

Branch: refs/heads/trunk
Commit: 5c337d759892ae6d46d6901a24e9b97cebd2a4da
Parents: c06542e
Author: Grant Henke <gr...@gmail.com>
Authored: Mon Jan 18 18:39:55 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Jan 18 18:39:55 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaConfig.scala  | 11 ++++++++++-
 core/src/main/scala/kafka/server/KafkaServer.scala  |  6 +++---
 .../scala/unit/kafka/server/KafkaConfigTest.scala   |  4 +++-
 .../kafka/server/ServerGenerateBrokerIdTest.scala   | 16 ++++++++++++++++
 docs/upgrade.html                                   |  8 +++++++-
 5 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5c337d75/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 54b79ab..4911809 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -42,6 +42,7 @@ object Defaults {
   val ZkEnableSecureAcls = false
 
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnable = true
   val MaxReservedBrokerId = 1000
   val BrokerId = -1
   val MessageMaxBytes = 1000000 + MessageSet.LogOverhead
@@ -190,6 +191,7 @@ object KafkaConfig {
   val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
   val ZkEnableSecureAclsProp = "zookeeper.set.acl"
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnableProp = "broker.id.generation.enable"
   val MaxReservedBrokerIdProp = "reserved.broker.max.id"
   val BrokerIdProp = "broker.id"
   val MessageMaxBytesProp = "message.max.bytes"
@@ -338,6 +340,7 @@ object KafkaConfig {
   val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
   val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server? When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
   val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
   val BrokerIdDoc = "The broker id for this server. If unset, a unique broker id will be generated." +
   "To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids" +
@@ -522,6 +525,7 @@ object KafkaConfig {
       .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc)
 
       /** ********* General Configuration ***********/
+      .define(BrokerIdGenerationEnableProp, BOOLEAN, Defaults.BrokerIdGenerationEnable, MEDIUM, BrokerIdGenerationEnableDoc)
       .define(MaxReservedBrokerIdProp, INT, Defaults.MaxReservedBrokerId, atLeast(0), MEDIUM, MaxReservedBrokerIdDoc)
       .define(BrokerIdProp, INT, Defaults.BrokerId, HIGH, BrokerIdDoc)
       .define(MessageMaxBytesProp, INT, Defaults.MessageMaxBytes, atLeast(0), HIGH, MessageMaxBytesDoc)
@@ -718,6 +722,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
 
   /** ********* General Configuration ***********/
+  val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
   val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
@@ -927,7 +932,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   validateValues()
 
   private def validateValues() {
-    require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
+    if(brokerIdGenerationEnable) {
+      require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
+    } else {
+      require(brokerId >= 0, "broker.id must be equal or greater than 0")
+    }
     require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
     require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
     require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1")

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c337d75/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index aaa6ea9..454633e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -609,9 +609,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   }
 
   /**
-    * Generates new brokerId or reads from meta.properties based on following conditions
+    * Generates new brokerId if enabled or reads from meta.properties based on following conditions
     * <ol>
-    * <li> config has no broker.id provided , generates a broker.id based on Zookeeper's sequence
+    * <li> config has no broker.id provided and broker id generation is enabled, generates a broker.id based on Zookeeper's sequence
     * <li> stored broker.id in meta.properties doesn't match in all the log.dirs throws InconsistentBrokerIdException
     * <li> config has broker.id and meta.properties contains broker.id if they don't match throws InconsistentBrokerIdException
     * <li> config has broker.id and there is no meta.properties file, creates new meta.properties and stores broker.id
@@ -637,7 +637,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
       throw new InconsistentBrokerIdException("Failed to match brokerId across logDirs")
     else if(brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId)
       throw new InconsistentBrokerIdException("Configured brokerId %s doesn't match stored brokerId %s in meta.properties".format(brokerId, brokerIdSet.last))
-    else if(brokerIdSet.size == 0 && brokerId < 0)  // generate a new brokerId from Zookeeper
+    else if(brokerIdSet.size == 0 && brokerId < 0 && config.brokerIdGenerationEnable)  // generate a new brokerId from Zookeeper
       brokerId = generateBrokerId
     else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
       brokerId = brokerIdSet.last

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c337d75/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 8a5038f..9ddc2c1 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -408,7 +408,7 @@ class KafkaConfigTest {
         case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string
-          
+
         case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.HostNameProp => // ignore string
         case KafkaConfig.AdvertisedHostNameProp => //ignore string
@@ -526,6 +526,7 @@ class KafkaConfigTest {
     defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
     // For ZkConnectionTimeoutMs
     defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
+    defaults.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
     defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
     defaults.put(KafkaConfig.BrokerIdProp, "1")
     defaults.put(KafkaConfig.HostNameProp, "127.0.0.1")
@@ -542,6 +543,7 @@ class KafkaConfigTest {
     val config = KafkaConfig.fromProps(defaults)
     Assert.assertEquals("127.0.0.1:2181", config.zkConnect)
     Assert.assertEquals(1234, config.zkConnectionTimeoutMs)
+    Assert.assertEquals(false, config.brokerIdGenerationEnable)
     Assert.assertEquals(1, config.maxReservedBrokerId)
     Assert.assertEquals(1, config.brokerId)
     Assert.assertEquals("127.0.0.1", config.hostName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c337d75/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 9afb2ca..60ec561 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -82,6 +82,22 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testDisableGeneratedBrokerId() {
+    val props3 = TestUtils.createBrokerConfig(3, zkConnect)
+    props3.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    // Set reserve broker ids to cause collision and ensure disabling broker id generation ignores the setting
+    props3.put(KafkaConfig.MaxReservedBrokerIdProp, "0")
+    val config3 = KafkaConfig.fromProps(props3)
+    val server3 = new KafkaServer(config3)
+    server3.startup()
+    assertEquals(server3.config.brokerId,3)
+    server3.shutdown()
+    assertTrue(verifyBrokerMetadata(server3.config.logDirs,3))
+    CoreUtils.rm(server3.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+  }
+
+  @Test
   def testMultipleLogDirsMetaProps() {
     // add multiple logDirs and check if the generate brokerId is stored in all of them
     val logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath +

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c337d75/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index eccd626..c723957 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -39,7 +39,6 @@
     <li> Broker IDs above 1000 are now reserved by default to automatically assigned broker IDs. If your cluster has existing broker IDs above that threshold make sure to increase the reserved.broker.max.id broker configuration property accordingly. </li>
     <li> Configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync. </li>
     <li> Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since last fetch request from replica, but also to time since the replica last caught up. Replicas that are still fetching messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync. </li>
-    <li> Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics. </li>
     <li> MirrorMaker no longer supports multiple target clusters. As a result it will only accept a single --consumer.config parameter. To mirror multiple source clusters, you will need at least one MirrorMaker instance per source cluster, each with its own consumer configuration. </li>
     <li> Tools packaged under <em>org.apache.kafka.clients.tools.*</em> have been moved to <em>org.apache.kafka.tools.*</em>. All included scripts will still function as usual, only custom code directly importing these classes will be affected. </li>
     <li> The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh. </li>
@@ -49,6 +48,13 @@
     <li> By default all command line tools will print all logging messages to stderr instead of stdout. </li>
 </ul>
 
+<h5><a id="upgrade_901_notable" href="#upgrade_901_notable">Notable changes in 0.9.0.1</a></h5>
+
+<ul>
+    <li> The new broker id generation feature can be disable by setting broker.id.generation.enable to false. </li>
+    <li> Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics. </li>
+</ul>
+
 <h5>Deprecations in 0.9.0.0</h5>
 
 <ul>