You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/05/18 17:40:26 UTC

[kafka] branch trunk updated: KAFKA-6566: Improve Connect Resource Cleanup

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee8abb2  KAFKA-6566: Improve Connect Resource Cleanup
ee8abb2 is described below

commit ee8abb2f7053575bd2abec8152907e0642b1d713
Author: Robert Yokota <ra...@gmail.com>
AuthorDate: Fri May 18 10:39:34 2018 -0700

    KAFKA-6566: Improve Connect Resource Cleanup
    
    This is a change to improve resource cleanup for sink tasks and source tasks.  Now `Task.stop()` is called from both `WorkerSinkTask.close()` and `WorkerSourceTask.close()`.
    
    It is called from `WorkerXXXTask.close()` since this method is called in the `finally` block of `WorkerTask.run()`, and Connect developers use `stop()` to clean up resources.
    
    Author: Robert Yokota <ra...@gmail.com>
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5020 from rayokota/K6566-improve-connect-resource-cleanup
---
 .../kafka/connect/runtime/WorkerSinkTask.java      | 21 +++++++++++---
 .../kafka/connect/runtime/WorkerSourceTask.java    | 33 +++++++++++++++++++---
 2 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 2ba785c..6edcfd4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -148,10 +148,23 @@ class WorkerSinkTask extends WorkerTask {
     protected void close() {
         // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
         // passed in
-        task.stop();
-        if (consumer != null)
-            consumer.close();
-        transformationChain.close();
+        try {
+            task.stop();
+        } catch (Throwable t) {
+            log.warn("Could not stop task", t);
+        }
+        if (consumer != null) {
+            try {
+                consumer.close();
+            } catch (Throwable t) {
+                log.warn("Could not close consumer", t);
+            }
+        }
+        try {
+            transformationChain.close();
+        } catch (Throwable t) {
+            log.warn("Could not close transformation chain", t);
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index f2cef5a..f17475d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -87,6 +87,7 @@ class WorkerSourceTask extends WorkerTask {
     private Map<String, String> taskConfig;
     private boolean finishedStart = false;
     private boolean startedShutdownBeforeStartCompleted = false;
+    private boolean stopped = false;
 
     public WorkerSourceTask(ConnectorTaskId id,
                             SourceTask task,
@@ -137,8 +138,21 @@ class WorkerSourceTask extends WorkerTask {
 
     @Override
     protected void close() {
-        producer.close(30, TimeUnit.SECONDS);
-        transformationChain.close();
+        if (!shouldPause()) {
+            tryStop();
+        }
+        if (producer != null) {
+            try {
+                producer.close(30, TimeUnit.SECONDS);
+            } catch (Throwable t) {
+                log.warn("Could not close producer", t);
+            }
+        }
+        try {
+            transformationChain.close();
+        } catch (Throwable t) {
+            log.warn("Could not close transformation chain", t);
+        }
     }
 
     @Override
@@ -152,12 +166,23 @@ class WorkerSourceTask extends WorkerTask {
         stopRequestedLatch.countDown();
         synchronized (this) {
             if (finishedStart)
-                task.stop();
+                tryStop();
             else
                 startedShutdownBeforeStartCompleted = true;
         }
     }
 
+    private synchronized void tryStop() {
+        if (!stopped) {
+            try {
+                task.stop();
+                stopped = true;
+            } catch (Throwable t) {
+                log.warn("Could not stop task", t);
+            }
+        }
+    }
+
     @Override
     public void execute() {
         try {
@@ -166,7 +191,7 @@ class WorkerSourceTask extends WorkerTask {
             log.info("{} Source task finished initialization and start", this);
             synchronized (this) {
                 if (startedShutdownBeforeStartCompleted) {
-                    task.stop();
+                    tryStop();
                     return;
                 }
                 finishedStart = true;

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.