Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/07 13:17:40 UTC

[flink] 01/02: [hotfix][state/changelog] Schedule timeouts on a separate thread

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 276a40a87698b8948153c6181180c930b33fe305
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Feb 28 16:27:58 2022 +0100

    [hotfix][state/changelog] Schedule timeouts on a separate thread
---
 .../flink/changelog/fs/RetryingExecutor.java       | 67 +++++++++++++++-------
 1 file changed, 45 insertions(+), 22 deletions(-)
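
Background for this hotfix, not part of the commit message: before this change a single ScheduledExecutorService ran both the blocking changelog uploads and the upload timeouts, so a timeout scheduled while every pool thread was occupied by a slow upload could only fire once a thread became free. The sketch below is purely illustrative (the class name, the 1 s sleep, and the 100 ms deadline are made up and do not appear in Flink); it reproduces the delay with a shared single-thread scheduler:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Illustrative only: a timeout scheduled for 100 ms on the same
    // single-thread scheduler that runs a blocking 1 s "upload" cannot
    // fire until the blocking task releases the thread.
    public class SharedSchedulerTimeoutDemo {
        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
            long start = System.nanoTime();
            shared.submit(
                    () -> {
                        try {
                            Thread.sleep(1000); // stands in for a slow changelog upload
                        } catch (InterruptedException ignored) {
                        }
                    });
            shared.schedule(
                    () -> {
                        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
                        // prints roughly 1000 ms, not the requested 100 ms
                        System.out.println("timeout fired after " + elapsedMs + " ms");
                    },
                    100,
                    TimeUnit.MILLISECONDS);
            shared.shutdown();
            shared.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

The diff below addresses this by scheduling timeouts on a dedicated single-thread timer while uploads and retries stay on the nThreads pool, so timeouts fire on schedule regardless of upload load.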

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
index b9bc023..f54d02b 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.changelog.fs;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -39,17 +40,28 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 class RetryingExecutor implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);
 
-    private final ScheduledExecutorService scheduler;
+    private final ScheduledExecutorService timer; // schedule timeouts
+    private final ScheduledExecutorService blockingExecutor; // schedule and run actual uploads
     private final Histogram attemptsPerTaskHistogram;
 
     RetryingExecutor(int nThreads, Histogram attemptsPerTaskHistogram) {
         this(
-                SchedulerFactory.create(nThreads, "ChangelogRetryScheduler", LOG),
+                SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG),
+                SchedulerFactory.create(nThreads, "ChangelogBlockingExecutor", LOG),
                 attemptsPerTaskHistogram);
     }
 
-    RetryingExecutor(ScheduledExecutorService scheduler, Histogram attemptsPerTaskHistogram) {
-        this.scheduler = scheduler;
+    @VisibleForTesting
+    RetryingExecutor(ScheduledExecutorService executor, Histogram attemptsPerTaskHistogram) {
+        this(executor, executor, attemptsPerTaskHistogram);
+    }
+
+    RetryingExecutor(
+            ScheduledExecutorService timer,
+            ScheduledExecutorService blockingExecutor,
+            Histogram attemptsPerTaskHistogram) {
+        this.timer = timer;
+        this.blockingExecutor = blockingExecutor;
         this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
     }
 
@@ -62,16 +74,21 @@ class RetryingExecutor implements AutoCloseable {
     void execute(RetryPolicy retryPolicy, RetriableAction action) {
         LOG.debug("execute with retryPolicy: {}", retryPolicy);
         RetriableTask task =
-                new RetriableTask(action, retryPolicy, scheduler, attemptsPerTaskHistogram);
-        scheduler.submit(task);
+                new RetriableTask(
+                        action, retryPolicy, blockingExecutor, attemptsPerTaskHistogram, timer);
+        blockingExecutor.submit(task);
     }
 
     @Override
     public void close() throws Exception {
         LOG.debug("close");
-        scheduler.shutdownNow();
-        if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
-            LOG.warn("Unable to cleanly shutdown executorService in 1s");
+        timer.shutdownNow();
+        if (!timer.awaitTermination(1, TimeUnit.SECONDS)) {
+            LOG.warn("Unable to cleanly shutdown scheduler in 1s");
+        }
+        blockingExecutor.shutdownNow();
+        if (!blockingExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+            LOG.warn("Unable to cleanly shutdown blockingExecutor in 1s");
         }
     }
 
@@ -85,7 +102,8 @@ class RetryingExecutor implements AutoCloseable {
 
     private static final class RetriableTask implements Runnable {
         private final RetriableAction runnable;
-        private final ScheduledExecutorService executorService;
+        private final ScheduledExecutorService blockingExecutor;
+        private final ScheduledExecutorService timer;
         private final int current;
         private final RetryPolicy retryPolicy;
         /**
@@ -107,15 +125,17 @@ class RetryingExecutor implements AutoCloseable {
         RetriableTask(
                 RetriableAction runnable,
                 RetryPolicy retryPolicy,
-                ScheduledExecutorService executorService,
-                Histogram attemptsPerTaskHistogram) {
+                ScheduledExecutorService blockingExecutor,
+                Histogram attemptsPerTaskHistogram,
+                ScheduledExecutorService timer) {
             this(
                     1,
                     new AtomicBoolean(false),
                     runnable,
                     retryPolicy,
-                    executorService,
-                    attemptsPerTaskHistogram);
+                    blockingExecutor,
+                    attemptsPerTaskHistogram,
+                    timer);
         }
 
         private RetriableTask(
@@ -123,14 +143,16 @@ class RetryingExecutor implements AutoCloseable {
                 AtomicBoolean actionCompleted,
                 RetriableAction runnable,
                 RetryPolicy retryPolicy,
-                ScheduledExecutorService executorService,
-                Histogram attemptsPerTaskHistogram) {
+                ScheduledExecutorService blockingExecutor,
+                Histogram attemptsPerTaskHistogram,
+                ScheduledExecutorService timer) {
             this.current = current;
             this.runnable = runnable;
             this.retryPolicy = retryPolicy;
-            this.executorService = executorService;
+            this.blockingExecutor = blockingExecutor;
             this.actionCompleted = actionCompleted;
             this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
+            this.timer = timer;
         }
 
         @Override
@@ -157,9 +179,9 @@ class RetryingExecutor implements AutoCloseable {
             if (attemptTransition && !actionCompleted.get()) {
                 long nextAttemptDelay = retryPolicy.retryAfter(current, e);
                 if (nextAttemptDelay == 0L) {
-                    executorService.submit(next());
+                    blockingExecutor.submit(next());
                 } else if (nextAttemptDelay > 0L) {
-                    executorService.schedule(next(), nextAttemptDelay, MILLISECONDS);
+                    blockingExecutor.schedule(next(), nextAttemptDelay, MILLISECONDS);
                 } else {
                     actionCompleted.set(true);
                 }
@@ -172,8 +194,9 @@ class RetryingExecutor implements AutoCloseable {
                     actionCompleted,
                     runnable,
                     retryPolicy,
-                    executorService,
-                    attemptsPerTaskHistogram);
+                    blockingExecutor,
+                    attemptsPerTaskHistogram,
+                    timer);
         }
 
         private Optional<ScheduledFuture<?>> scheduleTimeout() {
@@ -181,7 +204,7 @@ class RetryingExecutor implements AutoCloseable {
             return timeout <= 0
                     ? Optional.empty()
                     : Optional.of(
-                            executorService.schedule(
+                            timer.schedule(
                                     () -> handleError(fmtError(timeout)), timeout, MILLISECONDS));
         }
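
For reference, the resulting pattern condensed into a self-contained sketch. This is illustrative only: the class name TimerAndWorkersSketch, the run method, the pool size 4, and the timeoutMs parameter are hypothetical and do not appear in the commit; it only mirrors the timer / blockingExecutor split above.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    // Illustrative only: a dedicated timer thread arms deadlines, a separate
    // pool runs the blocking work and cancels the deadline if it finishes first.
    class TimerAndWorkersSketch implements AutoCloseable {
        private final ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor(); // timeouts only, never blocks
        private final ScheduledExecutorService blockingExecutor =
                Executors.newScheduledThreadPool(4); // blocking work and delayed retries

        void run(Runnable blockingWork, Runnable onTimeout, long timeoutMs) {
            ScheduledFuture<?> deadline =
                    timer.schedule(onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
            blockingExecutor.submit(
                    () -> {
                        try {
                            blockingWork.run();
                        } finally {
                            deadline.cancel(false); // finished (or failed) before the deadline
                        }
                    });
        }

        @Override
        public void close() throws InterruptedException {
            timer.shutdownNow();
            timer.awaitTermination(1, TimeUnit.SECONDS);
            blockingExecutor.shutdownNow();
            blockingExecutor.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

Keeping the timer single-threaded is sufficient because the timeout callbacks themselves are cheap; the potentially blocking work never runs on that thread.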