You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/06/05 23:49:12 UTC

[kafka] branch 2.6 updated: KAFKA-9851: Revoking Connect tasks due to connectivity issues should also clear the running assignment (#8804)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new bc82c7d  KAFKA-9851: Revoking Connect tasks due to connectivity issues should also clear the running assignment (#8804)
bc82c7d is described below

commit bc82c7d2284ad03969a7c7f8bd1d934b8cb65437
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Fri Jun 5 15:56:02 2020 -0700

    KAFKA-9851: Revoking Connect tasks due to connectivity issues should also clear the running assignment (#8804)
    
    Until recently, revocation of connectors and tasks was the result of a rebalance that contained a new assignment. Therefore, the view of the running assignment was kept consistent outside the call to `RebalanceListener#onRevoke`. However, after KAFKA-9184 the need appeared for the worker to revoke tasks voluntarily and proactively without having received a new assignment.
    
    This commit allows the worker to restart tasks that were stopped as a result of voluntary revocation, once a rebalance reassigns these tasks to the worker.
    
    The fix is tested by extending an existing integration test.
    
    Reviewers: Randall Hauch <rh...@gmail.com>
---
 .../runtime/distributed/DistributedHerder.java     | 52 +++++++++++++++-------
 .../integration/ConnectWorkerIntegrationTest.java  | 16 +++++++
 .../clusters/EmbeddedConnectClusterAssertions.java |  4 +-
 3 files changed, 54 insertions(+), 18 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 851b75f..4cc7539 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1109,26 +1109,39 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     private void startWork() {
         // Start assigned connectors and tasks
-        log.info("Starting connectors and tasks using config offset {}", assignment.offset());
-
         List<Callable<Void>> callables = new ArrayList<>();
-        for (String connectorName : assignmentDifference(assignment.connectors(), runningAssignment.connectors())) {
-            callables.add(getConnectorStartingCallable(connectorName));
-        }
 
-        // These tasks have been stopped by this worker due to task reconfiguration. In order to
-        // restart them, they are removed just before the overall task startup from the set of
-        // currently running tasks. Therefore, they'll be restarted only if they are included in
-        // the assignment that was just received after rebalancing.
-        runningAssignment.tasks().removeAll(tasksToRestart);
-        tasksToRestart.clear();
-        for (ConnectorTaskId taskId : assignmentDifference(assignment.tasks(), runningAssignment.tasks())) {
-            callables.add(getTaskStartingCallable(taskId));
+        // The sets in runningAssignment may change when onRevoked is called voluntarily by this
+        // herder (e.g. when a broker coordinator failure is detected). Otherwise the
+        // runningAssignment is always replaced by the assignment here.
+        synchronized (this) {
+            log.info("Starting connectors and tasks using config offset {}", assignment.offset());
+            log.debug("Received assignment: {}", assignment);
+            log.debug("Currently running assignment: {}", runningAssignment);
+
+            for (String connectorName : assignmentDifference(assignment.connectors(), runningAssignment.connectors())) {
+                callables.add(getConnectorStartingCallable(connectorName));
+            }
+
+            // These tasks have been stopped by this worker due to task reconfiguration. In order to
+            // restart them, they are removed just before the overall task startup from the set of
+            // currently running tasks. Therefore, they'll be restarted only if they are included in
+            // the assignment that was just received after rebalancing.
+            log.debug("Tasks to restart from currently running assignment: {}", tasksToRestart);
+            runningAssignment.tasks().removeAll(tasksToRestart);
+            tasksToRestart.clear();
+            for (ConnectorTaskId taskId : assignmentDifference(assignment.tasks(), runningAssignment.tasks())) {
+                callables.add(getTaskStartingCallable(taskId));
+            }
         }
+
         startAndStop(callables);
-        runningAssignment = member.currentProtocolVersion() == CONNECT_PROTOCOL_V0
-                            ? ExtendedAssignment.empty()
-                            : assignment;
+
+        synchronized (this) {
+            runningAssignment = member.currentProtocolVersion() == CONNECT_PROTOCOL_V0
+                                ? ExtendedAssignment.empty()
+                                : assignment;
+        }
 
         log.info("Finished starting connectors and tasks");
     }
@@ -1638,6 +1651,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 startAndStop(callables);
                 log.info("Finished stopping tasks in preparation for rebalance");
 
+                synchronized (DistributedHerder.this) {
+                    log.debug("Removing connectors from running assignment {}", connectors);
+                    runningAssignment.connectors().removeAll(connectors);
+                    log.debug("Removing tasks from running assignment {}", tasks);
+                    runningAssignment.tasks().removeAll(tasks);
+                }
+
                 if (isTopicTrackingEnabled) {
                     // Send tombstones to reset active topics for removed connectors only after
                     // connectors and tasks have been stopped, or these tombstones will be overwritten
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 076e462..c7d0f35 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -48,6 +48,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -184,6 +185,7 @@ public class ConnectWorkerIntegrationTest {
      */
     @Test
     public void testBrokerCoordinator() throws Exception {
+        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
         workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000));
         connect = connectBuilder.workerProps(workerProps).build();
         // start the clusters
@@ -204,6 +206,9 @@ public class ConnectWorkerIntegrationTest {
         connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
                 "Connector tasks did not start in time.");
 
+        // expect that the connector will be stopped once the coordinator is detected to be down
+        StartAndStopLatch stopLatch = connectorHandle.expectedStops(1, false);
+
         connect.kafka().stopOnlyKafka();
 
         connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
@@ -213,6 +218,12 @@ public class ConnectWorkerIntegrationTest {
         // heartbeat timeout * 2 + 4sec
         Thread.sleep(TimeUnit.SECONDS.toMillis(10));
 
+        // Wait for the connector to be stopped
+        assertTrue("Failed to stop connector and tasks after coordinator failure within "
+                        + CONNECTOR_SETUP_DURATION_MS + "ms",
+                stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
+
+        StartAndStopLatch startLatch = connectorHandle.expectedStarts(1, false);
         connect.kafka().startOnlyKafkaOnSamePorts();
 
         // Allow for the kafka brokers to come back online
@@ -226,6 +237,11 @@ public class ConnectWorkerIntegrationTest {
 
         connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
                 "Connector tasks did not start in time.");
+
+        // Expect that the connector has started again
+        assertTrue("Failed to stop connector and tasks after coordinator failure within "
+                        + CONNECTOR_SETUP_DURATION_MS + "ms",
+                startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
     }
 
     /**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index 587ea04..6e1f87e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -46,7 +46,7 @@ public class EmbeddedConnectClusterAssertions {
     private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectClusterAssertions.class);
     public static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
     public static final long VALIDATION_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
-    private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
+    public static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
     private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
 
     private final EmbeddedConnectCluster connect;
@@ -363,7 +363,7 @@ public class EmbeddedConnectClusterAssertions {
             waitForCondition(
                 () -> checkConnectorAndTasksAreStopped(connectorName),
                 CONNECTOR_SETUP_DURATION_MS,
-                "At least the connector or one of its tasks is still");
+                "At least the connector or one of its tasks is still running");
         } catch (AssertionError e) {
             throw new AssertionError(detailMessage, e);
         }