You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/06/02 17:24:41 UTC

[kafka] branch 2.3 updated: KAFKA-8463: Fix redundant reassignment of tasks when leader worker leaves (#6859)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new a2daa49  KAFKA-8463: Fix redundant reassignment of tasks when leader worker leaves (#6859)
a2daa49 is described below

commit a2daa49ef80dfd8ba5a9128bd59a81c292cc552f
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Sun Jun 2 10:19:19 2019 -0700

    KAFKA-8463: Fix redundant reassignment of tasks when leader worker leaves (#6859)
    
    Author: Konstantine Karantasis <ko...@confluent.io>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 .../IncrementalCooperativeAssignor.java            |   4 +-
 .../IncrementalCooperativeAssignorTest.java        | 103 ++++++++++++++++++---
 2 files changed, 94 insertions(+), 13 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index ae36837..8e56ca8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -195,8 +195,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         log.debug("Lost assignments: {}", lostAssignments);
 
         // Derived set: The set of new connectors-and-tasks is a derived set from the set
-        // difference of configured - previous
-        ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment);
+        // difference of configured - previous - active
+        ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments);
         log.debug("New assignments: {}", newSubmissions);
 
         // A collection of the complete assignment
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index 71ccefd..7085a7f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -120,6 +120,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(2, 8, 0, 0, "worker1");
 
         // Second assignment with a second worker joining and all connectors running on previous worker
@@ -131,6 +132,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 1, 4, "worker1", "worker2");
 
         // Third assignment after revocations
@@ -141,6 +143,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(1, 4, 0, 0, "worker1", "worker2");
 
         // A fourth rebalance should not change assignments
@@ -151,6 +154,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 0, "worker1", "worker2");
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -173,6 +177,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
         // Second assignment with only one worker remaining in the group. The worker that left the
@@ -186,6 +191,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(rebalanceDelay, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 0, "worker1");
 
         time.sleep(rebalanceDelay / 2);
@@ -199,6 +205,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(rebalanceDelay / 2, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 0, "worker1");
 
         time.sleep(rebalanceDelay / 2 + 1);
@@ -211,6 +218,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(1, 4, 0, 0, "worker1");
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -233,6 +241,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
         // Second assignment with only one worker remaining in the group. The worker that left the
@@ -246,6 +255,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(rebalanceDelay, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 0, "worker1");
 
         time.sleep(rebalanceDelay / 2);
@@ -259,6 +269,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(rebalanceDelay / 2, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 0, "worker1");
 
         time.sleep(rebalanceDelay / 4);
@@ -273,6 +284,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(rebalanceDelay / 4, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 0, "worker1", "worker2");
 
         time.sleep(rebalanceDelay / 4);
