You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/24 01:17:38 UTC

[kafka] 01/02: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager… (#8887)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e22a9c67587abae24dec3d8f76c46a9475230a94
Author: feyman2016 <fe...@aliyun.com>
AuthorDate: Tue Jun 23 06:54:35 2020 +0800

    KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager… (#8887)
    
    Extract Task#executeAndMaybeSwallow into TaskManager as a general utility function
---
 .../streams/processor/internals/AbstractTask.java  | 16 -------
 .../streams/processor/internals/StandbyTask.java   |  2 +-
 .../streams/processor/internals/StreamTask.java    |  4 +-
 .../streams/processor/internals/TaskManager.java   | 54 ++++++++++++++--------
 4 files changed, 39 insertions(+), 37 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index f59571b..044825a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.TaskId;
 
 import java.util.Collection;
 import java.util.Set;
-import org.slf4j.Logger;
 
 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
@@ -104,21 +103,6 @@ public abstract class AbstractTask implements Task {
         }
     }
 
-    static void executeAndMaybeSwallow(final boolean clean,
-                                       final Runnable runnable,
-                                       final String name,
-                                       final Logger log) {
-        try {
-            runnable.run();
-        } catch (final RuntimeException e) {
-            if (clean) {
-                throw e;
-            } else {
-                log.debug("Ignoring error in unclean {}", name);
-            }
-        }
-    }
-
     @Override
     public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
         this.inputPartitions = topicPartitions;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index b334bc1..ffd09f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -206,7 +206,7 @@ public class StandbyTask extends AbstractTask implements Task {
     private void close(final boolean clean) {
         switch (state()) {
             case SUSPENDED:
-                executeAndMaybeSwallow(
+                TaskManager.executeAndMaybeSwallow(
                     clean,
                     () -> StateManagerUtil.closeStateManager(
                         log,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index eb1bf4b..d6c6ea6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -528,7 +528,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             case SUSPENDED:
                 // first close state manager (which is idempotent) then close the record collector
                 // if the latter throws and we re-close dirty which would close the state manager again.
-                executeAndMaybeSwallow(
+                TaskManager.executeAndMaybeSwallow(
                     clean,
                     () -> StateManagerUtil.closeStateManager(
                         log,
@@ -542,7 +542,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                     "state manager close",
                     log);
 
-                executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
+                TaskManager.executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
 
                 break;
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index b90ed5f..7759d7e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -735,29 +735,23 @@ public class TaskManager {
 
         for (final Task task : tasks.values()) {
             if (task.isActive()) {
-                try {
-                    activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-                } catch (final RuntimeException e) {
-                    if (clean) {
-                        firstException.compareAndSet(null, e);
-                    } else {
-                        log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e);
-                    }
-                }
+                executeAndMaybeSwallow(
+                    clean,
+                    () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+                    e -> firstException.compareAndSet(null, e),
+                    e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
+                );
             }
         }
 
         tasks.clear();
 
-        try {
-            activeTaskCreator.closeThreadProducerIfNeeded();
-        } catch (final RuntimeException e) {
-            if (clean) {
-                firstException.compareAndSet(null, e);
-            } else {
-                log.warn("Ignoring an exception while closing thread producer.", e);
-            }
-        }
+        executeAndMaybeSwallow(
+            clean,
+            activeTaskCreator::closeThreadProducerIfNeeded,
+            e -> firstException.compareAndSet(null, e),
+            e -> log.warn("Ignoring an exception while closing thread producer.", e)
+        );
 
         try {
             // this should be called after closing all tasks, to make sure we unlock the task dir for tasks that may
@@ -1042,4 +1036,28 @@ public class TaskManager {
     Set<TaskId> lockedTaskDirectories() {
         return Collections.unmodifiableSet(lockedTaskDirectories);
     }
+
+    public static void executeAndMaybeSwallow(final boolean clean,
+                                              final Runnable runnable,
+                                              final java.util.function.Consumer<RuntimeException> actionIfClean,
+                                              final java.util.function.Consumer<RuntimeException> actionIfNotClean) {
+        try {
+            runnable.run();
+        } catch (final RuntimeException e) {
+            if (clean) {
+                actionIfClean.accept(e);
+            } else {
+                actionIfNotClean.accept(e);
+            }
+        }
+    }
+
+    public static void executeAndMaybeSwallow(final boolean clean,
+                                              final Runnable runnable,
+                                              final String name,
+                                              final Logger log) {
+        executeAndMaybeSwallow(clean, runnable, e -> {
+            throw e; },
+            e -> log.debug("Ignoring error in unclean {}", name));
+    }
 }