You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/25 16:32:41 UTC

[kafka] branch 2.3 updated: KAFKA-9472: Remove deleted Connect tasks from status store (#8118)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new ae2d59c  KAFKA-9472: Remove deleted Connect tasks from status store (#8118)
ae2d59c is described below

commit ae2d59cfba36b9ff2e9eef948bca4d49441db3c4
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Mon May 25 09:10:03 2020 -0700

    KAFKA-9472: Remove deleted Connect tasks from status store (#8118)
    
    Although task statuses are removed from the status store when their connector is deleted, they are not removed when only a task is deleted, which happens when the number of tasks for a connector is reduced.
    
    This commit adds logic to delete the statuses of those tasks from the status store whenever a rebalance has completed and the leader of a distributed cluster detects that there are recently deleted tasks. The standalone herder is also updated to do the same.
    
    Unit tests for the `DistributedHerder` and `StandaloneHerder` classes are updated and an integration test has been added.
    
    Reviewers: Nigel Liang <ni...@nigelliang.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/connect/runtime/AbstractHerder.java      |  7 +++-
 .../apache/kafka/connect/runtime/TaskStatus.java   |  7 ++++
 .../apache/kafka/connect/runtime/WorkerTask.java   |  6 ++++
 .../runtime/distributed/DistributedHerder.java     | 23 ++++++++++---
 .../runtime/standalone/StandaloneHerder.java       |  1 +
 .../integration/ConnectWorkerIntegrationTest.java  | 40 ++++++++++++++++++++--
 .../runtime/distributed/DistributedHerderTest.java |  3 +-
 .../runtime/standalone/StandaloneHerderTest.java   |  3 ++
 8 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 9a6553d..b5032a0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -187,11 +187,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     @Override
     public void onDeletion(String connector) {
         for (TaskStatus status : statusBackingStore.getAll(connector))
-            statusBackingStore.put(new TaskStatus(status.id(), TaskStatus.State.DESTROYED, workerId, generation()));
+            onDeletion(status.id());
         statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation()));
     }
 
     @Override
+    public void onDeletion(ConnectorTaskId id) {
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation()));
+    }
+
+    @Override
     public void pauseConnector(String connector) {
         if (!configBackingStore.contains(connector))
             throw new NotFoundException("Unknown connector " + connector);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
index 5ee9041..62bb1c7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
@@ -61,5 +61,12 @@ public class TaskStatus extends AbstractStatus<ConnectorTaskId> {
          */
         void onShutdown(ConnectorTaskId id);
 
+        /**
+         * Invoked after the task has been deleted. Can be called if the
+         * connector tasks have been reduced, or if the connector itself has
+         * been deleted.
+         * @param id The id of the task
+         */
+        void onDeletion(ConnectorTaskId id);
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 28d2a8f..05d18ff 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -415,6 +415,12 @@ abstract class WorkerTask implements Runnable {
             delegateListener.onShutdown(id);
         }
 
+        @Override
+        public void onDeletion(ConnectorTaskId id) {
+            taskStateTimer.changeState(State.DESTROYED, time.milliseconds());
+            delegateListener.onDeletion(id);
+        }
+
         public void recordState(TargetState state) {
             switch (state) {
                 case STARTED:
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index ace8ff7..bad1841 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -45,6 +45,7 @@ import org.apache.kafka.connect.runtime.HerderRequest;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
@@ -1378,6 +1379,18 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
+    private void updateDeletedTaskStatus() {
+        ClusterConfigState snapshot = configBackingStore.snapshot();
+        for (String connector : statusBackingStore.connectors()) {
+            Set<ConnectorTaskId> remainingTasks = new HashSet<>(snapshot.tasks(connector));
+            
+            statusBackingStore.getAll(connector).stream()
+                .map(TaskStatus::id)
+                .filter(task -> !remainingTasks.contains(task))
+                .forEach(this::onDeletion);
+        }
+    }
+
     protected HerderMetrics herderMetrics() {
         return herderMetrics;
     }
@@ -1414,11 +1427,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 herderMetrics.rebalanceStarted(time.milliseconds());
             }
 
-            // Delete the statuses of all connectors removed prior to the start of this rebalance. This has to
-            // be done after the rebalance completes to avoid race conditions as the previous generation attempts
-            // to change the state to UNASSIGNED after tasks have been stopped.
-            if (isLeader())
+            // Delete the statuses of all connectors and tasks removed prior to the start of this rebalance. This
+            // has to be done after the rebalance completes to avoid race conditions as the previous generation
+            // attempts to change the state to UNASSIGNED after tasks have been stopped.
+            if (isLeader()) {
                 updateDeletedConnectorStatus();
+                updateDeletedTaskStatus();
+            }
 
             // We *must* interrupt any poll() call since this could occur when the poll starts, and we might then
             // sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 00bda92..afb3ec7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -317,6 +317,7 @@ public class StandaloneHerder extends AbstractHerder {
         if (!tasks.isEmpty()) {
             worker.stopAndAwaitTasks(tasks);
             configBackingStore.removeTaskConfigs(connName);
+            tasks.forEach(this::onDeletion);
         }
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 0aa4418..0bcc00a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -101,7 +101,7 @@ public class ConnectWorkerIntegrationTest {
         // create test topic
         connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
 
-        // setup up props for the sink connector
+        // set up props for the source connector
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(numTasks));
@@ -192,7 +192,7 @@ public class ConnectWorkerIntegrationTest {
         // create test topic
         connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
 
-        // setup up props for the sink connector
+        // set up props for the source connector
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(numTasks));
@@ -234,4 +234,40 @@ public class ConnectWorkerIntegrationTest {
         connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
                 "Connector tasks did not start in time.");
     }
+
+    /**
+     * Verify that the number of tasks listed in the REST API is updated correctly after changes to
+     * the "tasks.max" connector configuration.
+     */
+    @Test
+    public void testTaskStatuses() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+            "Initial group of workers did not start in time.");
+
+        // base connector props
+        Map<String, String> connectorProps = new HashMap<>();
+        connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+
+        // start the connector with only one task
+        final int initialNumTasks = 1;
+        connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(initialNumTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, initialNumTasks, "Connector tasks did not start in time");
+
+        // then reconfigure it to use more tasks
+        final int increasedNumTasks = 5;
+        connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(increasedNumTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, increasedNumTasks, "Connector task statuses did not update in time.");
+
+        // then reconfigure it to use fewer tasks
+        final int decreasedNumTasks = 3;
+        connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(decreasedNumTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, decreasedNumTasks, "Connector task statuses did not update in time.");
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index b03ddf3..af8caec 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -193,7 +193,7 @@ public class DistributedHerderTest {
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
 
         herder = PowerMock.createPartialMock(DistributedHerder.class,
-                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
+                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
 
@@ -207,6 +207,7 @@ public class DistributedHerderTest {
         delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
         PowerMock.mockStatic(Plugins.class);
         PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
+        PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes();
     }
 
     @After
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index f8a75bd..ca9fa27 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -266,6 +266,7 @@ public class StandaloneHerderTest {
 
         EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
         statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
+        statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0));
 
         expectDestroy();
 
@@ -432,6 +433,8 @@ public class StandaloneHerderTest {
         // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
         expectStop();
 
+        statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0));
+
         statusBackingStore.stop();
         EasyMock.expectLastCall();
         worker.stop();