You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/02/19 19:32:15 UTC
[kafka] 02/02: KAFKA-12332; Error partitions from topics with invalid IDs in LISR requests (#10143)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 0b4aa2b2dfdf11a7b6bcc885e8161b6f2c0ef5e1
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Fri Feb 19 14:08:00 2021 -0500
KAFKA-12332; Error partitions from topics with invalid IDs in LISR requests (#10143)
Changes how invalid topic IDs are handled in LeaderAndIsr requests. The topic ID check now occurs before the leader epoch check. If the request carries a topic ID that does not match the topic ID already stored for the log, the partition is ignored and a new `INCONSISTENT_TOPIC_ID` error is returned for it in the response.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../errors/InconsistentTopicIdException.java | 27 ++++++++++
.../org/apache/kafka/common/protocol/Errors.java | 4 +-
core/src/main/scala/kafka/cluster/Partition.scala | 40 +++++++++++++-
.../main/scala/kafka/server/ReplicaManager.scala | 31 +++--------
.../unit/kafka/server/ReplicaManagerTest.scala | 62 ++++++++++++++++++++--
5 files changed, 134 insertions(+), 30 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InconsistentTopicIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/InconsistentTopicIdException.java
new file mode 100644
index 0000000..1dfe468
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InconsistentTopicIdException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+/** Signals that the topic ID in a request did not match the log's topic ID (see Errors.INCONSISTENT_TOPIC_ID). */
+public class InconsistentTopicIdException extends InvalidMetadataException {
+
+ private static final long serialVersionUID = 1L;
+
+ public InconsistentTopicIdException(String message) {
+ super(message);
+ }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 03c1248..34c4206 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
import org.apache.kafka.common.errors.InconsistentVoterSetException;
import org.apache.kafka.common.errors.InvalidCommitOffsetSizeException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
@@ -354,7 +355,8 @@ public enum Errors {
"Requested position is not greater than or equal to zero, and less than the size of the snapshot.",
PositionOutOfRangeException::new),
UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", UnknownTopicIdException::new),
- DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new);
+ DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new),
+ INCONSISTENT_TOPIC_ID(102, "The log's topic ID did not match the topic ID in the request", InconsistentTopicIdException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 6f6cb88..cfd029b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{IsolationLevel, TopicPartition}
+import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@@ -428,6 +428,44 @@ class Partition(val topicPartition: TopicPartition,
this.log = Some(log)
}
+ /**
+ * Checks whether the topic ID provided in the request is consistent with the topic ID in the log.
+ * If a valid topic ID is provided and the log exists but has no ID set (Uuid.ZERO_UUID), the request ID is written to the partition metadata file and set on the log.
+ *
+ * @param requestTopicId the topic ID from the request; null or Uuid.ZERO_UUID is treated as "no topic ID provided"
+ * @return false only if the log already has a topic ID that differs from the request topic ID, true otherwise
+ */
+ def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
+ // If the request has a null or zero topic ID, we assume that topic IDs are not supported.
+ // There is nothing to compare against in that case, so the ID is not inconsistent: return true.
+ // If there is no local log (log is None), we cannot say the topic ID is inconsistent either, so return true.
+ if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
+ true
+ else {
+ log match {
+ case None => true
+ case Some(log) => {
+ // If no topic ID is in memory, the topic must be new to the broker and has no metadata file yet:
+ // had the broker previously written the ID to file, it would have been recovered on restart after failure.
+ // Persist the ID first, then set it in memory; it is consistent because we are just setting it here.
+ if (log.topicId == Uuid.ZERO_UUID) {
+ log.partitionMetadataFile.write(requestTopicId)
+ log.topicId = requestTopicId
+ true
+ } else if (log.topicId != requestTopicId) {
+ stateChangeLogger.error(s"Topic Id in memory: ${log.topicId} does not" +
+ s" match the topic Id for partition $topicPartition provided in the request: " +
+ s"$requestTopicId.")
+ false
+ } else {
+ // The log already has a topic ID and it matches the request topic ID.
+ true
+ }
+ }
+ }
+ }
+ }
+
// remoteReplicas will be called in the hot path, and must be inexpensive
def remoteReplicas: Iterable[Replica] =
remoteReplicasMap.values
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ba50c86..820af7b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -37,7 +37,7 @@ import kafka.server.metadata.ConfigRepository
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.KafkaZkClient
-import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -1364,11 +1364,15 @@ class ReplicaManager(val config: KafkaConfig,
Some(partition)
}
- // Next check partition's leader epoch
+ // Next check the topic ID and the partition's leader epoch
partitionOpt.foreach { partition =>
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = partitionState.leaderEpoch
- if (requestLeaderEpoch > currentLeaderEpoch) {
+ val requestTopicId = topicIds.get(topicPartition.topic)
+
+ if (!partition.checkOrSetTopicId(requestTopicId)) {
+ responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
+ } else if (requestLeaderEpoch > currentLeaderEpoch) {
// If the leader epoch is valid record the epoch of the controller that made the leadership decision.
// This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
if (partitionState.replicas.contains(localBrokerId))
@@ -1424,27 +1428,8 @@ class ReplicaManager(val config: KafkaConfig,
* In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
* we need to map this topic-partition to OfflinePartition instead.
*/
- val local = localLog(topicPartition)
- if (local.isEmpty)
+ if (localLog(topicPartition).isEmpty)
markPartitionOffline(topicPartition)
- else {
- val id = topicIds.get(topicPartition.topic())
- // Ensure we have not received a request from an older protocol
- if (id != null && !id.equals(Uuid.ZERO_UUID)) {
- val log = local.get
- // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
- // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
- if (log.topicId.equals(Uuid.ZERO_UUID)) {
- log.partitionMetadataFile.write(id)
- log.topicId = id
- // Warn if the topic ID in the request does not match the log.
- } else if (!log.topicId.equals(id)) {
- stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
- s" match the topic Id provided in the request: " +
- s"${id.toString}.")
- }
- }
- }
}
// we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c31bf8e..9b289e5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2229,6 +2229,7 @@ class ReplicaManagerTest {
.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+ val topicNames = topicIds.asScala.map(_.swap).asJava
def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState()
@@ -2244,7 +2245,8 @@ class ReplicaManagerTest {
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+ val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+ assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
assertFalse(replicaManager.localLog(topicPartition).isEmpty)
val id = topicIds.get(topicPartition.topic())
val log = replicaManager.localLog(topicPartition).get
@@ -2258,6 +2260,51 @@ class ReplicaManagerTest {
}
@Test
+ def testInvalidIdReturnsError() = {
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+ try {
+ val brokerList = Seq[Integer](0, 1).asJava
+ val topicPartition = new TopicPartition(topic, 0)
+ replicaManager.createPartition(topicPartition)
+ .createLogIfNotExists(isNew = false, isFutureReplica = false,
+ new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+ val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+ val topicNames = topicIds.asScala.map(_.swap).asJava
+
+ val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+ val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava
+
+ def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, Uuid]): LeaderAndIsrRequest =
+ new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+ Seq(new LeaderAndIsrPartitionState()
+ .setTopicName(topic)
+ .setPartitionIndex(0)
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(epoch)
+ .setIsr(brokerList)
+ .setZkVersion(0)
+ .setReplicas(brokerList)
+ .setIsNew(true)).asJava,
+ topicIds,
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+ val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topicIds), (_, _) => ())
+ assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
+
+ val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, topicIds), (_, _) => ())
+ assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition))
+
+ // Send a request whose topic ID differs from the one used in the earlier requests; the partition must be rejected.
+ val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ())
+ assertEquals(Errors.INCONSISTENT_TOPIC_ID, response3.partitionErrors(invalidTopicNames).get(topicPartition))
+ // A higher leader epoch must not bypass the topic ID check: the mismatching ID is still rejected.
+ val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(2, invalidTopicIds), (_, _) => ())
+ assertEquals(Errors.INCONSISTENT_TOPIC_ID, response4.partitionErrors(invalidTopicNames).get(topicPartition))
+ } finally replicaManager.shutdown(checkpointHW = false)
+ }
+
+ @Test
def testPartitionMetadataFileNotCreated() = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
try {
@@ -2268,6 +2315,7 @@ class ReplicaManagerTest {
.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" -> Uuid.randomUuid()).asJava
+ val topicNames = topicIds.asScala.map(_.swap).asJava
def leaderAndIsrRequest(epoch: Int, name: String, version: Short): LeaderAndIsrRequest = LeaderAndIsrRequest.parse(
new LeaderAndIsrRequest.Builder(version, 0, 0, brokerEpoch,
@@ -2285,28 +2333,32 @@ class ReplicaManagerTest {
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build().serialize(), version)
// There is no file if the topic does not have an associated topic ID.
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
+ val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic", ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
assertFalse(replicaManager.localLog(topicPartition).isEmpty)
val log = replicaManager.localLog(topicPartition).get
assertFalse(log.partitionMetadataFile.exists())
+ assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
// There is no file if the topic has the default UUID.
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
+ val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic, ApiKeys.LEADER_AND_ISR.latestVersion), (_, _) => ())
assertFalse(replicaManager.localLog(topicPartition).isEmpty)
val log2 = replicaManager.localLog(topicPartition).get
assertFalse(log2.partitionMetadataFile.exists())
+ assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition))
// There is no file if the request an older version
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
+ val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 0), (_, _) => ())
assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
val log3 = replicaManager.localLog(topicPartitionFoo).get
assertFalse(log3.partitionMetadataFile.exists())
+ assertEquals(Errors.NONE, response3.partitionErrors(topicNames).get(topicPartitionFoo))
// There is no file if the request is an older version
- replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "foo", 4), (_, _) => ())
+ val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(1, "foo", 4), (_, _) => ())
assertFalse(replicaManager.localLog(topicPartitionFoo).isEmpty)
val log4 = replicaManager.localLog(topicPartitionFoo).get
assertFalse(log4.partitionMetadataFile.exists())
+ assertEquals(Errors.NONE, response4.partitionErrors(topicNames).get(topicPartitionFoo))
} finally replicaManager.shutdown(checkpointHW = false)
}