Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/29 11:01:00 UTC

[GitHub] [kafka] tombentley commented on a change in pull request #10605: KAFKA-12726 prevent a stuck Task.stop() from blocking subsequent Task.stops()s

tombentley commented on a change in pull request #10605:
URL: https://github.com/apache/kafka/pull/10605#discussion_r622876998



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -839,7 +840,22 @@ private void stopTask(ConnectorTaskId taskId) {
             ClassLoader savedLoader = plugins.currentThreadLoader();
             try {
                 savedLoader = Plugins.compareAndSwapLoaders(task.loader());
-                task.stop();
+                CountDownLatch latch = new CountDownLatch(1);
+                new Thread() {
+                    @Override
+                    public void run() {
+                        task.stop();

Review comment:
       If `task.stop()` throws, we won't count down the latch. No harm will result, except that there will be an erroneous log message about exceeding the stop timeout.
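
       A minimal sketch of one way to avoid the spurious timeout message: count the latch down in a `finally` block so it is released whether or not the stop call throws. The class `StopLatchSketch`, the helper `stopInBackground`, and the `Runnable` standing in for `task.stop()` are hypothetical names for illustration, not code from the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class StopLatchSketch {
    // Hypothetical helper: runs stopAction (a stand-in for task.stop()) on its
    // own thread and reports whether it finished within timeoutMs, either by
    // returning normally or by throwing.
    static boolean stopInBackground(Runnable stopAction, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(1);
        Thread stopper = new Thread(() -> {
            try {
                stopAction.run();
            } catch (RuntimeException e) {
                // Report the failure; real code would use the class's logger.
                System.err.println("stop failed: " + e);
            } finally {
                // Always release the waiter, even if stopAction threw.
                latch.countDown();
            }
        });
        stopper.start();
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat this as "not finished".
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

       With this shape, a stop action that throws still counts as having finished, so the caller would only log a timeout when the stop call genuinely did not return in time.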

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -824,7 +825,7 @@ private WorkerErrantRecordReporter createWorkerErrantRecordReporter(
         return null;
     }
 
-    private void stopTask(ConnectorTaskId taskId) {
+    private void stopTask(ConnectorTaskId taskId, long timeout) {

Review comment:
       `timeoutMs` would be unambiguous.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -839,7 +840,22 @@ private void stopTask(ConnectorTaskId taskId) {
             ClassLoader savedLoader = plugins.currentThreadLoader();
             try {
                 savedLoader = Plugins.compareAndSwapLoaders(task.loader());
-                task.stop();
+                CountDownLatch latch = new CountDownLatch(1);
+                new Thread() {

Review comment:
       We should name the thread so that thread dumps are a bit more informative. I _think_ these should be daemon threads, because if we're prepared to basically ignore the non-return of `task.stop()` at runtime, I don't see why we'd block JVM exit for them.
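
       As a rough sketch of both suggestions, the thread can be given a name and marked as a daemon before it is started. The class `StopThreadSketch`, the helper `newStopThread`, and the `task-stop-` name prefix are assumptions made up for this example, not naming used by Connect.

```java
class StopThreadSketch {
    // Hypothetical helper: builds (but does not start) a thread that runs
    // stopAction, a stand-in for task.stop().
    static Thread newStopThread(String taskId, Runnable stopAction) {
        // The name shows up in thread dumps, so include the task id.
        Thread thread = new Thread(stopAction, "task-stop-" + taskId);
        // Daemon threads do not keep the JVM alive; setDaemon must be
        // called before start().
        thread.setDaemon(true);
        return thread;
    }
}
```

       A caller would then invoke `newStopThread(taskId.toString(), task::stop).start()` or similar, assuming the task id has a useful string form.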

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -839,7 +840,22 @@ private void stopTask(ConnectorTaskId taskId) {
             ClassLoader savedLoader = plugins.currentThreadLoader();
             try {
                 savedLoader = Plugins.compareAndSwapLoaders(task.loader());
-                task.stop();
+                CountDownLatch latch = new CountDownLatch(1);
+                new Thread() {
+                    @Override
+                    public void run() {
+                        task.stop();
+                        latch.countDown();
+                    }
+                }.start();
+                // Wait for thread to terminate, but not longer than timeout.
+                if (timeout <= 0) {

Review comment:
       This check isn't required to protect the await, but it is required to get the logging.
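
       Assuming the comment refers to the `timeout <= 0` check, the small sketch below illustrates why the await itself needs no guard: `CountDownLatch.await(long, TimeUnit)` does not wait at all when given a zero or negative timeout, and returns `false` if the count has not reached zero. The class `AwaitZeroSketch` and the helper `awaitWith` are hypothetical names for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AwaitZeroSketch {
    // Awaits a latch that is never counted down, using the given timeout,
    // and returns what await() reports.
    static boolean awaitWith(long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(1);
        try {
            // For timeoutMs <= 0 this returns immediately without blocking.
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "not completed".
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

       So a non-positive timeout is safe to pass through; an explicit branch only matters if a different log message is wanted for that case.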



