You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/25 16:32:16 UTC

[kafka] branch 2.5 updated: KAFKA-9472: Remove deleted Connect tasks from status store (#8118)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new b59f880  KAFKA-9472: Remove deleted Connect tasks from status store (#8118)
b59f880 is described below

commit b59f880f5ddbcfe4cb522a799d3ae21d06a794c3
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Mon May 25 09:10:03 2020 -0700

    KAFKA-9472: Remove deleted Connect tasks from status store (#8118)
    
    Although task statuses are removed from the status store when their connector is deleted, they are not removed when only the tasks themselves are deleted, which happens when the number of tasks for a connector is reduced.
    
    This commit adds logic for deleting the statuses for those tasks from the status store whenever a rebalance has completed and the leader of a distributed cluster has detected that there are recently-deleted tasks. Standalone mode is also updated to remove the statuses of tasks when they are deleted.
    
    Unit tests for the `DistributedHerder` and `StandaloneHerder` classes are updated and an integration test has been added.
    
    Reviewers: Nigel Liang <ni...@nigelliang.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/connect/runtime/AbstractHerder.java      |  7 +++-
 .../apache/kafka/connect/runtime/TaskStatus.java   |  7 ++++
 .../apache/kafka/connect/runtime/WorkerTask.java   |  6 ++++
 .../runtime/distributed/DistributedHerder.java     | 23 ++++++++++---
 .../runtime/standalone/StandaloneHerder.java       |  1 +
 .../integration/ConnectWorkerIntegrationTest.java  | 39 ++++++++++++++++++++--
 .../runtime/distributed/DistributedHerderTest.java |  3 +-
 .../runtime/standalone/StandaloneHerderTest.java   |  3 ++
 8 files changed, 81 insertions(+), 8 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 8d8934b..99a634d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -189,11 +189,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     @Override
     public void onDeletion(String connector) {
         for (TaskStatus status : statusBackingStore.getAll(connector))
-            statusBackingStore.put(new TaskStatus(status.id(), TaskStatus.State.DESTROYED, workerId, generation()));
+            onDeletion(status.id());
         statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation()));
     }
 
     @Override
+    public void onDeletion(ConnectorTaskId id) {
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.DESTROYED, workerId, generation()));
+    }
+
+    @Override
     public void pauseConnector(String connector) {
         if (!configBackingStore.contains(connector))
             throw new NotFoundException("Unknown connector " + connector);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
index 5ee9041..62bb1c7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
@@ -61,5 +61,12 @@ public class TaskStatus extends AbstractStatus<ConnectorTaskId> {
          */
         void onShutdown(ConnectorTaskId id);
 
+        /**
+         * Invoked after the task has been deleted. Can be called if the
+         * connector tasks have been reduced, or if the connector itself has
+         * been deleted.
+         * @param id The id of the task
+         */
+        void onDeletion(ConnectorTaskId id);
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index c40dc90..b0a6a0c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -436,6 +436,12 @@ abstract class WorkerTask implements Runnable {
             delegateListener.onShutdown(id);
         }
 
+        @Override
+        public void onDeletion(ConnectorTaskId id) {
+            taskStateTimer.changeState(State.DESTROYED, time.milliseconds());
+            delegateListener.onDeletion(id);
+        }
+
         public void recordState(TargetState state) {
             switch (state) {
                 case STARTED:
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 512c700..349aa71 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -46,6 +46,7 @@ import org.apache.kafka.connect.runtime.SessionKey;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
 import org.apache.kafka.connect.runtime.rest.RestClient;
@@ -1526,6 +1527,18 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
+    private void updateDeletedTaskStatus() {
+        ClusterConfigState snapshot = configBackingStore.snapshot();
+        for (String connector : statusBackingStore.connectors()) {
+            Set<ConnectorTaskId> remainingTasks = new HashSet<>(snapshot.tasks(connector));
+            
+            statusBackingStore.getAll(connector).stream()
+                .map(TaskStatus::id)
+                .filter(task -> !remainingTasks.contains(task))
+                .forEach(this::onDeletion);
+        }
+    }
+
     protected HerderMetrics herderMetrics() {
         return herderMetrics;
     }
@@ -1588,11 +1601,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 herderMetrics.rebalanceStarted(time.milliseconds());
             }
 
-            // Delete the statuses of all connectors removed prior to the start of this rebalance. This has to
-            // be done after the rebalance completes to avoid race conditions as the previous generation attempts
-            // to change the state to UNASSIGNED after tasks have been stopped.
-            if (isLeader())
+            // Delete the statuses of all connectors and tasks removed prior to the start of this rebalance. This
+            // has to be done after the rebalance completes to avoid race conditions as the previous generation
+            // attempts to change the state to UNASSIGNED after tasks have been stopped.
+            if (isLeader()) {
                 updateDeletedConnectorStatus();
+                updateDeletedTaskStatus();
+            }
 
             // We *must* interrupt any poll() call since this could occur when the poll starts, and we might then
             // sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index a3e75b5..6c5398f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -319,6 +319,7 @@ public class StandaloneHerder extends AbstractHerder {
         if (!tasks.isEmpty()) {
             worker.stopAndAwaitTasks(tasks);
             configBackingStore.removeTaskConfigs(connName);
+            tasks.forEach(this::onDeletion);
         }
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index b8e0497..b3f3020 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -102,7 +102,7 @@ public class ConnectWorkerIntegrationTest {
         // create test topic
         connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
 
-        // setup up props for the sink connector
+        // set up props for the source connector
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(numTasks));
@@ -193,7 +193,7 @@ public class ConnectWorkerIntegrationTest {
         // create test topic
         connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
 
-        // setup up props for the sink connector
+        // set up props for the source connector
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(numTasks));
@@ -236,4 +236,39 @@ public class ConnectWorkerIntegrationTest {
                 "Connector tasks did not start in time.");
     }
 
+    /**
+     * Verify that the number of tasks listed in the REST API is updated correctly after changes to
+     * the "tasks.max" connector configuration.
+     */
+    @Test
+    public void testTaskStatuses() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+            "Initial group of workers did not start in time.");
+
+        // base connector props
+        Map<String, String> connectorProps = new HashMap<>();
+        connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+
+        // start the connector with only one task
+        final int initialNumTasks = 1;
+        connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(initialNumTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, initialNumTasks, "Connector tasks did not start in time");
+
+        // then reconfigure it to use more tasks
+        final int increasedNumTasks = 5;
+        connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(increasedNumTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, increasedNumTasks, "Connector task statuses did not update in time.");
+
+        // then reconfigure it to use fewer tasks
+        final int decreasedNumTasks = 3;
+        connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(decreasedNumTasks));
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, decreasedNumTasks, "Connector task statuses did not update in time.");
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 08118fe..879abf8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -208,7 +208,7 @@ public class DistributedHerderTest {
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
 
         herder = PowerMock.createPartialMock(DistributedHerder.class,
-                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
+                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
 
@@ -222,6 +222,7 @@ public class DistributedHerderTest {
         delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
         PowerMock.mockStatic(Plugins.class);
         PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
+        PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes();
     }
 
     @After
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 89d4e45..feb0f99 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -266,6 +266,7 @@ public class StandaloneHerderTest {
 
         EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
         statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
+        statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0));
 
         expectDestroy();
 
@@ -434,6 +435,8 @@ public class StandaloneHerderTest {
         // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
         expectStop();
 
+        statusBackingStore.put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 0));
+
         statusBackingStore.stop();
         EasyMock.expectLastCall();
         worker.stop();