Posted to commits@kafka.apache.org by jg...@apache.org on 2019/12/10 02:16:04 UTC

[kafka] branch 2.3 updated: KAFKA-9212; Ensure LeaderAndIsr state updated in controller context during reassignment (#7795) (#7805)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new baf7766  KAFKA-9212; Ensure LeaderAndIsr state updated in controller context during reassignment (#7795) (#7805)
baf7766 is described below

commit baf7766bd1e7d7b9cc82d37e366bb7c0c6845573
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Dec 9 18:15:33 2019 -0800

    KAFKA-9212; Ensure LeaderAndIsr state updated in controller context during reassignment (#7795) (#7805)
    
    KIP-320 improved fetch semantics by adding leader epoch validation. This relies on
    reliable propagation of leader epoch information from the controller. Unfortunately, we
    have encountered a bug during partition reassignment in which the leader epoch in the
    controller context does not get properly updated. This causes UpdateMetadata requests
    to be sent with stale epoch information, which results in the metadata caches on the
    brokers falling out of sync.
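
    The essence of the controller-side fix (the Scala hunk in KafkaController.scala below)
    is a simple invariant: once the leader epoch is bumped in ZooKeeper during reassignment,
    the bumped LeaderAndIsr must also be written back into the controller's in-memory
    context, because UpdateMetadata requests are built from that context. A schematic Java
    sketch of the invariant, with made-up names purely for illustration:

        import java.util.HashMap;
        import java.util.Map;

        // Illustrative only: a stand-in for the controller's in-memory state, not Kafka's
        // real classes. The actual fix updates controllerContext.partitionLeadershipInfo
        // in KafkaController.scala.
        final class ControllerContextSketch {
            static final Map<String, Integer> partitionLeaderEpoch = new HashMap<>();

            static void onEpochBumpDuringReassignment(String partition, int bumpedEpoch) {
                // Before the fix this write-back was missing, so the map kept the old epoch
                // and every UpdateMetadata request built from it carried a stale epoch.
                partitionLeaderEpoch.put(partition, bumpedEpoch);
            }

            static int epochSentInUpdateMetadata(String partition) {
                return partitionLeaderEpoch.getOrDefault(partition, -1);
            }

            public static void main(String[] args) {
                partitionLeaderEpoch.put("topic-0", 5);       // state before the reassignment
                onEpochBumpDuringReassignment("topic-0", 6);  // ZooKeeper already holds 6
                System.out.println(epochSentInUpdateMetadata("topic-0")); // prints 6, not 5
            }
        }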
    
    This bug has existed for a long time, but it only became a problem with the new epoch
    validation done by the client. Because the client includes the stale leader epoch in its
    requests, the leader rejects them, yet the stale metadata cache on the brokers prevents
    the consumer from getting the latest epoch. Hence the consumer cannot make progress
    while a reassignment is ongoing.
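
    The rejection path is the KIP-320 fencing check: the consumer attaches the leader epoch
    it learned from Metadata responses to its Fetch (and ListOffsets/OffsetsForLeaderEpoch)
    requests, and a leader that is on a newer epoch answers FENCED_LEADER_EPOCH, which
    normally just triggers a metadata refresh. With the brokers' caches stuck on the stale
    epoch, that refresh never returns the newer epoch, so the consumer retries forever.
    A minimal Java sketch of the check's semantics (the broker's real implementation is
    Scala, not this code):

        // Sketch only: mirrors the epoch comparison semantics, not Kafka's broker code.
        final class EpochValidationSketch {
            enum Result { OK, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

            // requestEpoch: the epoch the consumer learned from Metadata responses.
            // leaderEpoch:  the epoch the leader is actually on.
            static Result validate(int requestEpoch, int leaderEpoch) {
                if (requestEpoch < leaderEpoch)
                    return Result.FENCED_LEADER_EPOCH;   // consumer's metadata is stale
                if (requestEpoch > leaderEpoch)
                    return Result.UNKNOWN_LEADER_EPOCH;  // consumer is ahead of this replica
                return Result.OK;
            }

            public static void main(String[] args) {
                // During the buggy reassignment the metadata caches keep advertising epoch 5
                // while the leader has moved to epoch 6, so every retry is fenced.
                System.out.println(validate(5, 6)); // FENCED_LEADER_EPOCH
            }
        }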
    
    Although it is straightforward to fix this problem in the controller for the new releases
    (which this patch does), it is not so easy to fix older brokers, which means new clients
    could still encounter brokers with this bug. To address this problem, this patch also
    modifies the client to treat the leader epoch returned from the Metadata response as
    "unreliable" if it comes from an older version of the protocol. In that case the client
    discards the returned epoch and does not include it in any requests.
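
    A simplified sketch of that gate (the real logic is the hasReliableLeaderEpochs() flag in
    MetadataResponse.java and the extra parameter to Metadata.updatePartitionInfo() in the
    diff below; the v9 cutoff corresponds to Kafka 2.4, and on this 2.3 branch, whose
    Metadata versions top out below v9, every response parsed from the wire is treated as
    unreliable):

        import java.util.Optional;

        // Sketch only: epochs from Metadata versions older than v9 are dropped instead of
        // being cached, so they can never be attached to subsequent requests.
        final class UnreliableEpochSketch {
            static Optional<Integer> epochToCache(short responseVersion, Optional<Integer> leaderEpoch) {
                boolean reliable = responseVersion >= 9;
                return reliable ? leaderEpoch : Optional.empty();
            }

            public static void main(String[] args) {
                System.out.println(epochToCache((short) 8, Optional.of(10))); // Optional.empty
                System.out.println(epochToCache((short) 9, Optional.of(10))); // Optional[10]
            }
        }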
    
    Also, note that the correct epoch is still forwarded to replicas in the
    LeaderAndIsr request, so this bug does not affect replication.
    
    Reviewers: Jun Rao <ju...@gmail.com>, Stanislav Kozlovski <st...@outlook.com>, Ismael Juma <is...@juma.me.uk>
---
 .../java/org/apache/kafka/clients/Metadata.java    |  64 ++++++------
 .../kafka/common/requests/MetadataResponse.java    |  28 +++++-
 .../org/apache/kafka/clients/MetadataTest.java     | 109 +++++++++++++++++++++
 .../scala/kafka/controller/KafkaController.scala   |   4 +-
 .../admin/ReassignPartitionsClusterTest.scala      |  41 ++++++++
 5 files changed, 212 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 2a3652a..4614087 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -304,28 +304,29 @@ public class Metadata implements Closeable {
 
                 for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                     // Even if the partition's metadata includes an error, we need to handle the update to catch new epochs
-                    updatePartitionInfo(metadata.topic(), partitionMetadata, partitionInfo -> {
-                        int epoch = partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH);
-                        Node leader = partitionInfo.leader();
-
-                        if (leader != null && !leader.equals(brokersById.get(leader.id()))) {
-                            // If we are reusing metadata from a previous response (which is possible if it
-                            // contained a larger epoch), we may not have leader information available in the
-                            // latest response. To keep the state consistent, we override the partition metadata
-                            // so that the leader is set consistently with the broker metadata
-
-                            PartitionInfo partitionInfoWithoutLeader = new PartitionInfo(
-                                    partitionInfo.topic(),
-                                    partitionInfo.partition(),
-                                    brokersById.get(leader.id()),
-                                    partitionInfo.replicas(),
-                                    partitionInfo.inSyncReplicas(),
-                                    partitionInfo.offlineReplicas());
-                            partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfoWithoutLeader, epoch));
-                        } else {
-                            partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch));
-                        }
-                    });
+                    updatePartitionInfo(metadata.topic(), partitionMetadata,
+                        metadataResponse.hasReliableLeaderEpochs(), partitionInfoAndEpoch -> {
+                            Node leader = partitionInfoAndEpoch.partitionInfo().leader();
+
+                            if (leader != null && !leader.equals(brokersById.get(leader.id()))) {
+                                // If we are reusing metadata from a previous response (which is possible if it
+                                // contained a larger epoch), we may not have leader information available in the
+                                // latest response. To keep the state consistent, we override the partition metadata
+                                // so that the leader is set consistently with the broker metadata
+                                PartitionInfo partitionInfo = partitionInfoAndEpoch.partitionInfo();
+                                PartitionInfo partitionInfoWithoutLeader = new PartitionInfo(
+                                        partitionInfo.topic(),
+                                        partitionInfo.partition(),
+                                        brokersById.get(leader.id()),
+                                        partitionInfo.replicas(),
+                                        partitionInfo.inSyncReplicas(),
+                                        partitionInfo.offlineReplicas());
+                                partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfoWithoutLeader,
+                                        partitionInfoAndEpoch.epoch()));
+                            } else {
+                                partitions.add(partitionInfoAndEpoch);
+                            }
+                        });
 
                     if (partitionMetadata.error().exception() instanceof InvalidMetadataException) {
                         log.debug("Requesting metadata update for partition {} due to error {}",
@@ -350,25 +351,26 @@ public class Metadata implements Closeable {
      */
     private void updatePartitionInfo(String topic,
                                      MetadataResponse.PartitionMetadata partitionMetadata,
-                                     Consumer<PartitionInfo> partitionInfoConsumer) {
-
+                                     boolean hasReliableLeaderEpoch,
+                                     Consumer<MetadataCache.PartitionInfoAndEpoch> partitionInfoConsumer) {
         TopicPartition tp = new TopicPartition(topic, partitionMetadata.partition());
-        if (partitionMetadata.leaderEpoch().isPresent()) {
+
+        if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch().isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch().get();
             // If the received leader epoch is at least the same as the previous one, update the metadata
             if (updateLastSeenEpoch(tp, newEpoch, oldEpoch -> newEpoch >= oldEpoch, false)) {
-                partitionInfoConsumer.accept(MetadataResponse.partitionMetaToInfo(topic, partitionMetadata));
+                PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
+                partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info, newEpoch));
             } else {
                 // Otherwise ignore the new metadata and use the previously cached info
-                PartitionInfo previousInfo = cache.cluster().partition(tp);
-                if (previousInfo != null) {
-                    partitionInfoConsumer.accept(previousInfo);
-                }
+                cache.getPartitionInfo(tp).ifPresent(partitionInfoConsumer);
             }
         } else {
             // Handle old cluster formats as well as error responses where leader and epoch are missing
             lastSeenLeaderEpochs.remove(tp);
-            partitionInfoConsumer.accept(MetadataResponse.partitionMetaToInfo(topic, partitionMetadata));
+            PartitionInfo info = MetadataResponse.partitionMetaToInfo(topic, partitionMetadata);
+            partitionInfoConsumer.accept(new MetadataCache.PartitionInfoAndEpoch(info,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH));
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 350da2b..41fa434 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -59,13 +59,25 @@ public class MetadataResponse extends AbstractResponse {
 
     private final MetadataResponseData data;
     private volatile Holder holder;
+    private final boolean hasReliableLeaderEpochs;
 
     public MetadataResponse(MetadataResponseData data) {
-        this.data = data;
+        this(data, true);
     }
 
     public MetadataResponse(Struct struct, short version) {
-        this(new MetadataResponseData(struct, version));
+        // Kafka versions 2.3 and below do not support a sufficient Metadata version to
+        // be able to leverage leader epochs correctly for validation. Specifically, the
+        // brokers cannot be relied on to propagate leader epoch information accurately
+        // while a reassignment is in progress. Relying on a stale epoch can lead to
+        // FENCED_LEADER_EPOCH errors which can prevent consumption throughout the course
+        // of a reassignment. Hence it is safer to disable this validation.
+        this(new MetadataResponseData(struct, version), false);
+    }
+
+    private MetadataResponse(MetadataResponseData data, boolean hasReliableLeaderEpochs) {
+        this.data = data;
+        this.hasReliableLeaderEpochs = hasReliableLeaderEpochs;
     }
 
     @Override
@@ -212,6 +224,18 @@ public class MetadataResponse extends AbstractResponse {
         return this.data.clusterId();
     }
 
+    /**
+     * Check whether the leader epochs returned from the response can be relied on
+     * for epoch validation in Fetch, ListOffsets, and OffsetsForLeaderEpoch requests.
+     * If not, then the client will not retain the leader epochs and hence will not
+     * forward them in requests.
+     *
+     * @return true if the epoch can be used for validation
+     */
+    public boolean hasReliableLeaderEpochs() {
+        return hasReliableLeaderEpochs;
+    }
+
     public static MetadataResponse parse(ByteBuffer buffer, short version) {
         return new MetadataResponse(ApiKeys.METADATA.responseSchema(version).read(buffer), version);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 9a9026d..7bda534 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -29,7 +29,9 @@ import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBrok
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopicCollection;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -45,6 +47,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.test.TestUtils.assertOptional;
 import static org.junit.Assert.assertEquals;
@@ -144,6 +147,112 @@ public class MetadataTest {
         assertEquals(0, metadata.timeToNextUpdate(now + 1));
     }
 
+    /**
+     * Prior to Kafka version 2.4 (which coincides with Metadata version 9), the broker does not propagate leader epoch
+     * information accurately while a reassignment is in progress, so we cannot rely on it. This is explained in more
+     * detail in MetadataResponse's constructor.
+     */
+    @Test
+    public void testIgnoreLeaderEpochInOlderMetadataResponse() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+
+        MetadataResponsePartition partitionMetadata = new MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setLeaderId(5)
+                .setLeaderEpoch(10)
+                .setReplicaNodes(Arrays.asList(1, 2, 3))
+                .setIsrNodes(Arrays.asList(1, 2, 3))
+                .setOfflineReplicas(Collections.emptyList())
+                .setErrorCode(Errors.NONE.code());
+
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
+                .setName(tp.topic())
+                .setErrorCode(Errors.NONE.code())
+                .setPartitions(Collections.singletonList(partitionMetadata))
+                .setIsInternal(false);
+
+        MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection();
+        topics.add(topicMetadata);
+
+        MetadataResponseData data = new MetadataResponseData()
+                .setClusterId("clusterId")
+                .setControllerId(0)
+                .setTopics(topics)
+                .setBrokers(new MetadataResponseBrokerCollection());
+
+        for (short version = ApiKeys.METADATA.oldestVersion(); version < 9; version++) {
+            Struct struct = data.toStruct(version);
+            MetadataResponse response = new MetadataResponse(struct, version);
+            assertFalse(response.hasReliableLeaderEpochs());
+            metadata.update(response, 100);
+            assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
+            MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
+            assertEquals(-1, info.epoch());
+        }
+
+        for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) {
+            Struct struct = data.toStruct(version);
+            MetadataResponse response = new MetadataResponse(struct, version);
+            assertTrue(response.hasReliableLeaderEpochs());
+            metadata.update(response, 100);
+            assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
+            MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
+            assertEquals(10, info.epoch());
+        }
+    }
+
+    @Test
+    public void testStaleMetadata() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+
+        MetadataResponsePartition partitionMetadata = new MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setLeaderId(1)
+                .setLeaderEpoch(10)
+                .setReplicaNodes(Arrays.asList(1, 2, 3))
+                .setIsrNodes(Arrays.asList(1, 2, 3))
+                .setOfflineReplicas(Collections.emptyList())
+                .setErrorCode(Errors.NONE.code());
+
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
+                .setName(tp.topic())
+                .setErrorCode(Errors.NONE.code())
+                .setPartitions(Collections.singletonList(partitionMetadata))
+                .setIsInternal(false);
+
+        MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection();
+        topics.add(topicMetadata);
+
+        MetadataResponseData data = new MetadataResponseData()
+                .setClusterId("clusterId")
+                .setControllerId(0)
+                .setTopics(topics)
+                .setBrokers(new MetadataResponseBrokerCollection());
+
+        metadata.update(new MetadataResponse(data), 100);
+
+        // Older epoch with changed ISR should be ignored
+        partitionMetadata
+                .setPartitionIndex(tp.partition())
+                .setLeaderId(1)
+                .setLeaderEpoch(9)
+                .setReplicaNodes(Arrays.asList(1, 2, 3))
+                .setIsrNodes(Arrays.asList(1, 2))
+                .setOfflineReplicas(Collections.emptyList())
+                .setErrorCode(Errors.NONE.code());
+
+        metadata.update(new MetadataResponse(data), 101);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        assertTrue(metadata.partitionInfoIfCurrent(tp).isPresent());
+        MetadataCache.PartitionInfoAndEpoch info = metadata.partitionInfoIfCurrent(tp).get();
+
+        List<Integer> cachedIsr = Arrays.stream(info.partitionInfo().inSyncReplicas())
+                .map(Node::id).collect(Collectors.toList());
+        assertEquals(Arrays.asList(1, 2, 3), cachedIsr);
+        assertEquals(10, info.epoch());
+    }
+
     @Test
     public void testFailedUpdate() {
         long time = 100;
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 2034c74..29fb0f8 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -947,7 +947,9 @@ class KafkaController(val config: KafkaConfig,
             zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch, controllerContext.epochZkVersion)
           if (successfulUpdates.contains(partition)) {
             val finalLeaderAndIsr = successfulUpdates(partition)
-            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch))
+            val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch)
+            controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+            finalLeaderIsrAndControllerEpoch = Some(leaderIsrAndControllerEpoch)
             info(s"Updated leader epoch for partition $partition to ${finalLeaderAndIsr.leaderEpoch}")
             true
           } else if (failedUpdates.contains(partition)) {
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 654a92e..f629707 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -628,11 +628,52 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     assertEquals(Seq.empty, zkClient.getReplicasForPartition(new TopicPartition("customers", 0)))
   }
 
+  @Test
+  def testProduceAndConsumeWithReassignmentInProgress(): Unit = {
+    val tp0 = new TopicPartition(topicName, 0)
+
+    startBrokers(Seq(100, 101))
+    adminClient = createAdminClient(servers)
+    createTopic(zkClient, topicName, Map(tp0.partition() -> Seq(100)), servers = servers)
+
+    produceMessages(tp0.topic, 500, acks = -1, valueLength = 100 * 1000)
+
+    val command = new ReassignPartitionsCommand(zkClient, None,
+      Map(tp0 -> Seq(100, 101)),
+      adminZkClient = adminZkClient)
+    command.reassignPartitions(Throttle(interBrokerLimit = 1, -1))
+
+    awaitReassignmentInProgress(tp0)
+
+    produceMessages(tp0.topic, 500, acks = -1, valueLength = 64)
+    val consumer = TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers))
+    try {
+      consumer.assign(Seq(tp0).asJava)
+      pollUntilAtLeastNumRecords(consumer, numRecords = 1000)
+    } finally {
+      consumer.close()
+    }
+
+    command.maybeLimit(Throttle(interBrokerLimit = Int.MaxValue, -1))
+
+    waitForReassignmentToComplete()
+    assertEquals(Seq(100, 101), zkClient.getReplicasForPartition(tp0))
+  }
+
   def waitForReassignmentToComplete(pause: Long = 100L) {
     waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
       s"Znode ${ReassignPartitionsZNode.path} wasn't deleted", pause = pause)
   }
 
+  def awaitReassignmentInProgress(topicPartition: TopicPartition): Unit = {
+    waitUntilTrue(() => isAssignmentInProgress(topicPartition),
+      "Timed out waiting for expected reassignment to begin")
+  }
+
+  def isAssignmentInProgress(topicPartition: TopicPartition): Boolean = {
+    zkClient.getPartitionReassignment.contains(topicPartition)
+  }
+
   def json(topic: String*): String = {
     val topicStr = topic.map { t => "{\"topic\": \"" + t + "\"}" }.mkString(",")
     s"""{"topics": [$topicStr],"version":1}"""