Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/01 22:55:15 UTC

[GitHub] [kafka] gharris1727 commented on a change in pull request #10016: KAFKA-10340: Proactively close producer when cancelling source tasks

gharris1727 commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r568190708



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -202,6 +198,8 @@ public void removeMetrics() {
     public void cancel() {
         super.cancel();
         offsetReader.close();
+        // Run on a separate thread to avoid potentially blocking the herder thread
+        new Thread(() -> closeProducer(0)).start();

Review comment:
      nit: I don't want to have to guess what `0` means.
   ```suggestion
           new Thread(() -> closeProducer(Duration.ofSeconds(0))).start();
   ```
   Also, don't we already have an executor for this sort of thing?
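A minimal sketch of both suggestions, assuming the producer only needs a `close(Duration)` method. `ClosableProducer`, `SourceTaskSketch`, and the injected `closeExecutor` are hypothetical names for illustration, not the actual `WorkerSourceTask` fields, and whether the worker already owns a suitable executor is not confirmed here. Passing a `Duration` keeps the unit visible at the call site, and submitting to a shared executor avoids spawning a raw `Thread` per cancellation.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical stand-in for the task's producer; only close(Duration) is modeled.
interface ClosableProducer {
    void close(Duration timeout);
}

class SourceTaskSketch {
    private final ClosableProducer producer;
    // Hypothetical shared executor owned by the worker, injected here for illustration.
    private final ExecutorService closeExecutor;

    SourceTaskSketch(ClosableProducer producer, ExecutorService closeExecutor) {
        this.producer = producer;
        this.closeExecutor = closeExecutor;
    }

    // A Duration parameter makes the unit explicit: no guessing what "0" means.
    void closeProducer(Duration timeout) {
        if (producer == null) {
            return;
        }
        try {
            producer.close(timeout);
        } catch (Throwable t) {
            System.err.println("Could not close producer: " + t);
        }
    }

    // Normal shutdown keeps the 30-second grace period from the original close().
    void close() {
        closeProducer(Duration.ofSeconds(30));
    }

    // Cancellation hands the close to the executor so the calling thread
    // (e.g. the herder thread) never blocks on it.
    Future<?> cancel() {
        return closeExecutor.submit(() -> closeProducer(Duration.ZERO));
    }
}
```

With a single shared executor, a slow close queues behind earlier ones instead of creating a new thread each time, and the returned `Future` gives tests something to wait on.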

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -171,13 +171,9 @@ protected void close() {
                 log.warn("Could not stop task", t);
             }
         }
-        if (producer != null) {
-            try {
-                producer.close(Duration.ofSeconds(30));
-            } catch (Throwable t) {
-                log.warn("Could not close producer", t);
-            }
-        }
+
+        closeProducer(30);
+
         if (admin != null) {

Review comment:
      Should we also clean up the admin client in a cancel scenario, or is it already closed by this point? This may be outside the scope of this ticket, but it seems like it could leak in a similar way.
   The transformation chain and `retryWithToleranceOperator` could also be leaking.
   
   And at that point, is duplicating these calls in `cancel()` better than asynchronously calling `close()` (or some refactored variant without the `task.stop()`)?
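One way to read the last question, sketched under assumptions: the cleanup that `close()` does after `task.stop()` is pulled into a single method that both `close()` and `cancel()` call, with `cancel()` running it asynchronously. `ResourceCleanupSketch` and `releaseResources()` are hypothetical; the real producer, admin client, transformation chain, and `retryWithToleranceOperator` are modeled as plain `AutoCloseable`s, and failures are collected in a list rather than logged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model: the producer, admin client, transformation chain, and
// retryWithToleranceOperator are all treated as plain AutoCloseables.
class ResourceCleanupSketch {
    private final List<AutoCloseable> resources;
    private final List<String> failures = new ArrayList<>();
    private boolean released = false;

    ResourceCleanupSketch(AutoCloseable... resources) {
        this.resources = Arrays.asList(resources);
    }

    // The shared part of close(): everything except task.stop().
    // Each resource is closed independently so one failure does not leak the rest,
    // and the guard makes a second call (cancel() followed by close()) a no-op.
    synchronized void releaseResources() {
        if (released) {
            return;
        }
        released = true;
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // e.g. an admin client that was never created
            }
            try {
                resource.close();
            } catch (Throwable t) {
                failures.add(String.valueOf(t.getMessage()));
            }
        }
    }

    // Normal shutdown path; the real close() would call task.stop() before this.
    void close() {
        releaseResources();
    }

    // Cancellation path: same cleanup, but off the calling thread.
    CompletableFuture<Void> cancel() {
        return CompletableFuture.runAsync(this::releaseResources);
    }

    synchronized List<String> failures() {
        return new ArrayList<>(failures);
    }
}
```

The `released` guard matters because `AutoCloseable.close()` is not required to be idempotent, and with this design a cancelled task may later reach `close()` as well.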

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +289,42 @@ public void testTaskStatuses() throws Exception {
                 decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskOnNonExistentTopic() throws Exception {
+        connect = connectBuilder
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG);
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG);
+        props.put("throughput", "-1");
+
+        ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connector.expectedRecords(NUM_TASKS * MESSAGES_PER_POLL);
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
+            NUM_TASKS, "Connector tasks did not start in time");
+        connector.awaitRecords(TimeUnit.MINUTES.toMillis(1));
+
+        StartAndStopLatch stopCounter = connector.expectedStops(1);

Review comment:
      Oh, the failure strings are throwing me for a loop. I thought this test was encoding the indefinite hang as intended behavior, which would be ridiculous.
   
   Is this test asserting that, without topic auto-creation enabled, the task stops (fails?) in a timely manner? Could you add a comment to that effect? The test name describes only the setup conditions, not the expected behavior.
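To make the intended behavior concrete, here is a self-contained model of what the test appears to check, assuming the hang comes from a producer that blocks until it is closed. `BlockingProducer` and `TimelyStopSketch` are hypothetical stand-ins, not Connect classes; the comment on `stopsAfterCancel` is the kind of statement of expected behavior the test could carry.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a producer stuck waiting on a topic that does not exist.
class BlockingProducer {
    private final CountDownLatch closed = new CountDownLatch(1);

    void send() throws InterruptedException {
        closed.await(); // blocks until close() is called
    }

    void close() {
        closed.countDown();
    }
}

class TimelyStopSketch {
    // Starts a task thread that blocks in send(); counts down the returned latch when it exits.
    static CountDownLatch startTask(BlockingProducer producer) {
        CountDownLatch stopLatch = new CountDownLatch(1);
        Thread task = new Thread(() -> {
            try {
                producer.send();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                stopLatch.countDown();
            }
        });
        task.setDaemon(true); // a task that never stops must not keep the JVM alive
        task.start();
        return stopLatch;
    }

    // Expected behavior: with topic auto-creation disabled and the target topic missing,
    // cancelling the task (which closes the producer) lets it stop within the timeout
    // instead of hanging indefinitely.
    static boolean stopsAfterCancel(long timeoutMs, boolean closeOnCancel) throws InterruptedException {
        BlockingProducer producer = new BlockingProducer();
        CountDownLatch stopLatch = startTask(producer);
        if (closeOnCancel) {
            producer.close();
        }
        return stopLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

Running the same scenario with `closeOnCancel` set to `false` shows the hang the PR is fixing: the latch never reaches zero and the timed wait returns `false`.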




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org