You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/08/15 18:40:03 UTC

[kafka] branch 3.3 updated: KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epoch is ahead (#12514)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 16a7713830 KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epoch is ahead (#12514)
16a7713830 is described below

commit 16a77138305c429b3adc62b34c128eb242cadfb5
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Aug 15 11:34:29 2022 -0700

    KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epoch is ahead (#12514)
    
    Similar to https://github.com/apache/kafka/pull/12506. For the Kraft controller, we should return NOT_CONTROLLER if the leader/partition epoch in the request is ahead of the controller.
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>
---
 .../kafka/controller/ReplicationControlManager.java | 21 +++++++++++++++++++--
 .../controller/ReplicationControlManagerTest.java   | 12 +++++++++++-
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 4ffb339967..df7097df83 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -125,6 +125,7 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
 import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION;
 import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
 import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER;
 import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
 import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
 import static org.apache.kafka.common.protocol.Errors.TOPIC_AUTHORIZATION_FAILED;
@@ -1071,7 +1072,23 @@ public class ReplicationControlManager {
 
             return UNKNOWN_TOPIC_OR_PARTITION;
         }
-        if (partitionData.leaderEpoch() != partition.leaderEpoch) {
+
+        // If the partition leader has a higher leader/partition epoch, then it is likely
+        // that this node is no longer the active controller. We return NOT_CONTROLLER in
+        // this case to give the leader an opportunity to find the new controller.
+        if (partitionData.leaderEpoch() > partition.leaderEpoch) {
+            log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
+                    "the current leader epoch is {}, which is greater than the local value {}.",
+                brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch());
+            return NOT_CONTROLLER;
+        }
+        if (partitionData.partitionEpoch() > partition.partitionEpoch) {
+            log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
+                    "the current partition epoch is {}, which is greater than the local value {}.",
+                brokerId, topic.name, partitionId, partition.partitionEpoch, partitionData.partitionEpoch());
+            return NOT_CONTROLLER;
+        }
+        if (partitionData.leaderEpoch() < partition.leaderEpoch) {
             log.debug("Rejecting AlterPartition request from node {} for {}-{} because " +
                     "the current leader epoch is {}, not {}.", brokerId, topic.name,
                     partitionId, partition.leaderEpoch, partitionData.leaderEpoch());
@@ -1085,7 +1102,7 @@ public class ReplicationControlManager {
 
             return INVALID_REQUEST;
         }
-        if (partitionData.partitionEpoch() != partition.partitionEpoch) {
+        if (partitionData.partitionEpoch() < partition.partitionEpoch) {
             log.info("Rejecting AlterPartition request from node {} for {}-{} because " +
                     "the current partition epoch is {}, not {}.", brokerId,
                     topic.name, partitionId, partition.partitionEpoch,
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index d33776ca10..82e378a982 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -122,6 +122,7 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT
 import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
 import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
 import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER;
 import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
 import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
 import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION;
@@ -960,7 +961,16 @@ public class ReplicationControlManagerTest {
         ControllerResult<AlterPartitionResponseData> invalidLeaderEpochResult = sendAlterPartition(
             replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
             topicIdPartition.topicId(), invalidLeaderEpochRequest);
-        assertAlterPartitionResponse(invalidLeaderEpochResult, topicIdPartition, FENCED_LEADER_EPOCH);
+        assertAlterPartitionResponse(invalidLeaderEpochResult, topicIdPartition, NOT_CONTROLLER);
+
+        // Invalid partition epoch
+        PartitionData invalidPartitionEpochRequest = newAlterPartition(
+            replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERED);
+        invalidPartitionEpochRequest.setPartitionEpoch(500);
+        ControllerResult<AlterPartitionResponseData> invalidPartitionEpochResult = sendAlterPartition(
+            replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId),
+            topicIdPartition.topicId(), invalidPartitionEpochRequest);
+        assertAlterPartitionResponse(invalidPartitionEpochResult, topicIdPartition, NOT_CONTROLLER);
 
         // Invalid ISR (3 is not a valid replica)
         PartitionData invalidIsrRequest1 = newAlterPartition(