You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/06/02 17:24:41 UTC
[kafka] branch 2.3 updated: KAFKA-8463: Fix redundant reassignment
of tasks when leader worker leaves (#6859)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new a2daa49 KAFKA-8463: Fix redundant reassignment of tasks when leader worker leaves (#6859)
a2daa49 is described below
commit a2daa49ef80dfd8ba5a9128bd59a81c292cc552f
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Sun Jun 2 10:19:19 2019 -0700
KAFKA-8463: Fix redundant reassignment of tasks when leader worker leaves (#6859)
Author: Konstantine Karantasis <ko...@confluent.io>
Reviewer: Randall Hauch <rh...@gmail.com>
---
.../IncrementalCooperativeAssignor.java | 4 +-
.../IncrementalCooperativeAssignorTest.java | 103 ++++++++++++++++++---
2 files changed, 94 insertions(+), 13 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index ae36837..8e56ca8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -195,8 +195,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
log.debug("Lost assignments: {}", lostAssignments);
// Derived set: The set of new connectors-and-tasks is a derived set from the set
- // difference of configured - previous
- ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment);
+ // difference of configured - previous - active
+ ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments);
log.debug("New assignments: {}", newSubmissions);
// A collection of the complete assignment
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index 71ccefd..7085a7f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -120,6 +120,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(2, 8, 0, 0, "worker1");
// Second assignment with a second worker joining and all connectors running on previous worker
@@ -131,6 +132,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 1, 4, "worker1", "worker2");
// Third assignment after revocations
@@ -141,6 +143,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(1, 4, 0, 0, "worker1", "worker2");
// A fourth rebalance should not change assignments
@@ -151,6 +154,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 0, 0, "worker1", "worker2");
verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -173,6 +177,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(2, 8, 0, 0, "worker1", "worker2");
// Second assignment with only one worker remaining in the group. The worker that left the
@@ -186,6 +191,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(rebalanceDelay, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 0, 0, "worker1");
time.sleep(rebalanceDelay / 2);
@@ -199,6 +205,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(rebalanceDelay / 2, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 0, 0, "worker1");
time.sleep(rebalanceDelay / 2 + 1);
@@ -211,6 +218,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(1, 4, 0, 0, "worker1");
verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -233,6 +241,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(2, 8, 0, 0, "worker1", "worker2");
// Second assignment with only one worker remaining in the group. The worker that left the
@@ -246,6 +255,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(rebalanceDelay, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 0, 0, "worker1");
time.sleep(rebalanceDelay / 2);
@@ -259,6 +269,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(rebalanceDelay / 2, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 0, 0, "worker1");
time.sleep(rebalanceDelay / 4);
@@ -273,6 +284,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(rebalanceDelay / 4, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 0, 0, "worker1", "worker2");
time.sleep(rebalanceDelay / 4);
@@ -286,6 +298,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(1, 4, 0, 0, "worker1", "worker2");
verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -301,16 +314,18 @@ public class IncrementalCooperativeAssignorTest {
when(coordinator.configSnapshot()).thenReturn(configState);
doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
- // First assignment with 2 workers and 2 connectors configured but not yet assigned
+ // First assignment with 3 workers and 2 connectors configured but not yet assigned
memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+ memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
++rebalanceNum;
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
- assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
+ assertAssignment(2, 8, 0, 0, "worker1", "worker2", "worker3");
- // Second assignment with only one worker remaining in the group. The worker that left the
+ // Second assignment with two workers remaining in the group. The worker that left the
// group was the leader. The new leader has no previous assignments and is not tracking a
// delay upon a leader's exit
applyAssignments(returnedAssignments);
@@ -328,7 +343,8 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
- assertAssignment(1, 4, 0, 0, "worker2");
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
+ assertAssignment(1, 3, 0, 0, "worker2", "worker3");
// Third (incidental) assignment with still only one worker in the group.
applyAssignments(returnedAssignments);
@@ -337,7 +353,8 @@ public class IncrementalCooperativeAssignorTest {
++rebalanceNum;
returnedAssignments = assignmentsCapture.getValue();
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
- assertAssignment(0, 0, 0, 0, "worker2");
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
+ assertAssignment(0, 0, 0, 0, "worker2", "worker3");
verify(coordinator, times(rebalanceNum)).configSnapshot();
verify(coordinator, times(rebalanceNum)).leaderState(any());
@@ -352,16 +369,18 @@ public class IncrementalCooperativeAssignorTest {
when(coordinator.configSnapshot()).thenReturn(configState);
doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
- // First assignment with 2 workers and 2 connectors configured but not yet assigned
+ // First assignment with 3 workers and 2 connectors configured but not yet assigned
memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+ memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
++rebalanceNum;
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
- assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
+ assertAssignment(2, 8, 0, 0, "worker1", "worker2", "worker3");
- // Second assignment with only one worker remaining in the group. The worker that left the
+ // Second assignment with two workers remaining in the group. The worker that left the
// group was the leader. The new leader has no previous assignments and is not tracking a
// delay upon a leader's exit
applyAssignments(returnedAssignments);
@@ -379,7 +398,8 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
- assertAssignment(1, 4, 0, 0, "worker2");
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
+ assertAssignment(1, 3, 0, 0, "worker2", "worker3");
// Third assignment with the previous leader returning as a follower. In this case, the
// arrival of the previous leader is treated as an arrival of a new worker. Reassignment
@@ -391,7 +411,8 @@ public class IncrementalCooperativeAssignorTest {
++rebalanceNum;
returnedAssignments = assignmentsCapture.getValue();
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
- assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
+ assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
// Fourth assignment after revocations
applyAssignments(returnedAssignments);
@@ -401,7 +422,8 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
- assertAssignment(1, 4, 0, 0, "worker1", "worker2");
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
+ assertAssignment(0, 2, 0, 0, "worker1", "worker2", "worker3");
verify(coordinator, times(rebalanceNum)).configSnapshot();
verify(coordinator, times(rebalanceNum)).leaderState(any());
@@ -429,6 +451,7 @@ public class IncrementalCooperativeAssignorTest {
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
// This was the assignment that should have been sent, but didn't make it after all the way
assertDelay(0, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(2, 8, 0, 0, "worker1", "worker2");
// Second assignment happens with members returning the same assignments (memberConfigs)
@@ -441,6 +464,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
assertDelay(rebalanceDelay, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 0, 0, "worker1", "worker2");
time.sleep(rebalanceDelay / 2);
@@ -454,6 +478,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(rebalanceDelay / 2, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 0, 0, "worker1", "worker2");
time.sleep(rebalanceDelay / 2 + 1);
@@ -466,6 +491,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(2, 8, 0, 0, "worker1", "worker2");
verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -488,6 +514,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(2, 8, 0, 0, "worker1", "worker2");
when(coordinator.configSnapshot()).thenReturn(configState);
@@ -509,6 +536,7 @@ public class IncrementalCooperativeAssignorTest {
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
// This was the assignment that should have been sent, but didn't make it after all the way
assertDelay(0, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
// Third assignment happens with members returning the same assignments (memberConfigs)
@@ -519,6 +547,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
assertDelay(0, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -538,6 +567,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(3, 12, 0, 0, "worker1", "worker2");
// Second assignment with an updated config state that reflects removal of a connector
@@ -550,6 +580,7 @@ public class IncrementalCooperativeAssignorTest {
returnedAssignments = assignmentsCapture.getValue();
assertDelay(0, returnedAssignments);
expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+ assertNoReassignments(memberConfigs, expectedMemberConfigs);
assertAssignment(0, 0, 1, 4, "worker1", "worker2");
verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -1076,4 +1107,54 @@ public class IncrementalCooperativeAssignorTest {
.forEach(a -> assertEquals(
"Wrong rebalance delay in " + a, expectedDelay, a.delay()));
}
+
+    /**
+     * Asserts each assignment is duplicate-free and that none of the connectors or tasks in
+     * {@code newAssignments} is already present in {@code existingAssignments}.
+     */
+    private void assertNoReassignments(Map<String, ExtendedWorkerState> existingAssignments,
+                                       Map<String, ExtendedWorkerState> newAssignments) {
+        assertNoDuplicateInAssignment(existingAssignments);
+        assertNoDuplicateInAssignment(newAssignments);
+
+        List<String> existingConnectors = existingAssignments.values().stream()
+                .flatMap(a -> a.assignment().connectors().stream())
+                .collect(Collectors.toList());
+        List<String> newConnectors = newAssignments.values().stream()
+                .flatMap(a -> a.assignment().connectors().stream())
+                .collect(Collectors.toList());
+        List<ConnectorTaskId> existingTasks = existingAssignments.values().stream()
+                .flatMap(a -> a.assignment().tasks().stream())
+                .collect(Collectors.toList());
+        List<ConnectorTaskId> newTasks = newAssignments.values().stream()
+                .flatMap(a -> a.assignment().tasks().stream())
+                .collect(Collectors.toList());
+        // retainAll leaves only the elements that appear in both the existing and new assignments
+        existingConnectors.retainAll(newConnectors);
+        assertEquals("Found connectors in new assignment that already exist in current assignment",
+                Collections.emptyList(), existingConnectors);
+        existingTasks.retainAll(newTasks);
+        assertEquals("Found tasks in new assignment that already exist in current assignment",
+                Collections.emptyList(), existingTasks);
+    }
+
+    /**
+     * Asserts that no connector and no task is assigned more than once across all workers.
+     */
+    private void assertNoDuplicateInAssignment(Map<String, ExtendedWorkerState> existingAssignment) {
+        List<String> existingConnectors = existingAssignment.values().stream()
+                .flatMap(a -> a.assignment().connectors().stream())
+                .collect(Collectors.toList());
+        // Keep elements occurring more than once; removeAll(uniqueSet) would strip every copy
+        assertEquals("Connectors should be unique in assignments but duplicates were found",
+                Collections.emptyList(), existingConnectors.stream().distinct()
+                        .filter(c -> Collections.frequency(existingConnectors, c) > 1).collect(Collectors.toList()));
+
+        List<ConnectorTaskId> existingTasks = existingAssignment.values().stream()
+                .flatMap(a -> a.assignment().tasks().stream())
+                .collect(Collectors.toList());
+        assertEquals("Tasks should be unique in assignments but duplicates were found",
+                Collections.emptyList(), existingTasks.stream().distinct()
+                        .filter(t -> Collections.frequency(existingTasks, t) > 1).collect(Collectors.toList()));
+    }
}