You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2024/02/16 18:27:50 UTC
(kafka) branch trunk updated: KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR (#15359)
This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 756f44a3e58 KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR (#15359)
756f44a3e58 is described below
commit 756f44a3e582902827ae45daf382556c7fba99a0
Author: Calvin Liu <83...@users.noreply.github.com>
AuthorDate: Fri Feb 16 10:27:43 2024 -0800
KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR (#15359)
When completing the partition reassignment, the new ISR should have all the target replicas.
Reviewers: Justine Olshan <jo...@confluent.io>, David Mao <dm...@confluent.io>
---
.../apache/kafka/controller/PartitionReassignmentReplicas.java | 4 +---
.../kafka/controller/PartitionReassignmentReplicasTest.java | 10 ++++++++++
.../apache/kafka/controller/ReplicationControlManagerTest.java | 6 +++---
3 files changed, 14 insertions(+), 6 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
index 62f84adea4c..56c3188f741 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
@@ -120,9 +120,7 @@ class PartitionReassignmentReplicas {
}
if (newTargetReplicas.isEmpty()) return Optional.empty();
}
- for (int replica : adding) {
- if (!newTargetIsr.contains(replica)) return Optional.empty();
- }
+ if (!newTargetIsr.containsAll(newTargetReplicas)) return Optional.empty();
return Optional.of(
new CompletedReassignment(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
index dd1d567e587..b2bc540bda5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java
@@ -196,6 +196,16 @@ public class PartitionReassignmentReplicasTest {
build()));
}
+ @Test
+ public void testDoesNotCompleteReassignmentIfIsrDoesNotHaveAllTargetReplicas() {
+ PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
+ new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(0, 1, 3)));
+ assertTrue(replicas.isReassignmentInProgress());
+ Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional =
+ replicas.maybeCompleteReassignment(Arrays.asList(3));
+ assertFalse(reassignmentOptional.isPresent());
+ }
+
@Test
public void testOriginalReplicas() {
PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index bf7f6c82e05..3d54720be92 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -1989,7 +1989,7 @@ public class ReplicationControlManagerTest {
new AlterPartitionReassignmentsRequestData().setTopics(asList(
new ReassignableTopic().setName("foo").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
- setReplicas(asList(1, 2, 3)),
+ setReplicas(asList(1, 2, 4)),
new ReassignablePartition().setPartitionIndex(1).
setReplicas(asList(1, 2, 3, 0)),
new ReassignablePartition().setPartitionIndex(2).
@@ -2019,11 +2019,11 @@ public class ReplicationControlManagerTest {
setErrorMessage(null))))),
alterResult.response());
ctx.replay(alterResult.records());
- assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2}).
+ assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 4}).setIsr(new int[] {1, 2, 4}).
setDirectories(new Uuid[] {
Uuid.fromString("TESTBROKER00001DIRAAAA"),
Uuid.fromString("TESTBROKER00002DIRAAAA"),
- Uuid.fromString("TESTBROKER00003DIRAAAA")
+ Uuid.fromString("TESTBROKER00004DIRAAAA")
}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(1).setPartitionEpoch(2).build(), replication.getPartition(fooId, 0));
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 0}).setIsr(new int[] {0, 1, 2}).