You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/06/11 02:40:54 UTC

[kafka] branch 2.3 updated: KAFKA-9841: Revoke duplicate connectors and tasks when zombie workers return with an outdated assignment (#8453)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 6679fea  KAFKA-9841: Revoke duplicate connectors and tasks when zombie workers return with an outdated assignment (#8453)
6679fea is described below

commit 6679fea2908b391c7a0fad42b6fed2c6dd852a8a
Author: Lucent-Wong <ma...@live.cn>
AuthorDate: Thu Jun 11 08:59:32 2020 +0800

    KAFKA-9841: Revoke duplicate connectors and tasks when zombie workers return with an outdated assignment (#8453)
    
    With Incremental Cooperative Rebalancing, if a worker returns after it has been out of the group for some time (essentially as a zombie worker) and hasn't voluntarily revoked its own connectors and tasks in the meantime, it is possible that these assignments have already been distributed to other workers, so redundant connectors and tasks might now be running in the Connect cluster.
    
    This PR complements previous fixes such as KAFKA-9184, KAFKA-9849 and KAFKA-9851 by providing a last line of defense against zombie tasks: if, at any rebalance round, the leader worker detects duplicate assignments in the group, it revokes them completely and resolves the duplication with a correct assignment in the rebalancing round that follows task revocation.
    
    Author: Wang <yw...@ebay.com>
    
    Reviewer: Konstantine Karantasis <ko...@confluent.io>
---
 .../IncrementalCooperativeAssignor.java            |  59 ++++++++
 .../IncrementalCooperativeAssignorTest.java        | 162 ++++++++++++++++++++-
 2 files changed, 217 insertions(+), 4 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index a32ffdc..0eeaca6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -237,6 +237,10 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments);
         log.debug("Connector and task to delete assignments: {}", toRevoke);
 
+        // Revoking redundant connectors/tasks if the workers have duplicate assignments
+        toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments));
+        log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke);
+
         // Recompute the complete assignment excluding the deleted connectors-and-tasks
         completeWorkerAssignment = workerAssignment(memberConfigs, deleted);
         connectorAssignments =
@@ -352,6 +356,61 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         return previousAssignment;
     }
 
+    private ConnectorsAndTasks duplicatedAssignments(Map<String, ExtendedWorkerState> memberConfigs) {
+        Set<String> connectors = memberConfigs.entrySet().stream()
+                .flatMap(memberConfig -> memberConfig.getValue().assignment().connectors().stream())
+                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
+                .entrySet().stream()
+                .filter(entry -> entry.getValue() > 1L)
+                .map(entry -> entry.getKey())
+                .collect(Collectors.toSet());
+
+        Set<ConnectorTaskId> tasks = memberConfigs.values().stream()
+                .flatMap(state -> state.assignment().tasks().stream())
+                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
+                .entrySet().stream()
+                .filter(entry -> entry.getValue() > 1L)
+                .map(entry -> entry.getKey())
+                .collect(Collectors.toSet());
+        return new ConnectorsAndTasks.Builder().with(connectors, tasks).build();
+    }
+
+    private Map<String, ConnectorsAndTasks> computeDuplicatedAssignments(Map<String, ExtendedWorkerState> memberConfigs,
+                                             Map<String, Collection<String>> connectorAssignments,
+                                             Map<String, Collection<ConnectorTaskId>> taskAssignment) {
+        ConnectorsAndTasks duplicatedAssignments = duplicatedAssignments(memberConfigs);
+        log.debug("Duplicated assignments: {}", duplicatedAssignments);
+
+        Map<String, ConnectorsAndTasks> toRevoke = new HashMap<>();
+        if (!duplicatedAssignments.connectors().isEmpty()) {
+            connectorAssignments.entrySet().stream()
+                    .forEach(entry -> {
+                        Set<String> duplicatedConnectors = new HashSet<>(duplicatedAssignments.connectors());
+                        duplicatedConnectors.retainAll(entry.getValue());
+                        if (!duplicatedConnectors.isEmpty()) {
+                            toRevoke.computeIfAbsent(
+                                entry.getKey(),
+                                v -> new ConnectorsAndTasks.Builder().build()
+                            ).connectors().addAll(duplicatedConnectors);
+                        }
+                    });
+        }
+        if (!duplicatedAssignments.tasks().isEmpty()) {
+            taskAssignment.entrySet().stream()
+                    .forEach(entry -> {
+                        Set<ConnectorTaskId> duplicatedTasks = new HashSet<>(duplicatedAssignments.tasks());
+                        duplicatedTasks.retainAll(entry.getValue());
+                        if (!duplicatedTasks.isEmpty()) {
+                            toRevoke.computeIfAbsent(
+                                entry.getKey(),
+                                v -> new ConnectorsAndTasks.Builder().build()
+                            ).tasks().addAll(duplicatedTasks);
+                        }
+                    });
+        }
+        return toRevoke;
+    }
+
     // visible for testing
     protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                                          ConnectorsAndTasks newSubmissions,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index dfa5183..6daf469 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -1078,6 +1078,148 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.delay);
     }
 