@@ -286,6 +298,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(1, 4, 0, 0, "worker1", "worker2");
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -301,16 +314,18 @@ public class IncrementalCooperativeAssignorTest {
         when(coordinator.configSnapshot()).thenReturn(configState);
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
 
-        // First assignment with 2 workers and 2 connectors configured but not yet assigned
+        // First assignment with 3 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2", "worker3");
 
-        // Second assignment with only one worker remaining in the group. The worker that left the
+        // Second assignment with two workers remaining in the group. The worker that left the
         // group was the leader. The new leader has no previous assignments and is not tracking a
         // delay upon a leader's exit
         applyAssignments(returnedAssignments);
@@ -328,7 +343,8 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        assertAssignment(1, 4, 0, 0, "worker2");
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(1, 3, 0, 0, "worker2", "worker3");
 
         // Third (incidental) assignment with still only one worker in the group.
         applyAssignments(returnedAssignments);
@@ -337,7 +353,8 @@ public class IncrementalCooperativeAssignorTest {
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        assertAssignment(0, 0, 0, 0, "worker2");
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 0, 0, "worker2", "worker3");
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
@@ -352,16 +369,18 @@ public class IncrementalCooperativeAssignorTest {
         when(coordinator.configSnapshot()).thenReturn(configState);
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
 
-        // First assignment with 2 workers and 2 connectors configured but not yet assigned
+        // First assignment with 3 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2", "worker3");
 
-        // Second assignment with only one worker remaining in the group. The worker that left the
+        // Second assignment with two workers remaining in the group. The worker that left the
         // group was the leader. The new leader has no previous assignments and is not tracking a
         // delay upon a leader's exit
         applyAssignments(returnedAssignments);
@@ -379,7 +398,8 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        assertAssignment(1, 4, 0, 0, "worker2");
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(1, 3, 0, 0, "worker2", "worker3");
 
         // Third assignment with the previous leader returning as a follower. In this case, the
         // arrival of the previous leader is treated as an arrival of a new worker. Reassignment
@@ -391,7 +411,8 @@ public class IncrementalCooperativeAssignorTest {
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
 
         // Fourth assignment after revocations
         applyAssignments(returnedAssignments);
@@ -401,7 +422,8 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        assertAssignment(1, 4, 0, 0, "worker1", "worker2");
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 2, 0, 0, "worker1", "worker2", "worker3");
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
@@ -429,6 +451,7 @@ public class IncrementalCooperativeAssignorTest {
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
         // This was the assignment that should have been sent, but didn't make it after all the way
         assertDelay(0, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
         // Second assignment happens with members returning the same assignments (memberConfigs)
@@ -441,6 +464,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
         assertDelay(rebalanceDelay, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 0, "worker1", "worker2");
 
         time.sleep(rebalanceDelay / 2);
@@ -454,6 +478,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(rebalanceDelay / 2, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 0, "worker1", "worker2");
 
         time.sleep(rebalanceDelay / 2 + 1);
@@ -466,6 +491,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -488,6 +514,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
         when(coordinator.configSnapshot()).thenReturn(configState);
@@ -509,6 +536,7 @@ public class IncrementalCooperativeAssignorTest {
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
         // This was the assignment that should have been sent, but didn't make it after all the way
         assertDelay(0, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
 
         // Third assignment happens with members returning the same assignments (memberConfigs)
@@ -519,6 +547,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
         assertDelay(0, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -538,6 +567,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(3, 12, 0, 0, "worker1", "worker2");
 
         // Second assignment with an updated config state that reflects removal of a connector
@@ -550,6 +580,7 @@ public class IncrementalCooperativeAssignorTest {
         returnedAssignments = assignmentsCapture.getValue();
         assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 1, 4, "worker1", "worker2");
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
@@ -1076,4 +1107,54 @@ public class IncrementalCooperativeAssignorTest {
                 .forEach(a -> assertEquals(
                         "Wrong rebalance delay in " + a, expectedDelay, a.delay()));
     }
+
+    private void assertNoReassignments(Map<String, ExtendedWorkerState> existingAssignments,
+                                       Map<String, ExtendedWorkerState> newAssignments) {
+        assertNoDuplicateInAssignment(existingAssignments);
+        assertNoDuplicateInAssignment(newAssignments);
+
+        List<String> existingConnectors = existingAssignments.values().stream()
+                .flatMap(a -> a.assignment().connectors().stream())
+                .collect(Collectors.toList());
+        List<String> newConnectors = newAssignments.values().stream()
+                .flatMap(a -> a.assignment().connectors().stream())
+                .collect(Collectors.toList());
+
+        List<ConnectorTaskId> existingTasks = existingAssignments.values().stream()
+                .flatMap(a -> a.assignment().tasks().stream())
+                .collect(Collectors.toList());
+
+        List<ConnectorTaskId> newTasks = newAssignments.values().stream()
+                .flatMap(a -> a.assignment().tasks().stream())
+                .collect(Collectors.toList());
+
+        existingConnectors.retainAll(newConnectors);
+        assertThat("Found connectors in new assignment that already exist in current assignment",
+                Collections.emptyList(),
+                is(existingConnectors));
+        existingTasks.retainAll(newTasks);
+        assertThat("Found tasks in new assignment that already exist in current assignment",
+                Collections.emptyList(),
+                is(existingConnectors));
+    }
+
+    private void assertNoDuplicateInAssignment(Map<String, ExtendedWorkerState> existingAssignment) {
+        List<String> existingConnectors = existingAssignment.values().stream()
+                .flatMap(a -> a.assignment().connectors().stream())
+                .collect(Collectors.toList());
+        Set<String> existingUniqueConnectors = new HashSet<>(existingConnectors);
+        existingConnectors.removeAll(existingUniqueConnectors);
+        assertThat("Connectors should be unique in assignments but duplicates where found",
+                Collections.emptyList(),
+                is(existingConnectors));
+
+        List<ConnectorTaskId> existingTasks = existingAssignment.values().stream()
+                .flatMap(a -> a.assignment().tasks().stream())
+                .collect(Collectors.toList());
+        Set<ConnectorTaskId> existingUniqueTasks = new HashSet<>(existingTasks);
+        existingTasks.removeAll(existingUniqueTasks);
+        assertThat("Tasks should be unique in assignments but duplicates where found",
+                Collections.emptyList(),
+                is(existingTasks));
+    }
 }