Posted to commits@kafka.apache.org by jg...@apache.org on 2019/08/14 16:56:22 UTC

[kafka] branch trunk updated: KAFKA-7335; Store clusterId locally to ensure broker joins the right cluster (#7189)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4d169f5  KAFKA-7335; Store clusterId locally to ensure broker joins the right cluster (#7189)
4d169f5 is described below

commit 4d169f5a859ac1d4a34134ef5b6def574769aa43
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Aug 14 09:56:03 2019 -0700

    KAFKA-7335; Store clusterId locally to ensure broker joins the right cluster (#7189)
    
    This patch stores `clusterId` in the `meta.properties` file. During startup, the broker checks that it is joining the correct cluster and fails fast otherwise.
    
    The `meta.properties` file is versioned. I have decided not to bump the version because 1) the clusterId is null anyway if it is not present in the file; and 2) bumping it would mean that rolling back to a previous version won't work.
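    
    For example, a `meta.properties` file written after this change looks roughly like the following (the
    broker.id and cluster.id values are illustrative):
    
        version=0
        broker.id=1
        cluster.id=ebwOKU-zSieInaFQh_qP4g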
    
    I have refactored the way the metadata is read and written because it was strongly coupled with the brokerId logic. Now, the metadata is read independently during startup and used to 1) check the clusterId and 2) get or generate the brokerId (as before).
    
    Reviewers: Stanislav Kozlovski <st...@outlook.com>, Jason Gustafson <ja...@confluent.io>
---
 .../InconsistentBrokerMetadataException.scala      |  27 +++++
 .../common/InconsistentClusterIdException.scala    |  27 +++++
 .../kafka/server/BrokerMetadataCheckpoint.scala    |  14 ++-
 core/src/main/scala/kafka/server/KafkaServer.scala | 115 +++++++++++++--------
 .../kafka/server/ServerGenerateClusterIdTest.scala |  97 ++++++++++++++++-
 5 files changed, 232 insertions(+), 48 deletions(-)

diff --git a/core/src/main/scala/kafka/common/InconsistentBrokerMetadataException.scala b/core/src/main/scala/kafka/common/InconsistentBrokerMetadataException.scala
new file mode 100644
index 0000000..2b11512
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InconsistentBrokerMetadataException.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that the BrokerMetadata stored in logDirs is not consistent across logDirs.
+ */
+class InconsistentBrokerMetadataException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this(cause: Throwable) = this(null, cause)
+  def this() = this(null, null)
+}
diff --git a/core/src/main/scala/kafka/common/InconsistentClusterIdException.scala b/core/src/main/scala/kafka/common/InconsistentClusterIdException.scala
new file mode 100644
index 0000000..6868dd8
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InconsistentClusterIdException.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that the clusterId stored in logDirs is not consistent with the clusterId stored in ZK.
+ */
+class InconsistentClusterIdException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this(cause: Throwable) = this(null, cause)
+  def this() = this(null, null)
+}
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 2b915a0..ffbcb5d 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -24,7 +24,13 @@ import java.util.Properties
 import kafka.utils._
 import org.apache.kafka.common.utils.Utils
 
-case class BrokerMetadata(brokerId: Int)
+case class BrokerMetadata(brokerId: Int,
+                          clusterId: Option[String]) {
+
+  override def toString: String = {
+    s"BrokerMetadata(brokerId=$brokerId, clusterId=${clusterId.map(_.toString).getOrElse("None")})"
+  }
+}
 
 /**
   * This class saves broker's metadata to a file
@@ -38,6 +44,9 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
         val brokerMetaProps = new Properties()
         brokerMetaProps.setProperty("version", 0.toString)
         brokerMetaProps.setProperty("broker.id", brokerMetadata.brokerId.toString)
+        brokerMetadata.clusterId.foreach { clusterId =>
+          brokerMetaProps.setProperty("cluster.id", clusterId)
+        }
         val temp = new File(file.getAbsolutePath + ".tmp")
         val fileOutputStream = new FileOutputStream(temp)
         try {
@@ -66,7 +75,8 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
         version match {
           case 0 =>
             val brokerId = brokerMetaProps.getIntInRange("broker.id", (0, Int.MaxValue))
-            return Some(BrokerMetadata(brokerId))
+            val clusterId = Option(brokerMetaProps.getString("cluster.id", null))
+            return Some(BrokerMetadata(brokerId, clusterId))
           case _ =>
             throw new IOException("Unrecognized version of the server meta.properties file: " + version)
         }
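
The updated checkpoint now round-trips the optional cluster id. As a rough standalone sketch of the API
above (the object name and the log directory path are made up for illustration):

    import java.io.File
    import kafka.server.{BrokerMetadata, BrokerMetadataCheckpoint}

    object MetaPropertiesRoundTrip extends App {
      // Hypothetical log directory; a real broker writes to each configured log.dir.
      val checkpoint = new BrokerMetadataCheckpoint(new File("/tmp/kafka-logs/meta.properties"))

      // Brokers now checkpoint both the broker id and the cluster id.
      checkpoint.write(BrokerMetadata(1, Some("ebwOKU-zSieInaFQh_qP4g")))

      // read() returns Option[BrokerMetadata]; clusterId is None for files written by older brokers.
      val metadata = checkpoint.read()
      assert(metadata.exists(_.clusterId.contains("ebwOKU-zSieInaFQh_qP4g")))
    }
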
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 6c433b7..21c53b3 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import com.yammer.metrics.core.Gauge
 import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0}
 import kafka.cluster.Broker
-import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
+import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, InconsistentClusterIdException, InconsistentBrokerMetadataException}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
@@ -210,9 +210,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         _clusterId = getOrGenerateClusterId(zkClient)
         info(s"Cluster ID = $clusterId")
 
+        /* load metadata */
+        val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs
+
+        /* check cluster id */
+        if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
+          throw new InconsistentClusterIdException(
+            s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
+            s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")
+
         /* generate brokerId */
-        val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
-        config.brokerId = brokerId
+        config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
         logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
         this.logIdent = logContext.logPrefix
 
@@ -261,8 +269,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         val brokerInfo = createBrokerInfo
         val brokerEpoch = zkClient.registerBroker(brokerInfo)
 
-        // Now that the broker id is successfully registered, checkpoint it
-        checkpointBrokerId(config.brokerId)
+        // Now that the broker is successfully registered, checkpoint its metadata
+        checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))
 
         /* start token manager */
         tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
@@ -674,28 +682,24 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)
 
   /**
-    * Generates new brokerId if enabled or reads from meta.properties based on following conditions
-    * <ol>
-    * <li> config has no broker.id provided and broker id generation is enabled, generates a broker.id based on Zookeeper's sequence
-    * <li> stored broker.id in meta.properties doesn't match in all the log.dirs throws InconsistentBrokerIdException
-    * <li> config has broker.id and meta.properties contains broker.id if they don't match throws InconsistentBrokerIdException
-    * <li> config has broker.id and there is no meta.properties file, creates new meta.properties and stores broker.id
-    * <ol>
-    *
-    * The log directories whose meta.properties can not be accessed due to IOException will be returned to the caller
-    *
-    * @return A 2-tuple containing the brokerId and a sequence of offline log directories.
-    */
-  private def getBrokerIdAndOfflineDirs: (Int, Seq[String]) = {
-    var brokerId = config.brokerId
-    val brokerIdSet = mutable.HashSet[Int]()
+   * Reads the BrokerMetadata. If the BrokerMetadata is not consistent across all the log.dirs, an
+   * InconsistentBrokerMetadataException is thrown.
+   *
+   * The log directories whose meta.properties cannot be accessed due to an IOException will be returned to the caller.
+   *
+   * @return A 2-tuple containing the brokerMetadata and a sequence of offline log directories.
+   */
+  private def getBrokerMetadataAndOfflineDirs: (BrokerMetadata, Seq[String]) = {
+    val brokerMetadataMap = mutable.HashMap[String, BrokerMetadata]()
+    val brokerMetadataSet = mutable.HashSet[BrokerMetadata]()
     val offlineDirs = mutable.ArrayBuffer.empty[String]
 
     for (logDir <- config.logDirs) {
       try {
         val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
         brokerMetadataOpt.foreach { brokerMetadata =>
-          brokerIdSet.add(brokerMetadata.brokerId)
+          brokerMetadataMap += (logDir -> brokerMetadata)
+          brokerMetadataSet += brokerMetadata
         }
       } catch {
         case e: IOException =>
@@ -704,40 +708,61 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
       }
     }
 
-    if (brokerIdSet.size > 1)
-      throw new InconsistentBrokerIdException(
-        s"Failed to match broker.id across log.dirs. This could happen if multiple brokers shared a log directory (log.dirs) " +
-        s"or partial data was manually copied from another broker. Found $brokerIdSet")
-    else if (brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId)
-      throw new InconsistentBrokerIdException(
-        s"Configured broker.id $brokerId doesn't match stored broker.id ${brokerIdSet.last} in meta.properties. " +
-        s"If you moved your data, make sure your configured broker.id matches. " +
-        s"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
-    else if (brokerIdSet.isEmpty && brokerId < 0 && config.brokerIdGenerationEnable) // generate a new brokerId from Zookeeper
-      brokerId = generateBrokerId
-    else if (brokerIdSet.size == 1) // pick broker.id from meta.properties
-      brokerId = brokerIdSet.last
+    if (brokerMetadataSet.size > 1) {
+      val builder = StringBuilder.newBuilder
 
+      for ((logDir, brokerMetadata) <- brokerMetadataMap)
+        builder ++= s"- $logDir -> $brokerMetadata\n"
 
-    (brokerId, offlineDirs)
+      throw new InconsistentBrokerMetadataException(
+        s"BrokerMetadata is not consistent across log.dirs. This could happen if multiple brokers shared a log directory (log.dirs) " +
+        s"or partial data was manually copied from another broker. Found:\n${builder.toString()}"
+      )
+    } else if (brokerMetadataSet.size == 1)
+      (brokerMetadataSet.last, offlineDirs)
+    else
+      (BrokerMetadata(-1, None), offlineDirs)
   }
 
-  private def checkpointBrokerId(brokerId: Int) {
-    var logDirsWithoutMetaProps: List[String] = List()
-
+  /**
+   * Checkpoint the BrokerMetadata to all the online log.dirs
+   *
+   * @param brokerMetadata metadata to write to each online log directory
+   */
+  private def checkpointBrokerMetadata(brokerMetadata: BrokerMetadata) = {
     for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
-      val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
-      if (brokerMetadataOpt.isEmpty)
-        logDirsWithoutMetaProps ++= List(logDir)
-    }
-
-    for (logDir <- logDirsWithoutMetaProps) {
       val checkpoint = brokerMetadataCheckpoints(logDir)
-      checkpoint.write(BrokerMetadata(brokerId))
+      checkpoint.write(brokerMetadata)
     }
   }
 
   /**
+   * Generates a new brokerId if enabled, or reads it from meta.properties, based on the following conditions:
+   * <ol>
+   * <li> the config provides no broker.id and broker id generation is enabled: a broker.id is generated based on Zookeeper's sequence
+   * <li> the config provides a broker.id and meta.properties contains a broker.id that does not match it: an InconsistentBrokerIdException is thrown
+   * <li> the config provides a broker.id and there is no meta.properties file: a new meta.properties is created and the broker.id is stored
+   * </ol>
+   *
+   * @return The brokerId.
+   */
+  private def getOrGenerateBrokerId(brokerMetadata: BrokerMetadata): Int = {
+    val brokerId = config.brokerId
+
+    if (brokerId >= 0 && brokerMetadata.brokerId >= 0 && brokerMetadata.brokerId != brokerId)
+      throw new InconsistentBrokerIdException(
+        s"Configured broker.id $brokerId doesn't match stored broker.id ${brokerMetadata.brokerId} in meta.properties. " +
+        s"If you moved your data, make sure your configured broker.id matches. " +
+        s"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
+    else if (brokerMetadata.brokerId < 0 && brokerId < 0 && config.brokerIdGenerationEnable) // generate a new brokerId from Zookeeper
+      generateBrokerId
+    else if (brokerMetadata.brokerId >= 0) // pick broker.id from meta.properties
+      brokerMetadata.brokerId
+    else
+      brokerId
+  }
+
+  /**
     * Return a sequence id generated by updating the broker sequence id path in ZK.
     * Users can provide brokerId in the config. To avoid conflicts between ZK generated
     * sequence id and configured brokerId, we increment the generated sequence id by KafkaConfig.MaxReservedBrokerId.
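
With the check above in place, a broker pointed at the wrong ZooKeeper ensemble fails fast with an
InconsistentClusterIdException instead of joining the wrong cluster. To see where the two ids come from,
the locally stored value can be compared with the one registered in ZooKeeper under /cluster/id (the host,
port and log directory below are illustrative):

    grep cluster.id /tmp/kafka-logs/meta.properties
    bin/zookeeper-shell.sh localhost:2181 get /cluster/id
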
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
index e00e6c1..4581e23 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -16,20 +16,28 @@
   */
 package kafka.server
 
+import java.io.File
+
+import kafka.common.{InconsistentBrokerMetadataException, InconsistentClusterIdException}
+
 import scala.concurrent._
 import ExecutionContext.Implicits._
 import scala.concurrent.duration._
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.Assert._
-import org.junit.{Before, After, Test}
+import org.junit.{After, Before, Test}
+import org.scalatest.Assertions.assertThrows
 import org.apache.kafka.test.TestUtils.isValidClusterId
 
+import scala.collection.Seq
+
 class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
   var config1: KafkaConfig = null
   var config2: KafkaConfig = null
   var config3: KafkaConfig = null
   var servers: Seq[KafkaServer] = Seq()
+  val brokerMetaPropsFile = "meta.properties"
 
   @Before
   override def setUp() {
@@ -137,4 +145,91 @@ class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
+  @Test
+  def testConsistentClusterIdFromZookeeperAndFromMetaProps() = {
+    // Check at the first boot
+    val server = TestUtils.createServer(config1)
+    val clusterId = server.clusterId
+
+    assertTrue(verifyBrokerMetadata(server.config.logDirs, clusterId))
+
+    server.shutdown()
+
+    // Check again after reboot
+    server.startup()
+
+    assertEquals(clusterId, server.clusterId)
+    assertTrue(verifyBrokerMetadata(server.config.logDirs, server.clusterId))
+
+    server.shutdown()
+
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+  }
+
+  @Test
+  def testInconsistentClusterIdFromZookeeperAndFromMetaProps() = {
+    forgeBrokerMetadata(config1.logDirs, config1.brokerId, "aclusterid")
+
+    val server = new KafkaServer(config1, threadNamePrefix = Option(this.getClass.getName))
+
+    // Startup fails
+    assertThrows[InconsistentClusterIdException] {
+      server.startup()
+    }
+
+    server.shutdown()
+
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+  }
+
+  @Test
+  def testInconsistentBrokerMetadataBetweenMultipleLogDirs() {
+    // Add multiple logDirs with different BrokerMetadata
+    val logDir1 = TestUtils.tempDir().getAbsolutePath
+    val logDir2 = TestUtils.tempDir().getAbsolutePath
+    val logDirs = logDir1 + "," + logDir2
+
+    forgeBrokerMetadata(logDir1, 1, "ebwOKU-zSieInaFQh_qP4g")
+    forgeBrokerMetadata(logDir2, 1, "blaOKU-zSieInaFQh_qP4g")
+
+    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    props.setProperty("log.dir", logDirs)
+    val config = KafkaConfig.fromProps(props)
+
+    val server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName))
+
+    // Startup fails
+    assertThrows[InconsistentBrokerMetadataException] {
+      server.startup()
+    }
+
+    server.shutdown()
+
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+  }
+
+  def forgeBrokerMetadata(logDirs: Seq[String], brokerId: Int, clusterId: String) {
+    for (logDir <- logDirs) {
+      forgeBrokerMetadata(logDir, brokerId, clusterId)
+    }
+  }
+
+  def forgeBrokerMetadata(logDir: String, brokerId: Int, clusterId: String) {
+    val checkpoint = new BrokerMetadataCheckpoint(
+      new File(logDir + File.separator + brokerMetaPropsFile))
+    checkpoint.write(BrokerMetadata(brokerId, Option(clusterId)))
+  }
+
+  def verifyBrokerMetadata(logDirs: Seq[String], clusterId: String): Boolean = {
+    for (logDir <- logDirs) {
+      val brokerMetadataOpt = new BrokerMetadataCheckpoint(
+        new File(logDir + File.separator + brokerMetaPropsFile)).read()
+      brokerMetadataOpt match {
+        case Some(brokerMetadata) =>
+          if (brokerMetadata.clusterId.isDefined && brokerMetadata.clusterId.get != clusterId) return false
+        case _ => return false
+      }
+    }
+    true
+  }
 }
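
Assuming a standard Kafka development checkout, the new tests can be exercised on their own by running the
test class directly, for example:

    ./gradlew core:test --tests kafka.server.ServerGenerateClusterIdTest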