Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/07 13:17:39 UTC

[flink] branch master updated (b766c3a -> 2292c19)

This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b766c3a  [FLINK-26303][build] Print rat-plugin violations to the console
     new 276a40a  [hotfix][state/changelog] Schedule timeouts on a separate thread
     new 2292c19  [FLINK-26396][state/changelog] Fail upload if last attempt times out

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../changelog/fs/BatchingStateChangeUploader.java  |   9 +-
 .../flink/changelog/fs/RetryingExecutor.java       | 159 +++++++++++++++------
 .../fs/BatchingStateChangeUploaderTest.java        |  55 +++++++
 .../flink/changelog/fs/RetryingExecutorTest.java   |   3 +-
 .../{log4j2.properties => log4j2-test.properties}  |   0
 5 files changed, 178 insertions(+), 48 deletions(-)
 rename flink-dstl/flink-dstl-dfs/src/test/resources/{log4j2.properties => log4j2-test.properties} (100%)

[flink] 02/02: [FLINK-26396][state/changelog] Fail upload if last attempt times out

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2292c19d5ac83e60d136b6fa8354b85d834c3c23
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Feb 21 20:44:24 2022 +0100

    [FLINK-26396][state/changelog] Fail upload if last attempt times out
---
 .../changelog/fs/BatchingStateChangeUploader.java  |   9 +-
 .../flink/changelog/fs/RetryingExecutor.java       | 128 ++++++++++++++-------
 .../fs/BatchingStateChangeUploaderTest.java        |  55 +++++++++
 .../flink/changelog/fs/RetryingExecutorTest.java   |   3 +-
 .../{log4j2.properties => log4j2-test.properties}  |   0
 5 files changed, 151 insertions(+), 44 deletions(-)
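
For readers skimming the diff below, a minimal self-contained sketch of the completion logic this commit introduces (class and method names here are illustrative, not the actual Flink identifiers): every attempt of a task shares an active-attempt counter, and the failure callback fires only once, when the last outstanding attempt fails and no attempt has succeeded. In the actual diff this logic lives in RetriableTask.handleError, and BatchingStateChangeUploader passes t -> tasks.forEach(task -> task.fail(t)) as the callback.

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Consumer;

    /** Sketch of "fail the task only when the last active attempt fails". */
    final class LastAttemptFailureSketch {
        private final AtomicBoolean actionCompleted = new AtomicBoolean(false);
        private final AtomicInteger activeAttempts = new AtomicInteger(1);
        private final Consumer<Throwable> failureCallback;

        LastAttemptFailureSketch(Consumer<Throwable> failureCallback) {
            this.failureCallback = failureCallback;
        }

        /** Called when an attempt fails or times out; retryScheduled says whether another attempt was started. */
        void onAttemptFailure(Throwable error, boolean retryScheduled) {
            if (retryScheduled) {
                // account for the newly scheduled attempt before releasing this one
                activeAttempts.incrementAndGet();
            }
            // fail the whole task only if this was the last active attempt and nothing succeeded
            if (activeAttempts.decrementAndGet() == 0 && actionCompleted.compareAndSet(false, true)) {
                failureCallback.accept(error);
            }
        }

        /** Called when an attempt succeeds; only the first success completes the task. */
        void onAttemptSuccess() {
            actionCompleted.compareAndSet(false, true);
        }
    }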

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java
index c1e353b..a3e339f 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploader.java
@@ -225,7 +225,10 @@ class BatchingStateChangeUploader implements StateChangeUploader {
                 return;
             }
             uploadBatchSizes.update(tasks.size());
-            retryingExecutor.execute(retryPolicy, () -> delegate.upload(tasks));
+            retryingExecutor.execute(
+                    retryPolicy,
+                    () -> delegate.upload(tasks),
+                    t -> tasks.forEach(task -> task.fail(t)));
         } catch (Throwable t) {
             tasks.forEach(task -> task.fail(t));
             if (findThrowable(t, IOException.class).isPresent()) {
@@ -271,14 +274,14 @@ class BatchingStateChangeUploader implements StateChangeUploader {
                     try {
                         releaseCapacity(size);
                     } finally {
-                        uploadTask.successCallback.accept(result);
+                        uploadTask.complete(result);
                     }
                 },
                 (result, error) -> {
                     try {
                         releaseCapacity(size);
                     } finally {
-                        uploadTask.failureCallback.accept(result, error);
+                        uploadTask.fail(error);
                     }
                 });
     }
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
index f54d02b..68f4edb 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
@@ -30,8 +30,11 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
 
 /**
  * A {@link RetriableAction} executor that schedules a next attempt upon timeout based on {@link
@@ -71,25 +74,43 @@ class RetryingExecutor implements AutoCloseable {
      * <p>NOTE: the action must be idempotent because multiple instances of it can be executed
      * concurrently (if the policy allows retries).
      */
-    void execute(RetryPolicy retryPolicy, RetriableAction action) {
+    void execute(
+            RetryPolicy retryPolicy, RetriableAction action, Consumer<Throwable> failureCallback) {
         LOG.debug("execute with retryPolicy: {}", retryPolicy);
         RetriableTask task =
-                new RetriableTask(
-                        action, retryPolicy, blockingExecutor, attemptsPerTaskHistogram, timer);
+                RetriableTask.initialize(
+                        action,
+                        retryPolicy,
+                        blockingExecutor,
+                        attemptsPerTaskHistogram,
+                        timer,
+                        failureCallback);
         blockingExecutor.submit(task);
     }
 
     @Override
     public void close() throws Exception {
         LOG.debug("close");
-        timer.shutdownNow();
+        Exception closeException = null;
+        try {
+            timer.shutdownNow();
+        } catch (Exception e) {
+            closeException = e;
+        }
+        try {
+            blockingExecutor.shutdownNow();
+        } catch (Exception e) {
+            closeException = firstOrSuppressed(e, closeException);
+        }
         if (!timer.awaitTermination(1, TimeUnit.SECONDS)) {
             LOG.warn("Unable to cleanly shutdown scheduler in 1s");
         }
-        blockingExecutor.shutdownNow();
         if (!blockingExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
             LOG.warn("Unable to cleanly shutdown blockingExecutor in 1s");
         }
+        if (closeException != null) {
+            throw closeException;
+        }
     }
 
     /**
@@ -102,6 +123,7 @@ class RetryingExecutor implements AutoCloseable {
 
     private static final class RetriableTask implements Runnable {
         private final RetriableAction runnable;
+        private final Consumer<Throwable> failureCallback;
         private final ScheduledExecutorService blockingExecutor;
         private final ScheduledExecutorService timer;
         private final int current;
@@ -118,25 +140,11 @@ class RetryingExecutor implements AutoCloseable {
          * to prevent double finalization ({@link #handleError}) by the executing thread and
          * timeouting thread.
          */
-        private final AtomicBoolean attemptCompleted = new AtomicBoolean(false);
+        private final AtomicBoolean attemptCompleted;
 
-        private final Histogram attemptsPerTaskHistogram;
+        private final AtomicInteger activeAttempts;
 
-        RetriableTask(
-                RetriableAction runnable,
-                RetryPolicy retryPolicy,
-                ScheduledExecutorService blockingExecutor,
-                Histogram attemptsPerTaskHistogram,
-                ScheduledExecutorService timer) {
-            this(
-                    1,
-                    new AtomicBoolean(false),
-                    runnable,
-                    retryPolicy,
-                    blockingExecutor,
-                    attemptsPerTaskHistogram,
-                    timer);
-        }
+        private final Histogram attemptsPerTaskHistogram;
 
         private RetriableTask(
                 int current,
@@ -144,25 +152,33 @@ class RetryingExecutor implements AutoCloseable {
                 RetriableAction runnable,
                 RetryPolicy retryPolicy,
                 ScheduledExecutorService blockingExecutor,
-                Histogram attemptsPerTaskHistogram,
-                ScheduledExecutorService timer) {
+                ScheduledExecutorService timer,
+                Consumer<Throwable> failureCallback,
+                AtomicInteger activeAttempts,
+                Histogram attemptsPerTaskHistogram) {
             this.current = current;
             this.runnable = runnable;
+            this.failureCallback = failureCallback;
             this.retryPolicy = retryPolicy;
             this.blockingExecutor = blockingExecutor;
             this.actionCompleted = actionCompleted;
             this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
             this.timer = timer;
+            this.activeAttempts = activeAttempts;
+            this.attemptCompleted = new AtomicBoolean(false);
         }
 
         @Override
         public void run() {
+            LOG.debug("starting attempt {}", current);
             if (!actionCompleted.get()) {
                 Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout();
                 try {
                     runnable.run();
-                    actionCompleted.set(true);
-                    attemptsPerTaskHistogram.update(current);
+                    if (actionCompleted.compareAndSet(false, true)) {
+                        LOG.debug("succeeded with {} attempts", current);
+                        attemptsPerTaskHistogram.update(current);
+                    }
                     attemptCompleted.set(true);
                 } catch (Exception e) {
                     handleError(e);
@@ -173,19 +189,49 @@ class RetryingExecutor implements AutoCloseable {
         }
 
         private void handleError(Exception e) {
-            LOG.info("execution attempt {} failed: {}", current, e.getMessage());
-            // prevent double completion in case of a timeout and another failure
-            boolean attemptTransition = attemptCompleted.compareAndSet(false, true);
-            if (attemptTransition && !actionCompleted.get()) {
-                long nextAttemptDelay = retryPolicy.retryAfter(current, e);
-                if (nextAttemptDelay == 0L) {
-                    blockingExecutor.submit(next());
-                } else if (nextAttemptDelay > 0L) {
-                    blockingExecutor.schedule(next(), nextAttemptDelay, MILLISECONDS);
-                } else {
-                    actionCompleted.set(true);
-                }
+            if (!attemptCompleted.compareAndSet(false, true) || actionCompleted.get()) {
+                // either this attempt was already completed (e.g. timed out);
+                // or another attempt completed the task
+                return;
             }
+            LOG.debug("execution attempt {} failed: {}", current, e.getMessage());
+            long nextAttemptDelay = retryPolicy.retryAfter(current, e);
+            if (nextAttemptDelay >= 0L) {
+                activeAttempts.incrementAndGet();
+                scheduleNext(nextAttemptDelay, next());
+            }
+            if (activeAttempts.decrementAndGet() == 0
+                    && actionCompleted.compareAndSet(false, true)) {
+                LOG.info("failed with {} attempts: {}", current, e.getMessage());
+                failureCallback.accept(e);
+            }
+        }
+
+        private void scheduleNext(long nextAttemptDelay, RetriableTask next) {
+            if (nextAttemptDelay == 0L) {
+                blockingExecutor.submit(next);
+            } else if (nextAttemptDelay > 0L) {
+                blockingExecutor.schedule(next, nextAttemptDelay, MILLISECONDS);
+            }
+        }
+
+        private static RetriableTask initialize(
+                RetriableAction runnable,
+                RetryPolicy retryPolicy,
+                ScheduledExecutorService blockingExecutor,
+                Histogram attemptsPerTaskHistogram,
+                ScheduledExecutorService timer,
+                Consumer<Throwable> failureCallback) {
+            return new RetriableTask(
+                    1,
+                    new AtomicBoolean(false),
+                    runnable,
+                    retryPolicy,
+                    blockingExecutor,
+                    timer,
+                    failureCallback,
+                    new AtomicInteger(1),
+                    attemptsPerTaskHistogram);
         }
 
         private RetriableTask next() {
@@ -195,8 +241,10 @@ class RetryingExecutor implements AutoCloseable {
                     runnable,
                     retryPolicy,
                     blockingExecutor,
-                    attemptsPerTaskHistogram,
-                    timer);
+                    timer,
+                    failureCallback,
+                    activeAttempts,
+                    attemptsPerTaskHistogram);
         }
 
         private Optional<ScheduledFuture<?>> scheduleTimeout() {
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploaderTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploaderTest.java
index e3d19ca..9f627b7 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploaderTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploaderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.changelog.fs;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.StateChangeUploader.UploadTask;
 import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
@@ -28,9 +29,11 @@ import org.apache.flink.util.function.BiConsumerWithException;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -38,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
@@ -169,6 +173,43 @@ public class BatchingStateChangeUploaderTest {
         }
     }
 
+    @Test
+    public void testUploadTimeout() throws Exception {
+        AtomicReference<List<SequenceNumber>> failed = new AtomicReference<>();
+        UploadTask upload =
+                new UploadTask(getChanges(4), unused -> {}, (sqn, error) -> failed.set(sqn));
+        ManuallyTriggeredScheduledExecutorService scheduler =
+                new ManuallyTriggeredScheduledExecutorService();
+        try (BatchingStateChangeUploader store =
+                new BatchingStateChangeUploader(
+                        0,
+                        0,
+                        Integer.MAX_VALUE,
+                        RetryPolicy.fixed(1, 1, 1),
+                        new BlockingUploader(),
+                        scheduler,
+                        new RetryingExecutor(
+                                1,
+                                createUnregisteredChangelogStorageMetricGroup()
+                                        .getAttemptsPerUpload()),
+                        createUnregisteredChangelogStorageMetricGroup())) {
+            store.upload(upload);
+            Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5));
+            while (!upload.finished.get() && deadline.hasTimeLeft()) {
+                scheduler.triggerScheduledTasks();
+                scheduler.triggerAll();
+                Thread.sleep(10);
+            }
+        }
+
+        assertTrue(upload.finished.get());
+        assertEquals(
+                upload.changeSets.stream()
+                        .map(StateChangeSet::getSequenceNumber)
+                        .collect(Collectors.toSet()),
+                new HashSet<>(failed.get()));
+    }
+
     @Test(expected = RejectedExecutionException.class)
     public void testErrorHandling() throws Exception {
         TestingStateChangeUploader probe = new TestingStateChangeUploader();
@@ -344,4 +385,18 @@ public class BatchingStateChangeUploaderTest {
         thread.start();
         return Tuple2.of(thread, future);
     }
+
+    private static final class BlockingUploader implements StateChangeUploader {
+        @Override
+        public void upload(UploadTask uploadTask) {
+            try {
+                Thread.sleep(Long.MAX_VALUE);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void close() {}
+    }
 }
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
index 4616682..f60437c 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
@@ -176,7 +176,8 @@ public class RetryingExecutorTest {
                         } finally {
                             firstAttemptCompletedLatch.countDown();
                         }
-                    });
+                    },
+                    t -> {});
             firstAttemptCompletedLatch.await(); // before closing executor
         }
         assertEquals(expectedAttempts, attemptsMade.get());
diff --git a/flink-dstl/flink-dstl-dfs/src/test/resources/log4j2.properties b/flink-dstl/flink-dstl-dfs/src/test/resources/log4j2-test.properties
similarity index 100%
rename from flink-dstl/flink-dstl-dfs/src/test/resources/log4j2.properties
rename to flink-dstl/flink-dstl-dfs/src/test/resources/log4j2-test.properties
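
The javadoc kept in the diff above notes that a RetriableAction must be idempotent, because a timed-out attempt may still be running when its retry starts. An illustrative example of an action that satisfies this (not taken from the Flink code): key the side effect by the task, so a duplicate attempt simply repeats the same write.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    /** Illustrative idempotent "upload": repeating it for the same task is harmless. */
    final class IdempotentUploadExample {
        // stand-in for remote storage, keyed by task id
        private final ConcurrentMap<String, byte[]> remote = new ConcurrentHashMap<>();

        void upload(String taskId, byte[] payload) {
            // put() can safely run twice: both attempts write the same key with the same content
            remote.put(taskId, payload);
        }
    }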

[flink] 01/02: [hotfix][state/changelog] Schedule timeouts on a separate thread

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 276a40a87698b8948153c6181180c930b33fe305
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Feb 28 16:27:58 2022 +0100

    [hotfix][state/changelog] Schedule timeouts on a separate thread
---
 .../flink/changelog/fs/RetryingExecutor.java       | 67 +++++++++++++++-------
 1 file changed, 45 insertions(+), 22 deletions(-)
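
A minimal sketch of the thread split this hotfix makes (executor sizes and names are illustrative, not the Flink configuration): uploads run on a blocking pool, while timeouts are armed on a dedicated single-threaded timer, so a timeout can still fire even when every upload thread is blocked.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;

    import static java.util.concurrent.TimeUnit.MILLISECONDS;

    /** Sketch: run a blocking task on one pool, schedule its timeout on a separate timer. */
    final class TimerSplitSketch implements AutoCloseable {
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        private final ScheduledExecutorService blockingExecutor = Executors.newScheduledThreadPool(4);

        void runWithTimeout(Runnable upload, Runnable onTimeout, long timeoutMillis) {
            blockingExecutor.submit(
                    () -> {
                        // the timeout is armed on the timer thread, not on the (possibly blocked) upload pool
                        ScheduledFuture<?> timeoutFuture =
                                timer.schedule(onTimeout, timeoutMillis, MILLISECONDS);
                        try {
                            upload.run();
                        } finally {
                            timeoutFuture.cancel(false);
                        }
                    });
        }

        @Override
        public void close() {
            timer.shutdownNow();
            blockingExecutor.shutdownNow();
        }
    }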

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
index b9bc023..f54d02b 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.changelog.fs;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -39,17 +40,28 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 class RetryingExecutor implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);
 
-    private final ScheduledExecutorService scheduler;
+    private final ScheduledExecutorService timer; // schedule timeouts
+    private final ScheduledExecutorService blockingExecutor; // schedule and run actual uploads
     private final Histogram attemptsPerTaskHistogram;
 
     RetryingExecutor(int nThreads, Histogram attemptsPerTaskHistogram) {
         this(
-                SchedulerFactory.create(nThreads, "ChangelogRetryScheduler", LOG),
+                SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG),
+                SchedulerFactory.create(nThreads, "ChangelogBlockingExecutor", LOG),
                 attemptsPerTaskHistogram);
     }
 
-    RetryingExecutor(ScheduledExecutorService scheduler, Histogram attemptsPerTaskHistogram) {
-        this.scheduler = scheduler;
+    @VisibleForTesting
+    RetryingExecutor(ScheduledExecutorService executor, Histogram attemptsPerTaskHistogram) {
+        this(executor, executor, attemptsPerTaskHistogram);
+    }
+
+    RetryingExecutor(
+            ScheduledExecutorService timer,
+            ScheduledExecutorService blockingExecutor,
+            Histogram attemptsPerTaskHistogram) {
+        this.timer = timer;
+        this.blockingExecutor = blockingExecutor;
         this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
     }
 
@@ -62,16 +74,21 @@ class RetryingExecutor implements AutoCloseable {
     void execute(RetryPolicy retryPolicy, RetriableAction action) {
         LOG.debug("execute with retryPolicy: {}", retryPolicy);
         RetriableTask task =
-                new RetriableTask(action, retryPolicy, scheduler, attemptsPerTaskHistogram);
-        scheduler.submit(task);
+                new RetriableTask(
+                        action, retryPolicy, blockingExecutor, attemptsPerTaskHistogram, timer);
+        blockingExecutor.submit(task);
     }
 
     @Override
     public void close() throws Exception {
         LOG.debug("close");
-        scheduler.shutdownNow();
-        if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
-            LOG.warn("Unable to cleanly shutdown executorService in 1s");
+        timer.shutdownNow();
+        if (!timer.awaitTermination(1, TimeUnit.SECONDS)) {
+            LOG.warn("Unable to cleanly shutdown scheduler in 1s");
+        }
+        blockingExecutor.shutdownNow();
+        if (!blockingExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
+            LOG.warn("Unable to cleanly shutdown blockingExecutor in 1s");
         }
     }
 
@@ -85,7 +102,8 @@ class RetryingExecutor implements AutoCloseable {
 
     private static final class RetriableTask implements Runnable {
         private final RetriableAction runnable;
-        private final ScheduledExecutorService executorService;
+        private final ScheduledExecutorService blockingExecutor;
+        private final ScheduledExecutorService timer;
         private final int current;
         private final RetryPolicy retryPolicy;
         /**
@@ -107,15 +125,17 @@ class RetryingExecutor implements AutoCloseable {
         RetriableTask(
                 RetriableAction runnable,
                 RetryPolicy retryPolicy,
-                ScheduledExecutorService executorService,
-                Histogram attemptsPerTaskHistogram) {
+                ScheduledExecutorService blockingExecutor,
+                Histogram attemptsPerTaskHistogram,
+                ScheduledExecutorService timer) {
             this(
                     1,
                     new AtomicBoolean(false),
                     runnable,
                     retryPolicy,
-                    executorService,
-                    attemptsPerTaskHistogram);
+                    blockingExecutor,
+                    attemptsPerTaskHistogram,
+                    timer);
         }
 
         private RetriableTask(
@@ -123,14 +143,16 @@ class RetryingExecutor implements AutoCloseable {
                 AtomicBoolean actionCompleted,
                 RetriableAction runnable,
                 RetryPolicy retryPolicy,
-                ScheduledExecutorService executorService,
-                Histogram attemptsPerTaskHistogram) {
+                ScheduledExecutorService blockingExecutor,
+                Histogram attemptsPerTaskHistogram,
+                ScheduledExecutorService timer) {
             this.current = current;
             this.runnable = runnable;
             this.retryPolicy = retryPolicy;
-            this.executorService = executorService;
+            this.blockingExecutor = blockingExecutor;
             this.actionCompleted = actionCompleted;
             this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
+            this.timer = timer;
         }
 
         @Override
@@ -157,9 +179,9 @@ class RetryingExecutor implements AutoCloseable {
             if (attemptTransition && !actionCompleted.get()) {
                 long nextAttemptDelay = retryPolicy.retryAfter(current, e);
                 if (nextAttemptDelay == 0L) {
-                    executorService.submit(next());
+                    blockingExecutor.submit(next());
                 } else if (nextAttemptDelay > 0L) {
-                    executorService.schedule(next(), nextAttemptDelay, MILLISECONDS);
+                    blockingExecutor.schedule(next(), nextAttemptDelay, MILLISECONDS);
                 } else {
                     actionCompleted.set(true);
                 }
@@ -172,8 +194,9 @@ class RetryingExecutor implements AutoCloseable {
                     actionCompleted,
                     runnable,
                     retryPolicy,
-                    executorService,
-                    attemptsPerTaskHistogram);
+                    blockingExecutor,
+                    attemptsPerTaskHistogram,
+                    timer);
         }
 
         private Optional<ScheduledFuture<?>> scheduleTimeout() {
@@ -181,7 +204,7 @@ class RetryingExecutor implements AutoCloseable {
             return timeout <= 0
                     ? Optional.empty()
                     : Optional.of(
-                            executorService.schedule(
+                            timer.schedule(
                                     () -> handleError(fmtError(timeout)), timeout, MILLISECONDS));
         }