+    @Test
+    public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1");
+
+        // Second assignment with a second worker with duplicate assignment joining and all connectors running on previous worker
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        ExtendedAssignment duplicatedWorkerAssignment = newExpandableAssignment();
+        duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2));
+        duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 4));
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, duplicatedWorkerAssignment));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 2, 8, "worker1", "worker2");
+
+        // Third assignment after revocations
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(1, 4, 0, 2, "worker1", "worker2");
+
+        // fourth rebalance after revocations
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 2, 0, 0, "worker1", "worker2");
+
+        // Fifth rebalance should not change assignments
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
+        verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
+    }
+
+    @Test
+    public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() {
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1");
+
+        //delete connector1
+        configState = clusterConfigState(offset, 2, 1, 4);
+        when(coordinator.configSnapshot()).thenReturn(configState);
+
+        // Second assignment with a second worker with duplicate assignment joining and the duplicated assignment is deleted at the same time
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        ExtendedAssignment duplicatedWorkerAssignment = newExpandableAssignment();
+        duplicatedWorkerAssignment.connectors().addAll(newConnectors(1, 2));
+        duplicatedWorkerAssignment.tasks().addAll(newTasks("connector1", 0, 4));
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, duplicatedWorkerAssignment));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 2, 8, "worker1", "worker2");
+
+        // Third assignment after revocations
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 0, 2, "worker1", "worker2");
+
+        // fourth rebalance after revocations
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 2, 0, 0, "worker1", "worker2");
+
+        // Fifth rebalance should not change assignments
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
+        verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
+    }
+
     private WorkerLoad emptyWorkerLoad(String worker) {
         return new WorkerLoad.Builder(worker).build();
     }
@@ -1096,19 +1238,31 @@ public class IncrementalCooperativeAssignorTest {
     }
 
     private static List<ConnectorTaskId> newTasks(int start, int end) {
+        return newTasks("task", start, end);
+    }
+
+    private static List<ConnectorTaskId> newTasks(String connectorName, int start, int end) {
         return IntStream.range(start, end)
-                .mapToObj(i -> new ConnectorTaskId("task", i))
+                .mapToObj(i -> new ConnectorTaskId(connectorName, i))
                 .collect(Collectors.toList());
     }
 
     private static ClusterConfigState clusterConfigState(long offset,
                                                          int connectorNum,
                                                          int taskNum) {
+        return clusterConfigState(offset, 1, connectorNum, taskNum);
+    }
+
+    private static ClusterConfigState clusterConfigState(long offset,
+                                                         int connectorStart,
+                                                         int connectorNum,
+                                                         int taskNum) {
+        int connectorNumEnd = connectorStart + connectorNum - 1;
         return new ClusterConfigState(
                 offset,
-                connectorTaskCounts(1, connectorNum, taskNum),
-                connectorConfigs(1, connectorNum),
-                connectorTargetStates(1, connectorNum, TargetState.STARTED),
+                connectorTaskCounts(connectorStart, connectorNumEnd, taskNum),
+                connectorConfigs(connectorStart, connectorNumEnd),
+                connectorTargetStates(connectorStart, connectorNumEnd, TargetState.STARTED),
                 taskConfigs(0, connectorNum, connectorNum * taskNum),
                 Collections.emptySet());
     }