You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/02/19 19:32:15 UTC

[kafka] 02/02: KAFKA-12332; Error partitions from topics with invalid IDs in LISR requests (#10143)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 0b4aa2b2dfdf11a7b6bcc885e8161b6f2c0ef5e1
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Fri Feb 19 14:08:00 2021 -0500

    KAFKA-12332; Error partitions from topics with invalid IDs in LISR requests (#10143)
    
    Changes how invalid IDs are handled in LeaderAndIsr requests. The ID check now occurs before leader epoch. If the ID exists and is invalid, the partition is ignored and a new `INCONSISTENT_TOPIC_ID` error is returned in the response.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../errors/InconsistentTopicIdException.java       | 27 ++++++++++
 .../org/apache/kafka/common/protocol/Errors.java   |  4 +-
 core/src/main/scala/kafka/cluster/Partition.scala  | 40 +++++++++++++-
 .../main/scala/kafka/server/ReplicaManager.scala   | 31 +++--------
 .../unit/kafka/server/ReplicaManagerTest.scala     | 62 ++++++++++++++++++++--
 5 files changed, 134 insertions(+), 30 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InconsistentTopicIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/InconsistentTopicIdException.java
new file mode 100644
index 0000000..1dfe468
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InconsistentTopicIdException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class InconsistentTopicIdException extends InvalidMetadataException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InconsistentTopicIdException(String message) {
+        super(message);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 03c1248..34c4206 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
 import org.apache.kafka.common.errors.InconsistentVoterSetException;
 import org.apache.kafka.common.errors.InvalidCommitOffsetSizeException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
@@ -354,7 +355,8 @@ public enum Errors {
         "Requested position is not greater than or equal to zero, and less than the size of the snapshot.",
         PositionOutOfRangeException::new),
     UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", UnknownTopicIdException::new),
-    DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new);
+    DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new),
+    INCONSISTENT_TOPIC_ID(102, "The log's topic ID did not match the topic ID in the request", InconsistentTopicIdException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 6f6cb88..cfd029b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{IsolationLevel, TopicPartition}
+import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
@@ -428,6 +428,44 @@ class Partition(val topicPartition: TopicPartition,
       this.log = Some(log)
   }
 
+  /**
+   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
+   *
+   * @param requestTopicId the topic ID from the request
+   * @return true if the request topic id is consistent, false otherwise
+   */
+  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
+    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
+    // The topic ID was not inconsistent, so return true.
+    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
+    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
+      true
+    else {
+      log match {
+        case None => true
+        case Some(log) => {
+          // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
+          // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
+          // Topic ID is consistent since we are just setting it here.
+          if (log.topicId == Uuid.ZERO_UUID) {
+            log.partitionMetadataFile.write(requestTopicId)
+            log.topicId = requestTopicId
+            true
+          } else if (log.topicId != requestTopicId) {
+            stateChangeLogger.error(s"Topic Id in memory: ${log.topicId} does not" +
+              s" match the topic Id for partition $topicPartition provided in the request: " +
+              s"$requestTopicId.")
+            false
+          } else {
+            // topic ID in log exists and matches request topic ID
+            true
+          }
+        }
+      }
+    }
+  }
+
   // remoteReplicas will be called in the hot path, and must be inexpensive
   def remoteReplicas: Iterable[Replica] =
     remoteReplicasMap.values
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ba50c86..820af7b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -37,7 +37,7 @@ import kafka.server.metadata.ConfigRepository
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -1364,11 +1364,15 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
+              val requestTopicId = topicIds.get(topicPartition.topic)
+
+              if (!partition.checkOrSetTopicId(requestTopicId)) {
+                responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
+              } else if (requestLeaderEpoch > currentLeaderEpoch) {
                 // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
                 // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
                 if (partitionState.replicas.contains(localBrokerId))
@@ -1424,27 +1428,8 @@ class ReplicaManager(val config: KafkaConfig,
            * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
            * we need to map this topic-partition to OfflinePartition instead.
            */
-            val local = localLog(topicPartition)
-            if (local.isEmpty)
+            if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
-            else {
-              val id = topicIds.get(topicPartition.topic())
-              // Ensure we have not received a request from an older protocol
-              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
-                val log = local.get
-                // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
-                // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
-                if (log.topicId.equals(Uuid.ZERO_UUID)) {
-                  log.partitionMetadataFile.write(id)
-                  log.topicId = id
-                  // Warn if the topic ID in the request does not match the log.
-                } else if (!log.topicId.equals(id)) {
-                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
-                    s" match the topic Id provided in the request: " +
-                    s"${id.toString}.")
-                }
-              }
-            }
           }
 
           // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c31bf8e..9b289e5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2229,6 +2229,7 @@ class ReplicaManagerTest {
         .createLogIfNotExists(isNew = false, isFutureReplica = false,
           new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
       val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+      val topicNames = topicIds.asScala.map(_.swap).asJava
 
       def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
         Seq(new LeaderAndIsrPartitionState()
@@ -2244,7 +2245,8 @@ class ReplicaManagerTest {
         topicIds,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
 
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+      val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+      assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val id = topicIds.get(topicPartition.topic())
       val log = replicaManager.localLog(topicPartition).get
@@ -2258,6 +2260,51 @@ class ReplicaManagerTest {
   }
 
   @Test
+  def testInvalidIdReturnsError() = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+      val topicNames = topicIds.asScala.map(_.swap).asJava
+
+      val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+      val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava
+
+      def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, Uuid]): LeaderAndIsrRequest =
+        new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(epoch)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(true)).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topicIds), (_, _) => ())
+      assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
+
+      val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, topicIds), (_, _) => ())
+      assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition))
+
+      // Send request with invalid ID.
+      val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ())
+      assertEquals(Errors.INCONSISTENT_TOPIC_ID, response3.partitionErrors(invalidTopicNames).get(topicPartition))
+
+      val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(2, invalidTopicIds), (_, _) => ())
+      assertEquals(Errors.INCONSISTENT_TOPIC_ID, response4.partitionErrors(invalidTopicNames).get(topicPartition))
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
   def testPartitionMetadataFileNotCreated() = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
     try {
@@ -2268,6 +2315,7 @@ class ReplicaManagerTest {
         .createLogIfNotExists(isNew = false, isFutureReplica = false,
           new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
       val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" -> Uuid.randomUuid()).asJava
+      val topicNames = topicIds.asScala.map(_.swap).asJava
 
       def leaderAndIsrRequest(epoch: Int, name: String, version: Short): LeaderAndIsrRequest = LeaderAndIsrRequest.parse(
         new LeaderAndIsrRequest.Builder(version, 0, 0, brokerEpoch,
@@ -2285,28 +2333,32 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build().serialize(), version)
 
       // There is no file if the topic does not have an associated topic ID.
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
+      val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val log = replicaManager.localLog(topicPartition).get
       assertFalse(log.partitionMetadataFile.exists())
+      assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
 
       // There is no file if the topic has the default UUID.
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
+      val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartition).isEmpty)
       val log2 = replicaManager.localLog(topicPartition).get
       assertFalse(log2.partitionMetadataFile.exists())
+      assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition))
 
       // There is no file if the request an older version
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
+      val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
       val log3 = replicaManager.localLog(topicPartitionFoo).get
       assertFalse(log3.partitionMetadataFile.exists())
+      assertEquals(Errors.NONE, response3.partitionErrors(topicNames).get(topicPartitionFoo))
 
       // There is no file if the request is an older version
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 4), (_, _) => ())
+      val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, "foo", 4), (_, _) => ())
       assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
       val log4 = replicaManager.localLog(topicPartitionFoo).get
       assertFalse(log4.partitionMetadataFile.exists())
+      assertEquals(Errors.NONE, response4.partitionErrors(topicNames).get(topicPartitionFoo))
     } finally replicaManager.shutdown(checkpointHW = false)
   }