You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/07 13:17:40 UTC
[flink] 01/02: [hotfix][state/changelog] Schedule timeouts on a separate thread
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 276a40a87698b8948153c6181180c930b33fe305
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Feb 28 16:27:58 2022 +0100
[hotfix][state/changelog] Schedule timeouts on a separate thread
---
.../flink/changelog/fs/RetryingExecutor.java | 67 +++++++++++++++-------
1 file changed, 45 insertions(+), 22 deletions(-)
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
index b9bc023..f54d02b 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
@@ -17,6 +17,7 @@
package org.apache.flink.changelog.fs;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.util.function.RunnableWithException;
@@ -39,17 +40,28 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
class RetryingExecutor implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);
- private final ScheduledExecutorService scheduler;
+ private final ScheduledExecutorService timer; // schedule timeouts
+ private final ScheduledExecutorService blockingExecutor; // schedule and run actual uploads
private final Histogram attemptsPerTaskHistogram;
RetryingExecutor(int nThreads, Histogram attemptsPerTaskHistogram) {
this(
- SchedulerFactory.create(nThreads, "ChangelogRetryScheduler", LOG),
+ SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG),
+ SchedulerFactory.create(nThreads, "ChangelogBlockingExecutor", LOG),
attemptsPerTaskHistogram);
}
- RetryingExecutor(ScheduledExecutorService scheduler, Histogram attemptsPerTaskHistogram) {
- this.scheduler = scheduler;
+ @VisibleForTesting
+ RetryingExecutor(ScheduledExecutorService executor, Histogram attemptsPerTaskHistogram) {
+ this(executor, executor, attemptsPerTaskHistogram);
+ }
+
+ RetryingExecutor(
+ ScheduledExecutorService timer,
+ ScheduledExecutorService blockingExecutor,
+ Histogram attemptsPerTaskHistogram) {
+ this.timer = timer;
+ this.blockingExecutor = blockingExecutor;
this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
}
@@ -62,16 +74,21 @@ class RetryingExecutor implements AutoCloseable {
void execute(RetryPolicy retryPolicy, RetriableAction action) {
LOG.debug("execute with retryPolicy: {}", retryPolicy);
RetriableTask task =
- new RetriableTask(action, retryPolicy, scheduler, attemptsPerTaskHistogram);
- scheduler.submit(task);
+ new RetriableTask(
+ action, retryPolicy, blockingExecutor, attemptsPerTaskHistogram, timer);
+ blockingExecutor.submit(task);
}
@Override
public void close() throws Exception {
LOG.debug("close");
- scheduler.shutdownNow();
- if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
- LOG.warn("Unable to cleanly shutdown executorService in 1s");
+ timer.shutdownNow();
+ if (!timer.awaitTermination(1, TimeUnit.SECONDS)) {
+ LOG.warn("Unable to cleanly shutdown scheduler in 1s");
+ }
+ blockingExecutor.shutdownNow();
+ if (!blockingExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+ LOG.warn("Unable to cleanly shutdown blockingExecutor in 1s");
}
}
@@ -85,7 +102,8 @@ class RetryingExecutor implements AutoCloseable {
private static final class RetriableTask implements Runnable {
private final RetriableAction runnable;
- private final ScheduledExecutorService executorService;
+ private final ScheduledExecutorService blockingExecutor;
+ private final ScheduledExecutorService timer;
private final int current;
private final RetryPolicy retryPolicy;
/**
@@ -107,15 +125,17 @@ class RetryingExecutor implements AutoCloseable {
RetriableTask(
RetriableAction runnable,
RetryPolicy retryPolicy,
- ScheduledExecutorService executorService,
- Histogram attemptsPerTaskHistogram) {
+ ScheduledExecutorService blockingExecutor,
+ Histogram attemptsPerTaskHistogram,
+ ScheduledExecutorService timer) {
this(
1,
new AtomicBoolean(false),
runnable,
retryPolicy,
- executorService,
- attemptsPerTaskHistogram);
+ blockingExecutor,
+ attemptsPerTaskHistogram,
+ timer);
}
private RetriableTask(
@@ -123,14 +143,16 @@ class RetryingExecutor implements AutoCloseable {
AtomicBoolean actionCompleted,
RetriableAction runnable,
RetryPolicy retryPolicy,
- ScheduledExecutorService executorService,
- Histogram attemptsPerTaskHistogram) {
+ ScheduledExecutorService blockingExecutor,
+ Histogram attemptsPerTaskHistogram,
+ ScheduledExecutorService timer) {
this.current = current;
this.runnable = runnable;
this.retryPolicy = retryPolicy;
- this.executorService = executorService;
+ this.blockingExecutor = blockingExecutor;
this.actionCompleted = actionCompleted;
this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
+ this.timer = timer;
}
@Override
@@ -157,9 +179,9 @@ class RetryingExecutor implements AutoCloseable {
if (attemptTransition && !actionCompleted.get()) {
long nextAttemptDelay = retryPolicy.retryAfter(current, e);
if (nextAttemptDelay == 0L) {
- executorService.submit(next());
+ blockingExecutor.submit(next());
} else if (nextAttemptDelay > 0L) {
- executorService.schedule(next(), nextAttemptDelay, MILLISECONDS);
+ blockingExecutor.schedule(next(), nextAttemptDelay, MILLISECONDS);
} else {
actionCompleted.set(true);
}
@@ -172,8 +194,9 @@ class RetryingExecutor implements AutoCloseable {
actionCompleted,
runnable,
retryPolicy,
- executorService,
- attemptsPerTaskHistogram);
+ blockingExecutor,
+ attemptsPerTaskHistogram,
+ timer);
}
private Optional<ScheduledFuture<?>> scheduleTimeout() {
@@ -181,7 +204,7 @@ class RetryingExecutor implements AutoCloseable {
return timeout <= 0
? Optional.empty()
: Optional.of(
- executorService.schedule(
+ timer.schedule(
() -> handleError(fmtError(timeout)), timeout, MILLISECONDS));
}