You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/24 01:17:38 UTC
[kafka] 01/02: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager… (#8887)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e22a9c67587abae24dec3d8f76c46a9475230a94
Author: feyman2016 <fe...@aliyun.com>
AuthorDate: Tue Jun 23 06:54:35 2020 +0800
KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager… (#8887)
Extract AbstractTask#executeAndMaybeSwallow into TaskManager as a general utility function, and reuse it for the producer-close error handling in TaskManager.
---
.../streams/processor/internals/AbstractTask.java | 16 -------
.../streams/processor/internals/StandbyTask.java | 2 +-
.../streams/processor/internals/StreamTask.java | 4 +-
.../streams/processor/internals/TaskManager.java | 54 ++++++++++++++--------
4 files changed, 39 insertions(+), 37 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index f59571b..044825a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.TaskId;
import java.util.Collection;
import java.util.Set;
-import org.slf4j.Logger;
import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
@@ -104,21 +103,6 @@ public abstract class AbstractTask implements Task {
}
}
- static void executeAndMaybeSwallow(final boolean clean,
- final Runnable runnable,
- final String name,
- final Logger log) {
- try {
- runnable.run();
- } catch (final RuntimeException e) {
- if (clean) {
- throw e;
- } else {
- log.debug("Ignoring error in unclean {}", name);
- }
- }
- }
-
@Override
public void update(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> nodeToSourceTopics) {
this.inputPartitions = topicPartitions;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index b334bc1..ffd09f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -206,7 +206,7 @@ public class StandbyTask extends AbstractTask implements Task {
private void close(final boolean clean) {
switch (state()) {
case SUSPENDED:
- executeAndMaybeSwallow(
+ TaskManager.executeAndMaybeSwallow(
clean,
() -> StateManagerUtil.closeStateManager(
log,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index eb1bf4b..d6c6ea6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -528,7 +528,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
case SUSPENDED:
// first close state manager (which is idempotent) then close the record collector
// if the latter throws and we re-close dirty which would close the state manager again.
- executeAndMaybeSwallow(
+ TaskManager.executeAndMaybeSwallow(
clean,
() -> StateManagerUtil.closeStateManager(
log,
@@ -542,7 +542,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
"state manager close",
log);
- executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
+ TaskManager.executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
break;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index b90ed5f..7759d7e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -735,29 +735,23 @@ public class TaskManager {
for (final Task task : tasks.values()) {
if (task.isActive()) {
- try {
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
- } catch (final RuntimeException e) {
- if (clean) {
- firstException.compareAndSet(null, e);
- } else {
- log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e);
- }
- }
+ executeAndMaybeSwallow(
+ clean,
+ () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+ e -> firstException.compareAndSet(null, e),
+ e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
+ );
}
}
tasks.clear();
- try {
- activeTaskCreator.closeThreadProducerIfNeeded();
- } catch (final RuntimeException e) {
- if (clean) {
- firstException.compareAndSet(null, e);
- } else {
- log.warn("Ignoring an exception while closing thread producer.", e);
- }
- }
+ executeAndMaybeSwallow(
+ clean,
+ activeTaskCreator::closeThreadProducerIfNeeded,
+ e -> firstException.compareAndSet(null, e),
+ e -> log.warn("Ignoring an exception while closing thread producer.", e)
+ );
try {
// this should be called after closing all tasks, to make sure we unlock the task dir for tasks that may
@@ -1042,4 +1036,28 @@ public class TaskManager {
Set<TaskId> lockedTaskDirectories() {
return Collections.unmodifiableSet(lockedTaskDirectories);
}
+
+ public static void executeAndMaybeSwallow(final boolean clean, // selects which handler receives a RuntimeException thrown by runnable
+ final Runnable runnable, // the action to attempt; run exactly once
+ final java.util.function.Consumer<RuntimeException> actionIfClean, // called with the exception when clean is true
+ final java.util.function.Consumer<RuntimeException> actionIfNotClean) { // called with the exception when clean is false
+ try {
+ runnable.run();
+ } catch (final RuntimeException e) { // only RuntimeException is intercepted; anything else thrown by runnable propagates
+ if (clean) {
+ actionIfClean.accept(e); // the handler decides: it may record, rethrow, or ignore the exception
+ } else {
+ actionIfNotClean.accept(e);
+ }
+ }
+ }
+
+ public static void executeAndMaybeSwallow(final boolean clean, // true: a RuntimeException from runnable is rethrown; false: it is swallowed
+ final Runnable runnable, // the action to attempt
+ final String name, // label for the action; used only in the debug message below
+ final Logger log) { // receives the debug message on the unclean path
+ executeAndMaybeSwallow(clean, runnable, e -> { // convenience overload: delegates to the four-argument variant
+ throw e; }, // clean path: propagate the original exception unchanged
+ e -> log.debug("Ignoring error in unclean {}", name)); // unclean path: only the name is logged, the exception itself is not passed to the logger
+ }
}