You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/15 11:57:39 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

rajinisivaram commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r543243916



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##########
@@ -171,6 +196,14 @@ public long brokerEpoch() {
         return data.ungroupedPartitionStates();
     }
 
+    public Map<String, Uuid> topicIds() {
+        Map<String, Uuid> topicIds = new HashMap<>();
+        for (LeaderAndIsrTopicState ts : data.topicStates()) {
+            topicIds.put(ts.topicName(), ts.topicId());
+        }
+        return topicIds;

Review comment:
       Should be able to replace this with something like:
   ```
   return data.topicStates().stream()
     .collect(Collectors.toMap(LeaderAndIsrTopicState::topicName, LeaderAndIsrTopicState::topicId));
   ```

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -58,8 +68,17 @@ public Errors error() {
         Errors error = error();
         if (error != Errors.NONE)

Review comment:
       nit: We should add braces here since there are multiple lines inside the `if` statement

##########
File path: core/src/main/scala/kafka/server/PartitionMetadataFile.scala
##########
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{FileAlreadyExistsException, Files, Paths}
+import java.util.regex.Pattern
+
+import kafka.utils.Logging
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.utils.Utils
+
+
+
+object PartitionMetadataFile {
+  private val LeaderEpochCheckpointFilename = "partition.metadata"
+  private val WhiteSpacesPattern = Pattern.compile(":\\s+")
+  private val CurrentVersion = 0
+
+  def newFile(dir: File): File = new File(dir, LeaderEpochCheckpointFilename)
+
+  object PartitionMetadataFileFormatter {
+    def toFile(data: PartitionMetadata): String = {
+      s"version: ${data.version}\ntopic_id: ${data.topicId}"
+    }
+
+  }
+
+  class PartitionMetadataReadBuffer[T](location: String,
+                                       reader: BufferedReader,
+                                       version: Int) extends Logging {
+    def read(): PartitionMetadata = {
+      def malformedLineException(line: String) =
+        new IOException(s"Malformed line in checkpoint file ($location): '$line'")
+
+      var line: String = null
+      var metadataTopicId: Uuid = null
+      try {
+        line = reader.readLine()
+        WhiteSpacesPattern.split(line) match {
+          case Array(_, version) =>
+            if (version.toInt == CurrentVersion) {
+              line = reader.readLine()
+              WhiteSpacesPattern.split(line) match {
+                case Array(_, topicId) => metadataTopicId = Uuid.fromString(topicId)
+                case _ => throw malformedLineException(line)
+              }
+              new PartitionMetadata(CurrentVersion, metadataTopicId)
+            } else {
+              throw new IOException(s"Unrecognized version of the checkpoint file ($location): " + version)

Review comment:
       Should we update this error message? It still refers to a "checkpoint file", but this is the partition metadata file.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+                    log.topicId = topicIds.get(topicPartition.topic)
+                  } else {
+                    stateChangeLogger.warn("Partition metadata file already contains content.")

Review comment:
       Why is this a warning?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -58,8 +68,17 @@ public Errors error() {
         Errors error = error();
         if (error != Errors.NONE)

Review comment:
       nit: We should add braces here since there are multiple lines inside the `if` statement

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {
+      leaderAndIsrResponse.partitions.forEach { partition =>
+        val topicName = partition.topicName
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp
+      }
+    }
+
+    leaderAndIsrResponse.topics.forEach { topic =>
+      val topicName = controllerContext.topicNames.get(topic.topicId).get

Review comment:
       Should we use `controllerContext.topicNames.get(topic.topicId).foreach` instead of `.get`, to avoid throwing an exception if the topic is not in the context?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {
+      leaderAndIsrResponse.partitions.forEach { partition =>
+        val topicName = partition.topicName
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp
+      }
+    }
+
+    leaderAndIsrResponse.topics.forEach { topic =>
+      val topicName = controllerContext.topicNames.get(topic.topicId).get
+      topic.partitionErrors().forEach { partition =>
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp

Review comment:
       We can make an inner method within this method with the common logic.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -322,6 +327,11 @@ class Log(@volatile private var _dir: File,
     // deletion.
     producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq)
     loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown)
+
+    // Recover topic ID if present
+    if (!partitionMetadataFile.get.isEmpty()) {
+      topicId = partitionMetadataFile.get.read().topicId
+    }

Review comment:
       Seems neater to use `partitionMetadataFile.foreach` instead of using `.get` twice.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1021,6 +1036,13 @@ class Log(@volatile private var _dir: File,
           // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
           // the checkpoint file in renamed log directory
           initializeLeaderEpochCache()
+          if (!partitionMetadataFile.isEmpty && !partitionMetadataFile.get.isEmpty()) {

Review comment:
       `.foreach` may be better than lots of `.get`s.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1021,6 +1036,13 @@ class Log(@volatile private var _dir: File,
           // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
           // the checkpoint file in renamed log directory
           initializeLeaderEpochCache()
+          if (!partitionMetadataFile.isEmpty && !partitionMetadataFile.get.isEmpty()) {
+            val partitionMetadata = partitionMetadataFile.get.read()
+            initializePartitionMetadata()
+            partitionMetadataFile.get.write(partitionMetadata.topicId)

Review comment:
       Why are we reading and writing again?

##########
File path: core/src/main/scala/kafka/server/PartitionMetadataFile.scala
##########
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{FileAlreadyExistsException, Files, Paths}
+import java.util.regex.Pattern
+
+import kafka.utils.Logging
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.utils.Utils
+
+
+
+object PartitionMetadataFile {
+  private val LeaderEpochCheckpointFilename = "partition.metadata"

Review comment:
       `LeaderEpochCheckpointFilename` => `PartitionMetadataFilename`?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+                    log.topicId = topicIds.get(topicPartition.topic)

Review comment:
       We already have the topic ID in `id`, so we can use `id` here instead of calling `topicIds.get(topicPartition.topic)` again.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))

Review comment:
       We already have the topic ID in `id`, so we can pass `id` to `write` here.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1445,15 +1483,38 @@ class ReplicaManager(val config: KafkaConfig,
           replicaFetcherManager.shutdownIdleFetcherThreads()
           replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
           onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-          val responsePartitions = responseMap.iterator.map { case (tp, error) =>
-            new LeaderAndIsrPartitionError()
-              .setTopicName(tp.topic)
-              .setPartitionIndex(tp.partition)
-              .setErrorCode(error.code)
-          }.toBuffer
-          new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-            .setErrorCode(Errors.NONE.code)
-            .setPartitionErrors(responsePartitions.asJava))
+          if (leaderAndIsrRequest.version() < 5) {
+            val responsePartitions = responseMap.iterator.map { case (tp, error) =>
+              new LeaderAndIsrPartitionError()
+                .setTopicName(tp.topic)
+                .setPartitionIndex(tp.partition)
+                .setErrorCode(error.code)
+            }.toBuffer
+            new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+              .setErrorCode(Errors.NONE.code)
+              .setPartitionErrors(responsePartitions.asJava))
+          } else {
+            val topics = new mutable.HashMap[String, List[LeaderAndIsrPartitionError]]
+            responseMap.asJava.forEach { case (tp, error) =>
+              if (!topics.contains(tp.topic)) {
+                topics.put(tp.topic, List(new LeaderAndIsrPartitionError()
+                                                                .setPartitionIndex(tp.partition)
+                                                                .setErrorCode(error.code)))
+              } else {
+                topics.put(tp.topic, new LeaderAndIsrPartitionError()
+                  .setPartitionIndex(tp.partition)
+                  .setErrorCode(error.code)::topics.get(tp.topic).get)

Review comment:
       Replace `topics.get(tp.topic).get` with `topics(tp.topic)`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org