You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2024/02/16 18:27:50 UTC

(kafka) branch trunk updated: KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR (#15359)

This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 756f44a3e58 KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR (#15359)
756f44a3e58 is described below

commit 756f44a3e582902827ae45daf382556c7fba99a0
Author: Calvin Liu <83...@users.noreply.github.com>
AuthorDate: Fri Feb 16 10:27:43 2024 -0800

    KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR (#15359)
    
    A partition reassignment now completes only when the new ISR contains every target replica, rather than only the replicas being added.
    
    Reviewers: Justine Olshan <jo...@confluent.io>, David Mao <dm...@confluent.io>
---
 .../apache/kafka/controller/PartitionReassignmentReplicas.java |  4 +---
 .../kafka/controller/PartitionReassignmentReplicasTest.java    | 10 ++++++++++
 .../apache/kafka/controller/ReplicationControlManagerTest.java |  6 +++---
 3 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
index 62f84adea4c..56c3188f741 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
@@ -120,9 +120,7 @@ class PartitionReassignmentReplicas {
             }
             if (newTargetReplicas.isEmpty()) return Optional.empty();
         }
-        for (int replica : adding) {
-            if (!newTargetIsr.contains(replica)) return Optional.empty();
-        }
+        if (!newTargetIsr.containsAll(newTargetReplicas)) return Optional.empty();
 
         return Optional.of(
             new CompletedReassignment(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
index dd1d567e587..b2bc540bda5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
@@ -196,6 +196,16 @@ public class PartitionReassignmentReplicasTest {
                 build()));
     }
 
+    @Test
+    public void testDoesNotCompleteReassignmentIfIsrDoesNotHaveAllTargetReplicas() {
+        PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
+            new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(0, 1, 3)));
+        assertTrue(replicas.isReassignmentInProgress());
+        Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional =
+            replicas.maybeCompleteReassignment(Arrays.asList(3));
+        assertFalse(reassignmentOptional.isPresent());
+    }
+
     @Test
     public void testOriginalReplicas() {
         PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index bf7f6c82e05..3d54720be92 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -1989,7 +1989,7 @@ public class ReplicationControlManagerTest {
                 new AlterPartitionReassignmentsRequestData().setTopics(asList(
                     new ReassignableTopic().setName("foo").setPartitions(asList(
                         new ReassignablePartition().setPartitionIndex(0).
-                            setReplicas(asList(1, 2, 3)),
+                            setReplicas(asList(1, 2, 4)),
                         new ReassignablePartition().setPartitionIndex(1).
                             setReplicas(asList(1, 2, 3, 0)),
                         new ReassignablePartition().setPartitionIndex(2).
@@ -2019,11 +2019,11 @@ public class ReplicationControlManagerTest {
                     setErrorMessage(null))))),
             alterResult.response());
         ctx.replay(alterResult.records());
-        assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2}).
+        assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 4}).setIsr(new int[] {1, 2, 4}).
             setDirectories(new Uuid[] {
                     Uuid.fromString("TESTBROKER00001DIRAAAA"),
                     Uuid.fromString("TESTBROKER00002DIRAAAA"),
-                    Uuid.fromString("TESTBROKER00003DIRAAAA")
+                    Uuid.fromString("TESTBROKER00004DIRAAAA")
             }).
             setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(2).build(), replication.getPartition(fooId, 0));
         assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 0}).setIsr(new int[] {0, 1, 2}).