Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/26 16:33:57 UTC

[GitHub] [kafka] rhauch commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

rhauch commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583741413



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -259,6 +267,16 @@ public void execute() {
         }
     }
 
+    private void closeProducer(Duration duration) {
+        if (producer != null) {
+            try {
+                producer.close(duration);
+            } catch (Throwable t) {
+                log.warn("Could not close producer", t);

Review comment:
       This is run on a thread from the worker's executor. Should we include the connector and task ID in this message so that we can trace which task had the issue?
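A minimal sketch of what the suggested message could look like, assuming the task has a string ID available. `TaskLogSketch`, its nested `Producer` interface, and the `taskId` field are hypothetical stand-ins rather than the real Connect classes. To keep the example self-contained, the warning is captured as a string instead of being passed to the task's logger:

```java
import java.time.Duration;

public class TaskLogSketch {
    // Hypothetical stand-in for the producer; only close(Duration) matters here.
    interface Producer {
        void close(Duration timeout);
    }

    private final String taskId;   // hypothetical, e.g. "simple-source-0"
    private final Producer producer;
    private String lastWarning;    // captured instead of logged, to stay self-contained

    TaskLogSketch(String taskId, Producer producer) {
        this.taskId = taskId;
        this.producer = producer;
    }

    void closeProducer(Duration duration) {
        if (producer != null) {
            try {
                producer.close(duration);
            } catch (Throwable t) {
                // Including the task ID tells operators which task failed,
                // since this runs on a thread shared by the worker's executor.
                lastWarning = "Could not close producer for task " + taskId + ": " + t.getMessage();
            }
        }
    }

    String lastWarning() {
        return lastWarning;
    }

    public static void main(String[] args) {
        TaskLogSketch task = new TaskLogSketch("simple-source-0", d -> {
            throw new IllegalStateException("boom");
        });
        task.closeProducer(Duration.ZERO);
        System.out.println(task.lastWarning()); // Could not close producer for task simple-source-0: boom
    }
}
```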

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##########
@@ -64,7 +64,7 @@
     private final TaskMetricsGroup taskMetricsGroup;
     private volatile TargetState targetState;
     private volatile boolean stopping;   // indicates whether the Worker has asked the task to stop
-    private volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
+    protected volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)

Review comment:
       How about keeping this `private` and adding a protected `isCancelled()` method?

   Currently, the `cancelled` field is encapsulated entirely within the `WorkerTask` class and modified only via the public `cancel()` method. We can just as easily keep that encapsulation. OTOH, if we make `cancelled` protected, we lose the encapsulation: subclasses could set the field directly, which would make it harder for a future developer to add logic that runs upon cancellation.
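A minimal sketch of the proposed pattern, using hypothetical, simplified classes (`WorkerTaskSketch`, `SourceTaskSketch`) in place of `WorkerTask` and `WorkerSourceTask`. The field stays `private`, `cancel()` remains its only writer, and subclasses get read-only access through `isCancelled()`:

```java
public class CancellationSketch {
    // Hypothetical, simplified stand-in for WorkerTask.
    abstract static class WorkerTaskSketch {
        // Stays private: only cancel() can change it.
        private volatile boolean cancelled;

        public void cancel() {
            cancelled = true;
            // Any future logic that must run on cancellation can live here,
            // and subclasses cannot bypass it by writing the field directly.
        }

        // Subclasses get read-only access to the flag.
        protected boolean isCancelled() {
            return cancelled;
        }
    }

    // Hypothetical stand-in for a subclass such as WorkerSourceTask.
    static class SourceTaskSketch extends WorkerTaskSketch {
        String state() {
            return isCancelled() ? "cancelled" : "running";
        }
    }

    public static void main(String[] args) {
        SourceTaskSketch task = new SourceTaskSketch();
        System.out.println(task.state()); // running
        task.cancel();
        System.out.println(task.state()); // cancelled
    }
}
```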

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
##########
@@ -577,7 +578,7 @@ private void createSourceTask(TargetState initialState, RetryWithToleranceOperat
                 producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
                 offsetReader, offsetWriter, workerConfig,
                 ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator,
-                statusBackingStore);
+                statusBackingStore, (Executor) Runnable::run);

Review comment:
       I agree with Greg: nice trick!
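For readers unfamiliar with the trick: `Executor` has a single abstract method, `execute(Runnable)`, so the method reference `Runnable::run` implements it by running each task immediately on the calling thread. The explicit `(Executor)` cast gives the method reference its target type. A small, self-contained illustration (the class name is illustrative only):

```java
import java.util.concurrent.Executor;

public class DirectExecutorSketch {
    public static void main(String[] args) {
        // Runnable::run satisfies Executor.execute(Runnable) by invoking
        // the task right away, on whichever thread called execute().
        Executor direct = Runnable::run;

        Thread caller = Thread.currentThread();
        final boolean[] ranOnCaller = {false};

        direct.execute(() -> ranOnCaller[0] = (Thread.currentThread() == caller));

        // execute() has already returned, and the task ran synchronously.
        System.out.println(ranOnCaller[0]); // true
    }
}
```

Because the task completes before `execute()` returns, a test can assert on its effects without waiting or synchronizing.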

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +291,46 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exception {
+        // Ensure that automatic topic creation is disabled on the broker
+        brokerProps.put("auto.create.topics.enable", "false");
+        connect = connectBuilder
+            .brokerProps(brokerProps)
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG);
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG);
+        props.put("throughput", "-1");
+
+        ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connector.expectedRecords(NUM_TASKS * MESSAGES_PER_POLL);
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
+            NUM_TASKS, "Connector tasks did not start in time");
+        connector.awaitRecords(TimeUnit.MINUTES.toMillis(1));
+
+        // After we delete the connector, it and each of its tasks should  be stopped by the framework

Review comment:
       Again, a bit more information would be useful:
   ```suggestion
           // Then if we delete the connector, it and each of its tasks should be stopped by the framework
           // even though the producer is blocked because there is no topic
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -65,24 +65,27 @@
     private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
     private static final int NUM_TASKS = 4;
+    private static final int MESSAGES_PER_POLL = 10;
     private static final String CONNECTOR_NAME = "simple-source";
     private static final String TOPIC_NAME = "test-topic";
 
     private EmbeddedConnectCluster.Builder connectBuilder;
     private EmbeddedConnectCluster connect;
-    Map<String, String> workerProps = new HashMap<>();
-    Properties brokerProps = new Properties();
+    Map<String, String> workerProps;
+    Properties brokerProps;

Review comment:
       Nice catch. And since we're here, why not make these `private`, too? They're currently not used outside of this class.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -259,6 +267,16 @@ public void execute() {
         }
     }
 
+    private void closeProducer(Duration duration) {
+        if (producer != null) {
+            try {
+                producer.close(duration);
+            } catch (Throwable t) {

Review comment:
       The producer's `close(...)` method can throw an `InterruptException` if the method fails to join the IO thread. This can theoretically happen even with a timeout of 0, if the thread is interrupted (e.g., because the executor is shut down) _before_ the join can wait. Although the likelihood of this is small, what do you think about catching `InterruptException` and ignoring the error?
   ```suggestion
               } catch (InterruptException t) {
                   // ignore, since this is likely due to the worker's executor being shut down
               } catch (Throwable t) {
   ```
   Two things. First, the producer throws `InterruptException`, not `InterruptedException`. Second, even though the `WorkerSourceTask::close()` method that calls `closeProducer(Duration)` doesn't _directly_ use the executor, the `Worker` uses that same executor to stop this `WorkerSourceTask`, which ultimately calls `WorkerSourceTask::close()`. IOW, `closeProducer(Duration)` is always called from the executor, and the executor could be shut down at any moment, hence the potential `InterruptException`.
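A self-contained sketch of the suggested catch ordering. Kafka's `InterruptException` lives in `org.apache.kafka.common.errors`; to keep the example runnable without Kafka on the classpath, it uses a hypothetical local stand-in with the same name, along with a hypothetical `Producer` interface, and returns a string instead of logging:

```java
import java.time.Duration;

public class CloseProducerSketch {
    // Hypothetical stand-in for org.apache.kafka.common.errors.InterruptException,
    // which is unchecked (unlike java.lang.InterruptedException).
    static class InterruptException extends RuntimeException {
        InterruptException(Throwable cause) {
            super(cause);
        }
    }

    // Hypothetical stand-in for the producer.
    interface Producer {
        void close(Duration timeout);
    }

    // Returns a description of what happened, in place of real logging.
    static String closeProducer(Producer producer, Duration duration) {
        if (producer == null) {
            return "no producer";
        }
        try {
            producer.close(duration);
            return "closed";
        } catch (InterruptException e) {
            // Ignored: most likely the worker's executor is being shut down.
            return "ignored interrupt";
        } catch (Throwable t) {
            // Anything else is still surfaced as a warning.
            return "warn: " + t;
        }
    }

    public static void main(String[] args) {
        System.out.println(closeProducer(d -> { }, Duration.ZERO)); // closed
        System.out.println(closeProducer(d -> {
            throw new InterruptException(new InterruptedException());
        }, Duration.ZERO)); // ignored interrupt
        System.out.println(closeProducer(d -> {
            throw new IllegalStateException("boom");
        }, Duration.ZERO)); // warn: java.lang.IllegalStateException: boom
    }
}
```

The narrower `InterruptException` clause has to come before `catch (Throwable t)`; in the opposite order, the Java compiler rejects the second clause because that exception has already been caught.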

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +291,46 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exception {
+        // Ensure that automatic topic creation is disabled on the broker
+        brokerProps.put("auto.create.topics.enable", "false");
+        connect = connectBuilder
+            .brokerProps(brokerProps)
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");

Review comment:
       Nit: add a comment stating the second important test condition:
   ```suggestion
           // and when the connector is not configured to create topics
           Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +291,46 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exception {
+        // Ensure that automatic topic creation is disabled on the broker

Review comment:
       Nit: In unit tests, "ensure" usually means "verify". We're instructing here, not verifying. But we're setting up one of the two important conditions in this test, so maybe phrase it like that?
   ```suggestion
           // When automatic topic creation is disabled on the broker
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org