Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/01 20:59:25 UTC

[GitHub] [kafka] C0urante opened a new pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

C0urante opened a new pull request #10016:
URL: https://github.com/apache/kafka/pull/10016


   [Jira](https://issues.apache.org/jira/browse/KAFKA-10340)
   
   When a source task produces records for a topic that doesn't exist on the Kafka cluster and automatic topic creation is disabled on the broker and not configured à la [KIP-158](https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics) on the connector, the task hangs indefinitely until and unless the topic is created. Even if the task is scheduled for shutdown by the worker, it continues to hang; the `SourceTask` instance isn't stopped and the producer isn't closed.
   
   One possible approach to this situation is to proactively close the task's producer when the task is abandoned after exceeding the graceful shutdown timeout. This can increase the likelihood of duplicate records for tasks that are blocked on shutdown for other reasons (high throughput, for example), since offsets will not be committed for any outstanding batches. However, since the overall delivery guarantees of the Connect framework remain intact with this approach (at-least-once or at-most-once, but not exactly-once), the tradeoff seems acceptable in order to prevent resource leaks that, if they accumulate over a long enough period, require worker restarts to clear.
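A minimal sketch of the proposed approach, using only `java.util.concurrent`: the caller waits a bounded time for the task to finish and, if it does not, abandons it and closes its producer with a zero timeout, which releases the stuck send. `StuckProducer` and `AbandonAndCloseSketch` are hypothetical stand-ins, not Connect or Kafka client classes, and the real worker's timeouts and threading differ.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a producer whose send() blocks until the producer is
// closed, mimicking a send() stuck on a topic that does not exist. Not KafkaProducer.
class StuckProducer {
    private final CountDownLatch closed = new CountDownLatch(1);

    void send(String record) throws InterruptedException {
        closed.await(); // never returns on its own
        throw new IllegalStateException("producer closed while sending " + record);
    }

    void close(Duration timeout) {
        closed.countDown(); // releases any thread blocked in send()
    }
}

class AbandonAndCloseSketch {
    /** Returns true if the task had to be abandoned (its producer closed) to finish. */
    static boolean stopTask(Duration gracefulTimeout) throws Exception {
        StuckProducer producer = new StuckProducer();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> task = executor.submit(() -> {
                try {
                    producer.send("record"); // blocks: the topic never appears
                } catch (InterruptedException | IllegalStateException e) {
                    // The task exits once its producer is closed underneath it.
                }
            });
            try {
                // Graceful shutdown: wait a bounded amount of time for the task.
                task.get(gracefulTimeout.toMillis(), TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                // Timed out: abandon the task and proactively close its producer.
                producer.close(Duration.ZERO);
                task.get(5, TimeUnit.SECONDS); // throws if the task is still stuck
                return true;
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Calling `AbandonAndCloseSketch.stopTask(Duration.ofMillis(200))` should take the abandon path and return `true` shortly after the graceful timeout, rather than hanging.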
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#issuecomment-771154039


   @gharris1727 @ncliang @sajanaW would any of you have time to take a look?





[GitHub] [kafka] C0urante commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583778100



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##########
@@ -64,7 +64,7 @@
     private final TaskMetricsGroup taskMetricsGroup;
     private volatile TargetState targetState;
     private volatile boolean stopping;   // indicates whether the Worker has asked the task to stop
-    private volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
+    protected volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)

Review comment:
       Mmmm, aligns nicely with the existing `WorkerTask.stopping` and `WorkerTask::isStopping` field and method, respectively. Will do.







[GitHub] [kafka] gharris1727 commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r568823888



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -171,13 +171,9 @@ protected void close() {
                 log.warn("Could not stop task", t);
             }
         }
-        if (producer != null) {
-            try {
-                producer.close(Duration.ofSeconds(30));
-            } catch (Throwable t) {
-                log.warn("Could not close producer", t);
-            }
-        }
+
+        closeProducer(30);
+
         if (admin != null) {

Review comment:
      Yeah, I see here that the situation doesn't require any plugins to block, just the framework/clients. I'm sure that had you implemented this by refactoring close, someone could have come along and left the comment "is all of this necessary? how about just closing the producer instead?"
   
   I'm fine with this fix as-is, and if we notice a resource leak related to any of the other plugins caused by stalled connectors, we can address it at that time.







[GitHub] [kafka] C0urante commented on pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#issuecomment-786768813


   Thanks @rhauch. I've added all of your suggested changes except for the special handling for `InterruptException` (which I've responded to via comment) and fixed the merge conflict. Ready for another round when you have a chance.





[GitHub] [kafka] rhauch commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583741413



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -259,6 +267,16 @@ public void execute() {
         }
     }
 
+    private void closeProducer(Duration duration) {
+        if (producer != null) {
+            try {
+                producer.close(duration);
+            } catch (Throwable t) {
+                log.warn("Could not close producer", t);

Review comment:
       This is run on a thread from the worker's executor. Should we include the connector and task ID in this message so that we have some form of traceability with what task had the issue?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##########
@@ -64,7 +64,7 @@
     private final TaskMetricsGroup taskMetricsGroup;
     private volatile TargetState targetState;
     private volatile boolean stopping;   // indicates whether the Worker has asked the task to stop
-    private volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
+    protected volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)

Review comment:
       How about keeping this `private` and adding a protected `isCancelled()` method? 
   
   Currently, the `cancelled` field is encapsulated entirely within the `WorkerTask` class, and modified only via the public `cancel()` method. We can just as easily keep the encapsulation. OTOH, if we were to make `cancelled` protected, we'd lose that encapsulation and make it a bit more complicated if a future developer did want to add logic upon cancellation. 
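A sketch of the encapsulation being suggested, using hypothetical class names (`TaskBaseSketch`, `SourceTaskSketch`) rather than the real `WorkerTask` hierarchy: the flag stays `private`, is written only through `cancel()`, and subclasses read it through a `protected` accessor, parallel to `stopping`/`isStopping()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: the flags stay private and are exposed read-only.
abstract class TaskBaseSketch {
    private volatile boolean stopping;
    private volatile boolean cancelled;

    public void stop() {
        stopping = true;
    }

    // The only writer of the flag, so any future on-cancel logic has one home.
    public void cancel() {
        cancelled = true;
    }

    protected boolean isStopping() {
        return stopping;
    }

    protected boolean isCancelled() {
        return cancelled;
    }
}

class SourceTaskSketch extends TaskBaseSketch {
    final List<String> events = new ArrayList<>();

    @Override
    public void cancel() {
        super.cancel();                // base class records the state change
        events.add("producer closed"); // subclass-specific cleanup
    }

    boolean shouldKeepRunning() {
        // Reads the flags through the accessors; it cannot assign them directly.
        return !isStopping() && !isCancelled();
    }
}
```

A subclass can still extend `cancel()` with its own cleanup, but the state change itself can only happen in the base class.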

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
##########
@@ -577,7 +578,7 @@ private void createSourceTask(TargetState initialState, RetryWithToleranceOperat
                 producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
                 offsetReader, offsetWriter, workerConfig,
                 ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator,
-                statusBackingStore);
+                statusBackingStore, (Executor) Runnable::run);

Review comment:
       I agree with Greg: nice trick!
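For readers unfamiliar with the trick: `Runnable::run` satisfies `java.util.concurrent.Executor`, whose single method is `execute(Runnable)`, so every submitted task runs synchronously on the calling thread. In a unit test this presumably makes the asynchronous producer close happen before `cancel()` returns; the cast just gives the method reference an explicit target type. A small self-contained illustration (the class name is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class SameThreadExecutorSketch {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Thread caller = Thread.currentThread();
        // Runnable::run matches Executor.execute(Runnable): each task is run
        // immediately, on the thread that called execute().
        Executor sameThread = Runnable::run;
        events.add("before execute");
        sameThread.execute(() ->
            events.add(Thread.currentThread() == caller ? "ran on caller thread" : "ran elsewhere"));
        events.add("after execute");
        return events;
    }
}
```

Because nothing is queued, the recorded order is deterministic: the task's entry always appears between "before execute" and "after execute".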

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +291,46 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exception {
+        // Ensure that automatic topic creation is disabled on the broker
+        brokerProps.put("auto.create.topics.enable", "false");
+        connect = connectBuilder
+            .brokerProps(brokerProps)
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG);
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG);
+        props.put("throughput", "-1");
+
+        ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connector.expectedRecords(NUM_TASKS * MESSAGES_PER_POLL);
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
+            NUM_TASKS, "Connector tasks did not start in time");
+        connector.awaitRecords(TimeUnit.MINUTES.toMillis(1));
+
+        // After we delete the connector, it and each of its tasks should  be stopped by the framework

Review comment:
       Again, a bit more information would be more useful:
   ```suggestion
           // Then if we delete the connector, it and each of its tasks should be stopped by the framework
           // even though the producer is blocked because there is no topic
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -65,24 +65,27 @@
     private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
     private static final int NUM_TASKS = 4;
+    private static final int MESSAGES_PER_POLL = 10;
     private static final String CONNECTOR_NAME = "simple-source";
     private static final String TOPIC_NAME = "test-topic";
 
     private EmbeddedConnectCluster.Builder connectBuilder;
     private EmbeddedConnectCluster connect;
-    Map<String, String> workerProps = new HashMap<>();
-    Properties brokerProps = new Properties();
+    Map<String, String> workerProps;
+    Properties brokerProps;

Review comment:
       Nice catch. And since we're here, why not make these `private`, too? They're currently not used outside of this class.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -259,6 +267,16 @@ public void execute() {
         }
     }
 
+    private void closeProducer(Duration duration) {
+        if (producer != null) {
+            try {
+                producer.close(duration);
+            } catch (Throwable t) {

Review comment:
      The producer's `close(...)` method can throw an `InterruptException` if the method fails to join the IO thread. This can theoretically happen even with a timeout of 0, if the thread is interrupted (e.g., the executor is shut down) _before_ the join can wait. Although the likelihood of this is small, what do you think about catching `InterruptException` and ignoring the error?
   ```suggestion
               } catch (InterruptException t) {
                   // ignore, since this is likely due to the worker's executor being shut down
               } catch (Throwable t) {
   ```
   Two things. First, the producer throws `InterruptException`, not `InterruptedException`. Second, even though the `WorkerSourceTask::close()` that calls this `closeProducer(Duration)` method doesn't _directly_ use the executor, the `Worker` does use that same executor to stop this `WorkerSourceTask`, which ultimately does call `WorkerSourceTask::close()`. IOW, this `closeProducer(Duration)` method is always called from the executor, and the executor could be shut down at any moment, hence the potential `InterruptException`.
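A sketch combining the two suggestions on `closeProducer(Duration)`: ignore the interrupt exception that `close(...)` may throw when the executor is shut down, and include the task ID in any other warning. Kafka's `InterruptException` is unchecked; to keep the example self-contained, `FakeInterruptException`, `ClosableProducer`, and `ProducerCloser` are hypothetical stand-ins, and a list of strings stands in for the logger.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's unchecked InterruptException.
class FakeInterruptException extends RuntimeException {
    FakeInterruptException(String message) {
        super(message);
    }
}

// Hypothetical minimal producer interface: only close(Duration) matters here.
interface ClosableProducer {
    void close(Duration timeout);
}

class ProducerCloser {
    private final String taskId;
    // Stands in for log.warn(...) so the behavior can be checked.
    final List<String> warnings = new ArrayList<>();

    ProducerCloser(String taskId) {
        this.taskId = taskId;
    }

    void closeProducer(ClosableProducer producer, Duration duration) {
        if (producer == null) {
            return;
        }
        try {
            producer.close(duration);
        } catch (FakeInterruptException e) {
            // Ignore: most likely the executor running this task was shut down
            // while close() was waiting on the producer's I/O thread.
        } catch (Throwable t) {
            // Include the task ID so the warning can be traced to a specific task.
            warnings.add("Could not close producer for " + taskId + ": " + t.getMessage());
        }
    }
}
```

The ordering of the `catch` clauses matters: the more specific interrupt exception must come before the catch-all, or it would be logged like any other failure.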

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +291,46 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exception {
+        // Ensure that automatic topic creation is disabled on the broker
+        brokerProps.put("auto.create.topics.enable", "false");
+        connect = connectBuilder
+            .brokerProps(brokerProps)
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");

Review comment:
       Nit: add a comment with the second important test condition
   ```suggestion
           // and when the connector is not configured to create topics
           Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +291,46 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exception {
+        // Ensure that automatic topic creation is disabled on the broker

Review comment:
       Nit: In unit tests, "ensure" usually means "verify". We're instructing here, not verifying. But we're setting up one of the two important conditions in this test, so maybe phrase it like that?
   ```suggestion
           // When automatic topic creation is disabled on the broker
   ```







[GitHub] [kafka] C0urante commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r568603419



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -202,6 +198,8 @@ public void removeMetrics() {
     public void cancel() {
         super.cancel();
         offsetReader.close();
+        // Run on a separate thread to avoid potentially blocking the herder thread
+        new Thread(() -> closeProducer(0)).start();

Review comment:
       Yeah, we can probably use an executor for this. I was on the fence about the necessity of doing this in a separate thread at all (hence the half-assing here) but then I looked deeper into the `KafkaProducer::close` logic and saw that that included closing its interceptors, key serializer, value serializer, etc. Any time user code is called we should assume it can block forever (probably worth adding as a comment here), so yeah, since I'm more confident in the necessity of asynchronously closing the producer I'll expand this to use an executor.
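A sketch of the executor-based variant, assuming cancellation simply hands the close off to some `Executor`; `AsyncCancelSketch` and its `Consumer<Duration>` hook are hypothetical stand-ins for `WorkerSourceTask` and its `closeProducer(Duration)` method. Passing `Duration.ZERO` also makes the intent of a bare `0` explicit.

```java
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class AsyncCancelSketch {
    private final Executor closeExecutor;
    // Hypothetical hook standing in for closeProducer(Duration).
    private final Consumer<Duration> closeProducer;

    AsyncCancelSketch(Executor closeExecutor, Consumer<Duration> closeProducer) {
        this.closeExecutor = closeExecutor;
        this.closeProducer = closeProducer;
    }

    void cancel() {
        // close() may call user code (interceptors, serializers) that can block
        // indefinitely, so hand it off instead of running it on the caller's thread.
        closeExecutor.execute(() -> closeProducer.accept(Duration.ZERO));
    }
}
```

Taking the `Executor` as a constructor argument is also what makes the same-thread `Runnable::run` trick possible in unit tests.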







[GitHub] [kafka] rhauch merged pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #10016:
URL: https://github.com/apache/kafka/pull/10016


   





[GitHub] [kafka] C0urante commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r568599861



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -171,13 +171,9 @@ protected void close() {
                 log.warn("Could not stop task", t);
             }
         }
-        if (producer != null) {
-            try {
-                producer.close(Duration.ofSeconds(30));
-            } catch (Throwable t) {
-                log.warn("Could not close producer", t);
-            }
-        }
+
+        closeProducer(30);
+
         if (admin != null) {

Review comment:
       I wanted to keep things focused here. We know that there's an edge case with the producer that can cause it to hang forever in `send`, and we're addressing that. As far as I know, there is no such case with the admin client, so closing the admin client proactively seems unnecessary. Although there could be an issue with a transformation chain blocking, there's no guarantee that invoking `close` on it is going to fix anything since a transform blocked on, e.g., input/output could remain blocked on that even after being `closed` from another thread.
   
   Ultimately, if a similar case comes up where follow-up is necessary, we can consider our options then based on the particulars of the situation. Right now we only have one specific problem to solve, and I think a targeted approach that doesn't unnecessarily change things is best for that.
   
   If nothing else, maybe a comment explaining why the producer gets special treatment here would be beneficial.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +289,42 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskOnNonExistentTopic() throws Exception {
+        connect = connectBuilder
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG);
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG);
+        props.put("throughput", "-1");
+
+        ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connector.expectedRecords(NUM_TASKS * MESSAGES_PER_POLL);
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
+            NUM_TASKS, "Connector tasks did not start in time");
+        connector.awaitRecords(TimeUnit.MINUTES.toMillis(1));
+
+        StartAndStopLatch stopCounter = connector.expectedStops(1);

Review comment:
       It's asserting that shutdown is not blocked indefinitely when we hit the topic-creation-disabled scenario, but not that the task fails. I'll try to make things clearer.
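The property the test checks can be sketched in isolation: wait for a stop signal with a bounded `CountDownLatch.await`, so a regression makes the check fail instead of hanging forever. `BoundedShutdownCheck` is hypothetical; the real test uses Connect's `ConnectorHandle`/`StartAndStopLatch` test utilities, whose API is not reproduced here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BoundedShutdownCheck {
    /**
     * Starts a thread that blocks until released, optionally releases it (as closing
     * the producer would), and reports whether it stopped within the timeout.
     */
    static boolean stopsWithin(long timeoutMs, boolean cancellationUnblocks) throws InterruptedException {
        CountDownLatch unblock = new CountDownLatch(1);
        CountDownLatch stopped = new CountDownLatch(1);
        Thread task = new Thread(() -> {
            try {
                unblock.await(); // stands in for a send() stuck on a missing topic
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                stopped.countDown(); // the task's stop path signals completion
            }
        });
        task.setDaemon(true); // a stuck thread must not keep the JVM alive
        task.start();
        if (cancellationUnblocks) {
            unblock.countDown(); // e.g. closing the producer releases the blocked call
        }
        // Bounded wait: returns false on timeout instead of blocking forever.
        return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

With `cancellationUnblocks` set to `false`, the check returns `false` after the timeout, which is exactly the failure mode the integration test is meant to catch.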











[GitHub] [kafka] gharris1727 commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r568190708



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -202,6 +198,8 @@ public void removeMetrics() {
     public void cancel() {
         super.cancel();
         offsetReader.close();
+        // Run on a separate thread to avoid potentially blocking the herder thread
+        new Thread(() -> closeProducer(0)).start();

Review comment:
       nit: i don't want to have to guess what 0 means
   ```suggestion
           new Thread(() -> closeProducer(Duration.ofSeconds(0))).start();
   ```
   also, don't we have an executor for this sort of stuff...

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -171,13 +171,9 @@ protected void close() {
                 log.warn("Could not stop task", t);
             }
         }
-        if (producer != null) {
-            try {
-                producer.close(Duration.ofSeconds(30));
-            } catch (Throwable t) {
-                log.warn("Could not close producer", t);
-            }
-        }
+
+        closeProducer(30);
+
         if (admin != null) {

Review comment:
      Should we also clean up the admin client in a cancel scenario? Or is it already closed by this point? Maybe this is outside the scope of this ticket, but it seems like it might have a similar leak behavior.
   The transformation chain and retryWithToleranceOperator could also be leaking.
   
   And at that point, is duplicating these calls in `cancel()` better than asynchronously calling `close()` (or some refactored variant without the `task.stop()`)?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +289,42 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskOnNonExistentTopic() throws Exception {
+        connect = connectBuilder
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG);
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG);
+        props.put("throughput", "-1");
+
+        ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connector.expectedRecords(NUM_TASKS * MESSAGES_PER_POLL);
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
+            NUM_TASKS, "Connector tasks did not start in time");
+        connector.awaitRecords(TimeUnit.MINUTES.toMillis(1));
+
+        StartAndStopLatch stopCounter = connector.expectedStops(1);

Review comment:
       Oh, the failure strings are throwing me for a loop. I thought this was encoding the infinite hanging as an intended behavior, which is ridiculous. 
   
   Is this test asserting that without topic auto-creation enabled, the task stops (fails?) in a timely manner? Could you add a comment to that effect? The name of the test doesn't tell me the expected behavior, only the setup conditions.







[GitHub] [kafka] C0urante commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r568612017



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +289,42 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskOnNonExistentTopic() throws Exception {
+        connect = connectBuilder
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG);
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG);
+        props.put("throughput", "-1");
+
+        ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connector.expectedRecords(NUM_TASKS * MESSAGES_PER_POLL);
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
+            NUM_TASKS, "Connector tasks did not start in time");
+        connector.awaitRecords(TimeUnit.MINUTES.toMillis(1));
+
+        StartAndStopLatch stopCounter = connector.expectedStops(1);

Review comment:
       It's asserting that shutdown is not blocked indefinitely when we hit the topic-creation-disabled scenario, but not that the task fails. I'll try to make things clearer.
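   To make "shutdown is not blocked indefinitely" concrete, here is a minimal, self-contained sketch that uses plain `java.util.concurrent` primitives instead of the Connect test harness. It does not use `ConnectorHandle` or `StartAndStopLatch`. `BlockingTask`, `cancel()`, and `stopsWithin` are hypothetical names. The latch stands in for a send that never completes because the topic is missing, and `cancel()` stands in for proactively closing the producer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedShutdownSketch {

    // Hypothetical stand-in for a source task stuck in a send that never completes
    // (for example, because the destination topic does not exist).
    static class BlockingTask implements Runnable {
        private final CountDownLatch producerClosed = new CountDownLatch(1);
        final CountDownLatch stopped = new CountDownLatch(1);

        @Override
        public void run() {
            try {
                // Only cancel() or an interrupt ever releases this wait.
                producerClosed.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                stopped.countDown();
            }
        }

        // Plays the role of proactively closing the producer on cancellation.
        void cancel() {
            producerClosed.countDown();
        }
    }

    // Returns true if the task finished within timeoutMs; optionally cancels it first.
    static boolean stopsWithin(long timeoutMs, boolean cancel) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BlockingTask task = new BlockingTask();
        try {
            executor.submit(task);
            if (cancel) {
                task.cancel();
            }
            return task.stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Interrupt the task thread so the sketch itself never leaks it.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(stopsWithin(5_000, true)); // prints "true"
        System.out.println(stopsWithin(200, false));  // prints "false"
    }
}
```

   Without `cancel()`, the bounded wait expires while the task is still blocked, which is the hang this PR targets. A test in this spirit asserts the first outcome, not that the task fails.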




[GitHub] [kafka] C0urante commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583778193



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -65,24 +65,27 @@
     private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
     private static final int NUM_TASKS = 4;
+    private static final int MESSAGES_PER_POLL = 10;
     private static final String CONNECTOR_NAME = "simple-source";
     private static final String TOPIC_NAME = "test-topic";
 
     private EmbeddedConnectCluster.Builder connectBuilder;
     private EmbeddedConnectCluster connect;
-    Map<String, String> workerProps = new HashMap<>();
-    Properties brokerProps = new Properties();
+    Map<String, String> workerProps;
+    Properties brokerProps;

Review comment:
       👍 




[GitHub] [kafka] C0urante commented on pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#issuecomment-771673592


   Thanks for taking a look @gharris1727. I've tried to address your comments; let me know what you think.


[GitHub] [kafka] rhauch commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583796206



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -259,6 +267,16 @@ public void execute() {
         }
     }
 
+    private void closeProducer(Duration duration) {
+        if (producer != null) {
+            try {
+                producer.close(duration);
+            } catch (Throwable t) {

Review comment:
       > Right now there aren't any code paths that lead to the worker's executor being shut down.
   
   Hmm, that seems to have been the case for a long time. I wonder whether that was an oversight or intentional, since in Connect `Worker::stop()` is called when the herder is stopped, which only happens when the shutdown hook is called, at which point the JVM is terminating anyway. Luckily MM2 works the same way.
   
   But our test cases that use `EmbeddedConnectCluster` do not clean up all resources of the Worker (and thus the Herder), so we might have threads that keep running. It seems like we should address that in a separate issue; I'll log something.
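   For illustration, the kind of cleanup that separate issue might call for is the standard two-phase `ExecutorService` shutdown sketched below. This is the generic JDK idiom, not the actual `Worker` code or its eventual fix, and `shutdownExecutor` is a hypothetical helper name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownSketch {

    // Two-phase shutdown: let running tasks finish, then interrupt stragglers.
    // Returns true if every thread in the executor has terminated.
    static boolean shutdownExecutor(ExecutorService executor, long timeoutMs) {
        executor.shutdown(); // stop accepting new tasks
        try {
            if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow(); // interrupt tasks that are still running
                return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        // Simulates a task thread that would otherwise outlive the test.
        executor.submit(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(); restore the flag and return so the thread ends.
                Thread.currentThread().interrupt();
            }
        });
        System.out.println(shutdownExecutor(executor, 500)); // prints "true"
    }
}
```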




[GitHub] [kafka] rhauch commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583964769



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -259,6 +267,16 @@ public void execute() {
         }
     }
 
+    private void closeProducer(Duration duration) {
+        if (producer != null) {
+            try {
+                producer.close(duration);
+            } catch (Throwable t) {

Review comment:
       I've logged https://issues.apache.org/jira/browse/KAFKA-12380 for shutting down the worker's executor.




[GitHub] [kafka] rhauch commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583964769



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -259,6 +267,16 @@ public void execute() {
         }
     }
 
+    private void closeProducer(Duration duration) {
+        if (producer != null) {
+            try {
+                producer.close(duration);
+            } catch (Throwable t) {

Review comment:
       I've logged https://issues.apache.org/jira/browse/KAFKA-12380 for shutting down the worker's executor. Again, it's not an issue in runtime, but a *potential* issue in our tests.




[GitHub] [kafka] C0urante commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583777772



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -259,6 +267,16 @@ public void execute() {
         }
     }
 
+    private void closeProducer(Duration duration) {
+        if (producer != null) {
+            try {
+                producer.close(duration);
+            } catch (Throwable t) {
+                log.warn("Could not close producer", t);

Review comment:
       Ack, good idea.




[GitHub] [kafka] C0urante commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r583777564



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -259,6 +267,16 @@ public void execute() {
         }
     }
 
+    private void closeProducer(Duration duration) {
+        if (producer != null) {
+            try {
+                producer.close(duration);
+            } catch (Throwable t) {

Review comment:
       Right now there aren't any code paths that lead to the worker's executor being shut down. One might be added in the future, but it seems a little premature to try to catch it right now and may mislead readers of the code base. Plus, if an `InterruptException` does get generated somehow, it might be worth knowing about as it may indicate unhealthy (or at least unexpected) behavior from the worker, the task, an interceptor, etc.
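   Here is a minimal sketch of the shape under discussion. It assumes a hypothetical `ClosableProducer` interface in place of Kafka's `Producer`, and it uses `java.util.logging` in place of the SLF4J logger. A single `catch (Throwable t)` logs any failure, including an unexpected interrupt-related exception, instead of special-casing `InterruptException`. That keeps the failure visible in the logs while cancellation still proceeds.

```java
import java.time.Duration;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CloseProducerSketch {

    private static final Logger log = Logger.getLogger(CloseProducerSketch.class.getName());

    // Hypothetical minimal producer abstraction; the real code closes a Kafka producer.
    interface ClosableProducer {
        void close(Duration timeout);
    }

    private final ClosableProducer producer;

    CloseProducerSketch(ClosableProducer producer) {
        this.producer = producer;
    }

    // Close with a bound and log (rather than rethrow) any failure, so an
    // unexpected exception is surfaced without aborting task cancellation.
    boolean closeProducer(Duration duration) {
        if (producer == null) {
            return true;
        }
        try {
            producer.close(duration);
            return true;
        } catch (Throwable t) {
            log.log(Level.WARNING, "Could not close producer", t);
            return false;
        }
    }

    public static void main(String[] args) {
        ClosableProducer failing = timeout -> {
            // Stands in for unexpected behavior, e.g. an interrupt while closing.
            throw new IllegalStateException("simulated failure while closing");
        };
        System.out.println(new CloseProducerSketch(failing).closeProducer(Duration.ZERO));      // prints "false"
        System.out.println(new CloseProducerSketch(timeout -> { }).closeProducer(Duration.ZERO)); // prints "true"
    }
}
```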



