Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/11 11:45:23 UTC

[GitHub] [flink] xintongsong commented on a change in pull request #18303: [FLINK-25085][runtime] Add a scheduled thread pool for periodic tasks in RpcEndpoint

xintongsong commented on a change in pull request #18303:
URL: https://github.com/apache/flink/pull/18303#discussion_r824450130



##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
##########
@@ -80,9 +82,25 @@ protected void setFencingToken(@Nullable F newFencingToken) {
         // which is bound to the new fencing token
         MainThreadExecutable mainThreadExecutable =
                 getRpcService().fenceRpcServer(rpcServer, newFencingToken);
+        setFencedMainThreadExecutor(
+                new MainThreadExecutor(
+                        mainThreadExecutable, this::validateRunsInMainThread, getEndpointId()));
+    }
 
-        this.fencedMainThreadExecutor =
-                new MainThreadExecutor(mainThreadExecutable, this::validateRunsInMainThread);
+    /**
+     * Set fenced main thread executor and register it to closeable register.
+     *
+     * @param fencedMainThreadExecutor the given fenced main thread executor
+     */
+    private void setFencedMainThreadExecutor(MainThreadExecutor fencedMainThreadExecutor) {
+        if (this.fencedMainThreadExecutor != null) {
+            this.fencedMainThreadExecutor.close();
+            if (!unregisterResource(this.fencedMainThreadExecutor)) {
+                throw new RuntimeException("Unregister resource failed");

Review comment:
       Throwing a `RuntimeException` is probably not necessary. It looks like `unregisterResource` can return `false` only if 1) the resource passed in is `null` or 2) the resource has already been unregistered.
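
A minimal sketch of the non-throwing variant, assuming a registry whose `unregister` returns `false` only in the two cases listed above. `ExecutorSwapSketch`, `Registry`, and `setExecutor` are hypothetical stand-ins for illustration, not the actual `RpcEndpoint` API:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;

final class ExecutorSwapSketch {

    /** Hypothetical stand-in for the endpoint's closeable registry. */
    static final class Registry {
        private final Set<Closeable> resources = new HashSet<>();

        void register(Closeable resource) {
            resources.add(resource);
        }

        /** Returns false for null and for resources that are not (or no longer) registered. */
        boolean unregister(Closeable resource) {
            return resource != null && resources.remove(resource);
        }

        int size() {
            return resources.size();
        }
    }

    final Registry registry = new Registry();
    private Closeable current;

    /** Closes and unregisters the previous executor, if any, then registers the new one. */
    void setExecutor(Closeable next) {
        if (current != null) {
            try {
                current.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            // A false result only means "nothing to remove", so it is ignored, not thrown.
            registry.unregister(current);
        }
        current = next;
        registry.register(next);
    }

    /** Swaps the executor twice and reports how many resources remain registered. */
    public static int registeredAfterTwoSwaps() {
        ExecutorSwapSketch endpoint = new ExecutorSwapSketch();
        endpoint.setExecutor(() -> {});
        endpoint.setExecutor(() -> {});
        return endpoint.registry.size();
    }
}
```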
   
   

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -453,5 +563,18 @@ public void execute(@Nonnull Runnable command) {
         public void assertRunningInMainThread() {
             mainThreadCheck.run();
         }
+
+        /** Shutdown the {@link ScheduledThreadPoolExecutor} and remove all the pending tasks. */
+        @Override
+        public void close() {
+            if (!mainScheduledExecutor.isShutdown()) {
+                mainScheduledExecutor.shutdownNow();
+            }
+        }
+
+        @VisibleForTesting
+        ScheduledThreadPoolExecutor getMainScheduledExecutor() {
+            return mainScheduledExecutor;
+        }

Review comment:
       I'd avoid exposing internals for tests whenever possible, especially mutable ones.
   
   I noticed this is only used for `RpcEndpointTest#testCancelScheduledTask()`. See my other comment there.

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -400,39 +454,95 @@ public void validateRunsInMainThread() {
     // ------------------------------------------------------------------------
 
     /** Executor which executes runnables in the main thread context. */
-    protected static class MainThreadExecutor implements ComponentMainThreadExecutor {
+    protected static class MainThreadExecutor implements ComponentMainThreadExecutor, Closeable {
+        private static final Logger log = LoggerFactory.getLogger(MainThreadExecutor.class);
 
         private final MainThreadExecutable gateway;
         private final Runnable mainThreadCheck;
-
-        MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck) {
+        /**
+         * The main scheduled executor manages the scheduled tasks and send them to gateway when
+         * they should be executed.
+         */
+        private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+        MainThreadExecutor(
+                MainThreadExecutable gateway, Runnable mainThreadCheck, String endpointId) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-        }
-
-        private void scheduleRunAsync(Runnable runnable, long delayMillis) {
-            gateway.scheduleRunAsync(runnable, delayMillis);
+            this.mainScheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory(endpointId + "-main-scheduler"));
+            this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);

Review comment:
       ```suggestion
               this.mainScheduledExecutor =
                       Executors.newSingleThreadScheduledExecutor(
                               new ExecutorThreadFactory(endpointId + "-main-scheduler"));
   ```

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -211,11 +227,44 @@ protected final void stop() {
      */
     public final CompletableFuture<Void> internalCallOnStop() {
         validateRunsInMainThread();
-        CompletableFuture<Void> stopFuture = onStop();
+        CompletableFuture<Void> stopFuture = new CompletableFuture<>();
+        try {
+            resourceRegistry.close();
+            stopFuture.complete(null);
+        } catch (IOException e) {
+            stopFuture.completeExceptionally(
+                    new RuntimeException("Close resource registry fail", e));
+            return stopFuture;
+        }
+        stopFuture = stopFuture.thenCompose(v -> onStop());

Review comment:
       ```suggestion
           stopFuture = CompletableFuture.allOf(stopFuture, onStop());
   ```
   
   This makes sure both `resourceRegistry.close()` and `onStop()` are always invoked.
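
To illustrate the difference with plain JDK futures: `thenCompose` never invokes its function when the upstream future has failed, whereas `allOf` takes an already-created future, so the second action has necessarily been invoked. The sketch below uses hypothetical names (`StopCompositionSketch`, `chained`, `joined`) and a `Supplier` as a stand-in for `onStop()`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class StopCompositionSketch {

    /** Chains onStop after closeFuture; onStop is skipped if closeFuture failed. */
    static CompletableFuture<Void> chained(
            CompletableFuture<Void> closeFuture, Supplier<CompletableFuture<Void>> onStop) {
        return closeFuture.thenCompose(v -> onStop.get());
    }

    /** Invokes onStop unconditionally and waits for both futures. */
    static CompletableFuture<Void> joined(
            CompletableFuture<Void> closeFuture, Supplier<CompletableFuture<Void>> onStop) {
        return CompletableFuture.allOf(closeFuture, onStop.get());
    }

    /** A future that has already failed, standing in for a failed registry close. */
    private static CompletableFuture<Void> failedClose() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException("Close resource registry fail"));
        return future;
    }

    public static boolean onStopInvokedWhenChained() {
        AtomicBoolean invoked = new AtomicBoolean(false);
        chained(failedClose(), () -> {
            invoked.set(true);
            return CompletableFuture.<Void>completedFuture(null);
        });
        return invoked.get();
    }

    public static boolean onStopInvokedWhenJoined() {
        AtomicBoolean invoked = new AtomicBoolean(false);
        joined(failedClose(), () -> {
            invoked.set(true);
            return CompletableFuture.<Void>completedFuture(null);
        });
        return invoked.get();
    }

    /** The combined future still reports the close failure. */
    public static boolean joinedResultFails() {
        return joined(failedClose(), () -> CompletableFuture.<Void>completedFuture(null))
                .isCompletedExceptionally();
    }
}
```

Note that in the hunk above the `catch` branch returns early, so that `return` would presumably also need to go for `onStop()` to be reached on failure.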

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -400,39 +454,95 @@ public void validateRunsInMainThread() {
     // ------------------------------------------------------------------------
 
     /** Executor which executes runnables in the main thread context. */
-    protected static class MainThreadExecutor implements ComponentMainThreadExecutor {
+    protected static class MainThreadExecutor implements ComponentMainThreadExecutor, Closeable {
+        private static final Logger log = LoggerFactory.getLogger(MainThreadExecutor.class);
 
         private final MainThreadExecutable gateway;
         private final Runnable mainThreadCheck;
-
-        MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck) {
+        /**
+         * The main scheduled executor manages the scheduled tasks and send them to gateway when
+         * they should be executed.
+         */
+        private final ScheduledThreadPoolExecutor mainScheduledExecutor;

Review comment:
       Since this is always single-threaded, we don't need a pool.
   ```suggestion
           private final ScheduledExecutorService mainScheduledExecutor;
   ```
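
A small sketch of what the field type change looks like in use, assuming a plain `ThreadFactory` lambda in place of Flink's `ExecutorThreadFactory`. The class and method names are hypothetical. One consequence worth checking: the `ScheduledExecutorService` interface has no `setRemoveOnCancelPolicy`, so that call would have to be dropped or applied before the executor is wrapped.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class SingleThreadSchedulerSketch {

    /** Creates a single-threaded scheduler whose thread is named after the endpoint. */
    static ScheduledExecutorService create(String endpointId) {
        // Stand-in for ExecutorThreadFactory: names the thread and marks it as daemon.
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, endpointId + "-main-scheduler");
            thread.setDaemon(true);
            return thread;
        };
        return Executors.newSingleThreadScheduledExecutor(factory);
    }

    /** Schedules one task and returns the name of the thread that ran it. */
    public static String schedulerThreadName(String endpointId) {
        ScheduledExecutorService scheduler = create(endpointId);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            ScheduledFuture<String> future = scheduler.schedule(task, 1, TimeUnit.MILLISECONDS);
            return future.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Scheduled task did not complete", e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(schedulerThreadName("foobar"));
    }
}
```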

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
##########
@@ -309,22 +354,83 @@ public void testScheduleCallableWithDelayInSeconds() throws Exception {
                                 () -> 1, expectedDelay.toMillis() / 1000, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testScheduleCallableAfterClose() throws Exception {
+        testScheduleAfterClose(
+                (mainThreadExecutor, expectedDelay) ->
+                        mainThreadExecutor.schedule(
+                                () -> 1, expectedDelay.toMillis() / 1000, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testCancelScheduledCallable() {
+        testCancelScheduledTask(
+                (mainThreadExecutor, timeDelay) ->
+                        mainThreadExecutor.schedule(
+                                () -> 1, timeDelay.toMillis(), TimeUnit.MILLISECONDS));
+    }
+
     private static void testScheduleWithDelay(
             BiConsumer<RpcEndpoint.MainThreadExecutor, Duration> scheduler) throws Exception {
-        final CompletableFuture<Long> actualDelayMsFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> taskCompletedFuture = new CompletableFuture<>();
+        final String endpointId = "foobar";
 
         final MainThreadExecutable mainThreadExecutable =
-                new TestMainThreadExecutable(
-                        (runnable, delay) -> actualDelayMsFuture.complete(delay));
+                new TestMainThreadExecutable((runnable) -> taskCompletedFuture.complete(null));
 
         final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
-                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {});
+                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId);
 
         final Duration expectedDelay = Duration.ofSeconds(1);
 
         scheduler.accept(mainThreadExecutor, expectedDelay);
 
-        assertThat(actualDelayMsFuture.get(), is(expectedDelay.toMillis()));
+        taskCompletedFuture.get();
+        mainThreadExecutor.close();
+    }
+
+    private static void testScheduleAfterClose(
+            BiFunction<RpcEndpoint.MainThreadExecutor, Duration, ScheduledFuture<?>> scheduler) {
+        final CompletableFuture<Void> taskCompletedFuture = new CompletableFuture<>();
+        final String endpointId = "foobar";
+
+        final Duration expectedDelay = Duration.ofSeconds(1);
+        final MainThreadExecutable mainThreadExecutable =
+                new TestMainThreadExecutable((runnable) -> taskCompletedFuture.complete(null));
+
+        final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
+                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId);
+
+        mainThreadExecutor.close();
+        ScheduledFuture<?> future = scheduler.apply(mainThreadExecutor, expectedDelay);
+
+        assertTrue(future.isDone());
+        assertTrue(future instanceof ThrowingScheduledFuture);
+        assertTrue(future.cancel(true));
+        assertTrue(future.cancel(false));
+        assertFalse(taskCompletedFuture.isDone());
+    }
+
+    private static void testCancelScheduledTask(
+            BiFunction<RpcEndpoint.MainThreadExecutor, Duration, ScheduledFuture<?>> scheduler) {
+        final CompletableFuture<Void> actualDelayMsFuture = new CompletableFuture<>();
+        final String endpointId = "foobar";
+
+        final MainThreadExecutable mainThreadExecutable =
+                new TestMainThreadExecutable((runnable) -> actualDelayMsFuture.complete(null));
+
+        final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
+                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId);
+        final Duration timeDelay = Duration.ofSeconds(10);
+        ScheduledFuture<?> scheduledFuture = scheduler.apply(mainThreadExecutor, timeDelay);
+
+        assertEquals(1, mainThreadExecutor.getMainScheduledExecutor().getQueue().size());
+        scheduledFuture.cancel(true);
+        mainThreadExecutor.close();
+
+        assertTrue(scheduledFuture.isCancelled());
+        assertTrue(mainThreadExecutor.getMainScheduledExecutor().getQueue().isEmpty());
+        assertFalse(actualDelayMsFuture.isDone());

Review comment:
       I see two problems in this test:
   
   1. We should verify that `(runnable) -> actualDelayMsFuture.complete(null)` will not be executed, rather than verifying it's not passed to `mainThreadExecutor.runAsync()`. Actually, canceling `scheduledFuture` will not stop it from being scheduled to `mainThreadExecutor`. It is the `FutureTask` itself that guarantees it won't be executed once canceled.
   
   Currently, the test will fail if you remove `mainThreadExecutor.close()`, because it is a `() -> {}` or `() -> 1` that is actually wrapped in the `scheduledFuture`, so canceling it will not stop `actualDelayMsFuture` from being completed.
   
   2. Another challenge here is to make sure the task is not executed before being canceled.
   - If we schedule with a short delay, the test might be unstable because the task can finish before it is canceled.
   - If we schedule with a long delay, we would need to wait longer to see whether the task is performed, unless we look into the executor's internal queue.
   
   I would suggest using a short delay and using `assumeTrue` to skip the test if the task happens to finish before it is canceled. You can tell whether the task finished before the cancellation from the return value of `cancel()`.
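
The two `FutureTask` properties this relies on can be shown with the JDK alone. This is a sketch with hypothetical names, not the test itself: a canceled `FutureTask` skips its body even if something later calls `run()` on it, and `cancel()` returns `false` once the task has completed. In a JUnit 4 test, that return value is what would be passed to `Assume.assumeTrue`.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelSemanticsSketch {

    /** Cancels first, then runs the task; returns whether the wrapped body executed. */
    public static boolean bodyRunsAfterCancel() {
        AtomicBoolean executed = new AtomicBoolean(false);
        FutureTask<Void> task = new FutureTask<>(() -> executed.set(true), null);
        task.cancel(false);
        // Simulates the task still being handed to the main thread after cancellation.
        task.run();
        return executed.get();
    }

    /** Cancels a task that has not started; returns the result of cancel(). */
    public static boolean cancelBeforeRun() {
        FutureTask<Void> task = new FutureTask<>(() -> {}, null);
        return task.cancel(false);
    }

    /** Runs the task to completion, then cancels; returns the result of cancel(). */
    public static boolean cancelAfterCompletion() {
        FutureTask<Void> task = new FutureTask<>(() -> {}, null);
        task.run();
        return task.cancel(false);
    }

    public static void main(String[] args) {
        System.out.println("body ran after cancel: " + bodyRunsAfterCancel());
        System.out.println("cancel before run: " + cancelBeforeRun());
        System.out.println("cancel after completion: " + cancelAfterCompletion());
    }
}
```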

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -401,38 +451,90 @@ public void validateRunsInMainThread() {
 
     /** Executor which executes runnables in the main thread context. */
     protected static class MainThreadExecutor implements ComponentMainThreadExecutor {
+        private static final Logger log = LoggerFactory.getLogger(MainThreadExecutor.class);
 
         private final MainThreadExecutable gateway;
         private final Runnable mainThreadCheck;
-
-        MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck) {
+        /**
+         * The main scheduled executor manages the scheduled tasks and send them to gateway when
+         * they should be executed.
+         */
+        private final ScheduledThreadPoolExecutor mainScheduledExecutor;
+
+        MainThreadExecutor(
+                MainThreadExecutable gateway, Runnable mainThreadCheck, String endpointId) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-        }
-
-        private void scheduleRunAsync(Runnable runnable, long delayMillis) {
-            gateway.scheduleRunAsync(runnable, delayMillis);
+            this.mainScheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory(endpointId + "-main-scheduler"));
+            this.mainScheduledExecutor.setRemoveOnCancelPolicy(true);
         }
 
         @Override
         public void execute(@Nonnull Runnable command) {
             gateway.runAsync(command);
         }
 
+        /**
+         * The mainScheduledExecutor manages the task and sends it to the gateway after the given
+         * delay.
+         *
+         * @param command the task to execute in the future
+         * @param delay the time from now to delay the execution
+         * @param unit the time unit of the delay parameter
+         * @return a ScheduledFuture representing the completion of the scheduled task
+         */
         @Override
         public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
-            FutureTask<Void> ft = new FutureTask<>(command, null);
-            scheduleRunAsync(ft, delayMillis);
-            return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS);
+            // If the scheduled executor service is shutdown, the command won't be executed.
+            if (mainScheduledExecutor.isShutdown()) {
+                log.warn(
+                        "The scheduled executor service is shutdown and return throwing scheduled future for command {}",
+                        command);
+                return ThrowingScheduledFuture.getInstance();
+            } else {
+                final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
+                FutureTask<Void> ft = new FutureTask<>(command, null);
+                ScheduledFuture<?> scheduledFuture =
+                        mainScheduledExecutor.schedule(
+                                () -> gateway.scheduleRunAsync(ft, 0L),
+                                delayMillis,
+                                TimeUnit.MILLISECONDS);
+                return new ScheduledFutureAdapter<>(
+                        scheduledFuture, ft, delayMillis, TimeUnit.MILLISECONDS);
+            }
         }
 
+        /**
+         * The mainScheduledExecutor manages the given callable and sends it to the gateway after
+         * the given delay. The result of the callable is returned as a {@link ScheduledFuture}.
+         *
+         * @param callable the callable to execute
+         * @param delay the time from now to delay the execution
+         * @param unit the time unit of the delay parameter
+         * @param <V> result type of the callable
+         * @return a ScheduledFuture which holds the future value of the given callable
+         */
         @Override
         public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-            final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
-            FutureTask<V> ft = new FutureTask<>(callable);
-            scheduleRunAsync(ft, delayMillis);
-            return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS);
+            // If the scheduled executor service is shutdown, the callable won't be executed.
+            if (mainScheduledExecutor.isShutdown()) {
+                log.warn(
+                        "The scheduled executor service is shutdown and return throwing scheduled future for callable {}",
+                        callable);
+                return ThrowingScheduledFuture.getInstance();

Review comment:
       @zjureel,
   
   I think my major point was to figure out what the previous behavior was (when `schedule` is called on a closed endpoint) and to align with it. Looking at my previous comment now, I realize I didn't make that clear.
   - If it is expected to fail explicitly, we should not rely on the returned future, which can easily get ignored.
   - If it is expected to be ignored, we won't need to return a special future.
   
   I looked a bit more into how `schedule` behaved previously. It seems I was wrong about throwing a `RuntimeException`. In fact, commands scheduled on a closed endpoint will simply be ignored (with some info logs in `AkkaRpcActor`).
   
   Therefore, I'd suggest not introducing the special class `ThrowingScheduledFuture`, and simply logging and ignoring the command if `mainScheduledExecutor` is shut down.
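
A rough sketch of the log-and-ignore shape, under several assumptions: an `Executor` stands in for the gateway, `java.util.logging` stands in for the SLF4J logger, and the method returns a plain `Future` instead of going through `ScheduledFutureAdapter`. All names here are hypothetical.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

final class IgnoreAfterCloseSketch {
    private static final Logger LOG = Logger.getLogger(IgnoreAfterCloseSketch.class.getName());

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // Hypothetical stand-in for the gateway that runs tasks in the main thread.
    private final Executor mainThread;

    IgnoreAfterCloseSketch(Executor mainThread) {
        this.mainThread = mainThread;
    }

    /** Schedules the command, or logs and ignores it if the scheduler is shut down. */
    Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
        FutureTask<Void> task = new FutureTask<>(command, null);
        if (scheduler.isShutdown()) {
            LOG.warning("Scheduler is shut down, ignoring command " + command);
        } else {
            scheduler.schedule(() -> mainThread.execute(task), delay, unit);
        }
        // Same return path in both branches; an ignored task simply never completes.
        return task;
    }

    void close() {
        scheduler.shutdownNow();
    }

    /** Closes the executor, schedules a command, and reports whether the command ran. */
    public static boolean commandRunsAfterClose() {
        AtomicBoolean executed = new AtomicBoolean(false);
        IgnoreAfterCloseSketch executor = new IgnoreAfterCloseSketch(Runnable::run);
        executor.close();
        Future<?> future = executor.schedule(() -> executed.set(true), 0, TimeUnit.MILLISECONDS);
        return executed.get() || future.isDone();
    }
}
```

The check and the `schedule` call are not atomic in this sketch, so a concurrent shutdown between them could still surface as a `RejectedExecutionException`.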



