Posted to commits@kafka.apache.org by rh...@apache.org on 2021/03/01 16:29:04 UTC

[kafka] branch 2.7 updated: KAFKA-10340: Proactively close producer when cancelling source tasks (#10016)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 388d9a3  KAFKA-10340: Proactively close producer when cancelling source tasks (#10016)
388d9a3 is described below

commit 388d9a321bb998aeaed512b3530f8daf390b50d3
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Mon Mar 1 11:03:34 2021 -0500

    KAFKA-10340: Proactively close producer when cancelling source tasks (#10016)
    
    Close the producer in `WorkerSourceTask` when the latter is cancelled. If the broker does not auto-create topics, and the connector is not configured to create the topics written by the source connector, then the `WorkerSourceTask` main thread will block indefinitely until the topic is created, and will not stop if cancelled or scheduled for shutdown by the worker.
    
    Expanded an existing unit test for the `WorkerSourceTask` class to ensure that the producer is closed when the task is abandoned, and added a new integration test that verifies tasks are still shut down even when their producers are blocked writing to topics that do not exist.
    
    Author: Chris Egerton <ch...@confluent.io>
    Reviewed: Greg Harris <gr...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
 .../org/apache/kafka/connect/runtime/Worker.java   |  2 +-
 .../kafka/connect/runtime/WorkerSourceTask.java    | 38 +++++++++++++-----
 .../apache/kafka/connect/runtime/WorkerTask.java   |  4 ++
 .../integration/ConnectWorkerIntegrationTest.java  | 45 ++++++++++++++++++++--
 .../kafka/connect/integration/ConnectorHandle.java |  8 ++--
 .../integration/MonitorableSourceConnector.java    |  1 +
 .../connect/runtime/ErrorHandlingTaskTest.java     |  3 +-
 .../ErrorHandlingTaskWithTopicCreationTest.java    |  3 +-
 .../connect/runtime/WorkerSourceTaskTest.java      |  5 ++-
 .../WorkerSourceTaskWithTopicCreationTest.java     |  5 ++-
 .../apache/kafka/connect/runtime/WorkerTest.java   |  4 +-
 .../runtime/WorkerWithTopicCreationTest.java       |  4 +-
 12 files changed, 98 insertions(+), 24 deletions(-)
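
For context, the hang this patch addresses can be reproduced outside of Connect with a minimal sketch like the one below (not part of the patch; broker address, topic name, and class name are illustrative). The key detail is that the Connect worker configures source-task producers with max.block.ms=Long.MAX_VALUE by default, so a send() to a topic that does not exist and cannot be auto-created blocks in waitOnMetadata() instead of timing out; a forced close from another thread, which is what cancel() now does, should unblock it.

    import java.time.Duration;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class BlockedSendSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            // Mirrors the Connect worker's default source-task producer config
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

            // Simulate what WorkerSourceTask.cancel() now does: force the
            // producer closed from another thread after a few seconds.
            new Thread(() -> {
                try {
                    Thread.sleep(5_000);
                } catch (InterruptedException ignored) {
                }
                producer.close(Duration.ZERO);
            }).start();

            // With auto.create.topics.enable=false on the broker and no topic
            // creation settings on the connector, this send() blocks in
            // waitOnMetadata() until the close above unblocks it.
            producer.send(new ProducerRecord<>("nonexistenttopic", new byte[0]));
        }
    }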

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 45a93df..fffe71a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -626,7 +626,7 @@ public class Worker {
             // Note we pass the configState as it performs dynamic transformations under the covers
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
                     headerConverter, transformationChain, producer, admin, topicCreationGroups,
-                    offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore());
+                    offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore(), executor);
         } else if (task instanceof SinkTask) {
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index ecb6260..6300b11 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -60,6 +60,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -86,6 +87,7 @@ class WorkerSourceTask extends WorkerTask {
     private final TopicAdmin admin;
     private final CloseableOffsetStorageReader offsetReader;
     private final OffsetStorageWriter offsetWriter;
+    private final Executor closeExecutor;
     private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
     private final AtomicReference<Exception> producerSendException;
     private final boolean isTopicTrackingEnabled;
@@ -123,7 +125,8 @@ class WorkerSourceTask extends WorkerTask {
                             ClassLoader loader,
                             Time time,
                             RetryWithToleranceOperator retryWithToleranceOperator,
-                            StatusBackingStore statusBackingStore) {
+                            StatusBackingStore statusBackingStore,
+                            Executor closeExecutor) {
 
         super(id, statusListener, initialState, loader, connectMetrics,
                 retryWithToleranceOperator, time, statusBackingStore);
@@ -139,6 +142,7 @@ class WorkerSourceTask extends WorkerTask {
         this.admin = admin;
         this.offsetReader = offsetReader;
         this.offsetWriter = offsetWriter;
+        this.closeExecutor = closeExecutor;
 
         this.toSend = null;
         this.lastSendFailed = false;
@@ -171,13 +175,9 @@ class WorkerSourceTask extends WorkerTask {
                 log.warn("Could not stop task", t);
             }
         }
-        if (producer != null) {
-            try {
-                producer.close(Duration.ofSeconds(30));
-            } catch (Throwable t) {
-                log.warn("Could not close producer", t);
-            }
-        }
+
+        closeProducer(Duration.ofSeconds(30));
+
         if (admin != null) {
             try {
                 admin.close(Duration.ofSeconds(30));
@@ -202,6 +202,14 @@ class WorkerSourceTask extends WorkerTask {
     public void cancel() {
         super.cancel();
         offsetReader.close();
+        // We proactively close the producer here as the main work thread for the task may
+        // be blocked indefinitely in a call to Producer::send if automatic topic creation is
+        // not enabled on either the connector or the Kafka cluster. Closing the producer should
+        // unblock it in that case and allow shutdown to proceed normally.
+        // With a duration of 0, the producer's own shutdown logic should be fairly quick,
+        // but closing user-pluggable classes like interceptors may lag indefinitely. So, we
+        // call close on a separate thread in order to avoid blocking the herder's tick thread.
+        closeExecutor.execute(() -> closeProducer(Duration.ZERO));
     }
 
     @Override
@@ -259,6 +267,16 @@ class WorkerSourceTask extends WorkerTask {
         }
     }
 
+    private void closeProducer(Duration duration) {
+        if (producer != null) {
+            try {
+                producer.close(duration);
+            } catch (Throwable t) {
+                log.warn("Could not close producer for {}", id, t);
+            }
+        }
+    }
+
     private void maybeThrowProducerSendException() {
         if (producerSendException.get() != null) {
             throw new ConnectException(
@@ -485,7 +503,9 @@ class WorkerSourceTask extends WorkerTask {
             while (!outstandingMessages.isEmpty()) {
                 try {
                     long timeoutMs = timeout - time.milliseconds();
-                    if (timeoutMs <= 0) {
+                    // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain,
+                    // we can stop flushing immediately
+                    if (isCancelled() || timeoutMs <= 0) {
                         log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
                         finishFailedFlush();
                         recordCommitFailure(time.milliseconds() - started, null);
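
The close-on-cancel pattern introduced above — handing producer.close(Duration.ZERO) off to a separate Executor so that slow user-pluggable code run during close (e.g. interceptors) cannot stall the herder's tick thread — can be restated in isolation. A minimal sketch, assuming a stand-in cached thread pool (the worker reuses its existing executor rather than creating a new one):

    import java.time.Duration;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    import org.apache.kafka.clients.producer.Producer;

    class ProactiveCloseSketch {
        private final Executor closeExecutor = Executors.newCachedThreadPool();
        private final Producer<byte[], byte[]> producer;

        ProactiveCloseSketch(Producer<byte[], byte[]> producer) {
            this.producer = producer;
        }

        // As in WorkerSourceTask.cancel(): Duration.ZERO keeps the producer's
        // own shutdown quick, and running it off-thread means a misbehaving
        // interceptor can only hang the executor thread, not the caller.
        void cancel() {
            closeExecutor.execute(() -> closeProducer(Duration.ZERO));
        }

        private void closeProducer(Duration duration) {
            if (producer != null) {
                try {
                    producer.close(duration);
                } catch (Throwable t) {
                    // Swallow: a failure to close must not fail shutdown.
                }
            }
        }
    }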
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index cadb4c4..deacb62 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -159,6 +159,10 @@ abstract class WorkerTask implements Runnable {
         return stopping;
     }
 
+    protected boolean isCancelled() {
+        return cancelled;
+    }
+
     private void doClose() {
         try {
             close();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index bd98183..5cd794e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -65,13 +65,14 @@ public class ConnectWorkerIntegrationTest {
     private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
     private static final int NUM_TASKS = 4;
+    private static final int MESSAGES_PER_POLL = 10;
     private static final String CONNECTOR_NAME = "simple-source";
     private static final String TOPIC_NAME = "test-topic";
 
     private EmbeddedConnectCluster.Builder connectBuilder;
     private EmbeddedConnectCluster connect;
-    Map<String, String> workerProps = new HashMap<>();
-    Properties brokerProps = new Properties();
+    private Map<String, String> workerProps;
+    private Properties brokerProps;
 
     @Rule
     public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);
@@ -79,10 +80,12 @@ public class ConnectWorkerIntegrationTest {
     @Before
     public void setup() {
         // setup Connect worker properties
+        workerProps = new HashMap<>();
         workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
         workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
 
         // setup Kafka broker properties
+        brokerProps = new Properties();
         brokerProps.put("auto.create.topics.enable", String.valueOf(false));
 
         // build a Connect cluster backed by Kafka and Zk
@@ -288,14 +291,48 @@ public class ConnectWorkerIntegrationTest {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exception {
+        // When automatic topic creation is disabled on the broker
+        brokerProps.put("auto.create.topics.enable", "false");
+        connect = connectBuilder
+            .brokerProps(brokerProps)
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        // and when the connector is not configured to create topics
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG);
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG);
+        props.put("throughput", "-1");
+
+        ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connector.expectedRecords(NUM_TASKS * MESSAGES_PER_POLL);
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
+            NUM_TASKS, "Connector tasks did not start in time");
+        connector.awaitRecords(TimeUnit.MINUTES.toMillis(1));
+
+        // Then if we delete the connector, it and each of its tasks should be stopped by the framework
+        // even though the producer is blocked because there is no topic
+        StartAndStopLatch stopCounter = connector.expectedStops(1);
+        connect.deleteConnector(CONNECTOR_NAME);
+
+        assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES));
+    }
+
     private Map<String, String> defaultSourceConnectorProps(String topic) {
         // setup up props for the source connector
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put(TOPIC_CONFIG, topic);
-        props.put("throughput", String.valueOf(10));
-        props.put("messages.per.poll", String.valueOf(10));
+        props.put("throughput", "10");
+        props.put("messages.per.poll", String.valueOf(MESSAGES_PER_POLL));
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
index 06bc373..ffc9e7a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
@@ -292,8 +292,8 @@ public class ConnectorHandle {
      * {@link StartAndStopLatch#await(long, TimeUnit)} to wait up to a specified duration for the
      * connector and all tasks to be started at least the specified number of times.
      *
-     * <p>This method does not track the number of times the connector and tasks are stopped, and
-     * only tracks the number of times the connector and tasks are <em>started</em>.
+     * <p>This method does not track the number of times the connector and tasks are started, and
+     * only tracks the number of times the connector and tasks are <em>stopped</em>.
      *
     * @param expectedStops the minimum number of stops that are expected once this method is
      *                      called
@@ -315,8 +315,8 @@ public class ConnectorHandle {
      * {@link StartAndStopLatch#await(long, TimeUnit)} to wait up to a specified duration for the
      * connector and all tasks to be started at least the specified number of times.
      *
-     * <p>This method does not track the number of times the connector and tasks are stopped, and
-     * only tracks the number of times the connector and tasks are <em>started</em>.
+     * <p>This method does not track the number of times the connector and tasks are started, and
+     * only tracks the number of times the connector and tasks are <em>stopped</em>.
      *
     * @param expectedStops the minimum number of stops that are expected once this method is
      *                      called
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index c11b9bd..aaada37 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -130,6 +130,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
                     throttler.throttle();
                 }
                 taskHandle.record(batchSize);
+                log.info("Returning batch of {} records", batchSize);
                 return LongStream.range(0, batchSize)
                         .mapToObj(i -> new SourceRecord(
                                 Collections.singletonMap("task.id", taskId),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 70bbfc6..daa9edd 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -77,6 +77,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
@@ -563,7 +564,7 @@ public class ErrorHandlingTaskTest {
                 producer, admin, null,
                 offsetReader, offsetWriter, workerConfig,
                 ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator,
-                statusBackingStore);
+                statusBackingStore, (Executor) Runnable::run);
     }
 
     private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) {
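
The `(Executor) Runnable::run` argument added to the test constructors here and in the tests below is a same-thread executor: Runnable::run satisfies Executor's single abstract method, so the producer close submitted in cancel() runs synchronously on the test thread and the mocks can verify it deterministically. A self-contained illustration:

    import java.util.concurrent.Executor;

    public class DirectExecutorSketch {
        public static void main(String[] args) {
            // execute(Runnable) is implemented by Runnable::run, so each task
            // runs immediately on the calling thread; no new thread involved.
            Executor direct = Runnable::run;
            direct.execute(() ->
                System.out.println("ran on " + Thread.currentThread().getName()));
        }
    }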
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
index 692f4d2..5cdfab9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
@@ -80,6 +80,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
@@ -581,7 +582,7 @@ public class ErrorHandlingTaskWithTopicCreationTest {
                 producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
                 offsetReader, offsetWriter, workerConfig,
                 ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator,
-                statusBackingStore);
+                statusBackingStore, (Executor) Runnable::run);
     }
 
     private ConsumerRecords<byte[], byte[]> records(ConsumerRecord<byte[], byte[]> record) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index ff61ea3..9beb47c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -196,7 +196,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
                 transformationChain, producer, admin, null,
                 offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
-                RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore);
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore, Runnable::run);
     }
 
     @Test
@@ -718,6 +718,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         offsetReader.close();
         PowerMock.expectLastCall();
 
+        producer.close(Duration.ZERO);
+        PowerMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.cancel();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java
index 0324168..5ce4f47 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java
@@ -222,7 +222,7 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
                 transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
                 offsetReader, offsetWriter, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
-                RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore);
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, statusBackingStore, Runnable::run);
     }
 
     @Test
@@ -763,6 +763,9 @@ public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
         offsetReader.close();
         PowerMock.expectLastCall();
 
+        producer.close(Duration.ZERO);
+        PowerMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.cancel();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 15933a7..cad0cf4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -94,6 +94,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -1481,7 +1482,8 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.eq(pluginLoader),
                 anyObject(Time.class),
                 anyObject(RetryWithToleranceOperator.class),
-                anyObject(StatusBackingStore.class))
+                anyObject(StatusBackingStore.class),
+                anyObject(Executor.class))
                 .andReturn(workerTask);
     }
     /* Name here needs to be unique as we are testing the aliasing mechanism */
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
index 763ef36..fe0005b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
@@ -85,6 +85,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -1389,7 +1390,8 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
                 EasyMock.eq(pluginLoader),
                 anyObject(Time.class),
                 anyObject(RetryWithToleranceOperator.class),
-                anyObject(StatusBackingStore.class))
+                anyObject(StatusBackingStore.class),
+                anyObject(Executor.class))
                 .andReturn(workerTask);
     }