Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/24 14:46:40 UTC

[GitHub] [flink] XComp commented on a change in pull request #18303: [FLINK-25085][runtime] Add a scheduled thread pool for periodic tasks in RpcEndpoint

XComp commented on a change in pull request #18303:
URL: https://github.com/apache/flink/pull/18303#discussion_r790685562



##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
##########
@@ -42,6 +43,7 @@
     private final UnfencedMainThreadExecutor unfencedMainThreadExecutor;
     private volatile F fencingToken;
     private volatile MainThreadExecutor fencedMainThreadExecutor;
+    private volatile MainScheduledExecutor mainScheduledExecutor;

Review comment:
       I think we should stick to the naming pattern introduced by `fencedMainThreadExecutor`. I initially struggled to understand why we have that field in both `RpcEndpoint` and `FencedRpcEndpoint`. Renaming it to `fencedMainScheduledExecutor` would make the distinction clearer.

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -404,14 +459,15 @@ public void validateRunsInMainThread() {
 
         private final MainThreadExecutable gateway;
         private final Runnable mainThreadCheck;
+        private final MainScheduledExecutor mainScheduledExecutor;
 
-        MainThreadExecutor(MainThreadExecutable gateway, Runnable mainThreadCheck) {
+        MainThreadExecutor(
+                MainThreadExecutable gateway,
+                Runnable mainThreadCheck,
+                MainScheduledExecutor mainScheduledExecutor) {
             this.gateway = Preconditions.checkNotNull(gateway);
             this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
-        }
-
-        private void scheduleRunAsync(Runnable runnable, long delayMillis) {
-            gateway.scheduleRunAsync(runnable, delayMillis);
+            this.mainScheduledExecutor = mainScheduledExecutor;

Review comment:
       ```suggestion
               this.mainScheduledExecutor = Preconditions.checkNotNull(mainScheduledExecutor);
   ```
   nit: for consistency reasons
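
To illustrate the consistency point: checking every constructor argument makes a missing dependency fail at construction time instead of on first use. A minimal, self-contained sketch, assuming `java.util.Objects.requireNonNull` as a stand-in for Flink's `Preconditions.checkNotNull`; the class `ExecutorHolder` and its fields are hypothetical and are not the actual `MainThreadExecutor`.

```java
import java.util.Objects;

/** Hypothetical holder used only for illustration; not the actual MainThreadExecutor. */
final class ExecutorHolder {
    private final Runnable mainThreadCheck;
    private final AutoCloseable scheduledExecutor;

    ExecutorHolder(Runnable mainThreadCheck, AutoCloseable scheduledExecutor) {
        // Every argument gets the same null check, so a missing dependency
        // is reported here rather than when the field is first used.
        this.mainThreadCheck = Objects.requireNonNull(mainThreadCheck);
        this.scheduledExecutor = Objects.requireNonNull(scheduledExecutor);
    }
}
```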

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/EndpointCloseableRegistryTest.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.util.TestLogger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Validate the given {@link CloseableRegistry} in test case. */
+public abstract class EndpointCloseableRegistryTest extends TestLogger {

Review comment:
       Just my opinion here: I'm not sure whether adding inheritance here is the best way to implement the utility methods. What about providing a utility class for this and providing the `validate*` methods as static utility methods? IMHO, inheritance especially in test classes makes reading the code harder.
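
A rough sketch of the static-utility alternative, assuming a simplified registry. `SimpleRegistry` and `RegistryAssertions` are hypothetical names used so that the example is self-contained; `SimpleRegistry` is not Flink's `CloseableRegistry`, and the real helpers would take that class instead.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a closeable registry; not Flink's CloseableRegistry. */
final class SimpleRegistry implements Closeable {
    private final List<Closeable> resources = new ArrayList<>();
    private boolean closed;

    void register(Closeable resource) {
        resources.add(resource);
    }

    int size() {
        return resources.size();
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        // The sketch only tracks state; it does not close the registered resources.
        resources.clear();
        closed = true;
    }
}

/** Static helpers that any test can call without extending a common base class. */
final class RegistryAssertions {
    private RegistryAssertions() {}

    static void assertRegisteredResourceCount(int expectedCount, SimpleRegistry registry) {
        if (registry.size() != expectedCount) {
            throw new AssertionError(
                    "Expected " + expectedCount + " registered resources but found "
                            + registry.size());
        }
    }

    static void assertClosed(SimpleRegistry registry) {
        if (!registry.isClosed()) {
            throw new AssertionError("Expected the registry to be closed");
        }
    }
}
```

A test class would then call `RegistryAssertions.assertRegisteredResourceCount(...)` directly (or via a static import) and stay free to extend whatever base class it needs.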

##########
File path: flink-rpc/flink-rpc-core/src/test/java/org/apache/flink/runtime/concurrent/MainScheduledExecutorTest.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Unit tests for {@link MainScheduledExecutor}. */
+public class MainScheduledExecutorTest {
+    /** Test schedule runnable. */
+    @Test
+    public void testScheduleRunnable() throws Exception {
+        MainScheduledExecutor mainScheduledExecutor =
+                new MainScheduledExecutor(new TestRunnableMainThreadExecutable());
+        final int timeDelay = 1;

Review comment:
       1 second is quite a delay for such a test to succeed
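
One possible way to keep such a test fast is to schedule with a delay of a few milliseconds and wait on a future rather than on wall-clock time. The sketch below uses only the JDK's `ScheduledExecutorService`, not `MainScheduledExecutor`; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical example; uses a plain JDK scheduler instead of MainScheduledExecutor. */
final class ShortDelayExample {
    private ShortDelayExample() {}

    /** Schedules a task after {@code delayMillis} and blocks until it has completed. */
    static String runWithShortDelay(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            executor.schedule(
                    () -> {
                        result.complete("done");
                    },
                    delayMillis,
                    TimeUnit.MILLISECONDS);
            // The generous timeout only bounds the failure case; on success the
            // call returns as soon as the scheduled task has run.
            return result.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Scheduled task did not complete", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```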

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/MainScheduledExecutor.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The main scheduled executor will manage the scheduled tasks. When the specified time arrives, the
+ * executor will send these tasks to gateway and execute them.
+ */
+public class MainScheduledExecutor implements ScheduledExecutor, Closeable {

Review comment:
       What about calling it `OnMainThreadScheduledExecutor`? That would make the class name more descriptive.

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
##########
@@ -211,11 +232,40 @@ protected final void stop() {
      */
     public final CompletableFuture<Void> internalCallOnStop() {
         validateRunsInMainThread();
+        try {
+            resourceRegistry.close();
+        } catch (IOException e) {
+            throw new RuntimeException("Close resource registry fail", e);
+        }
         CompletableFuture<Void> stopFuture = onStop();
         isRunning = false;
         return stopFuture;
     }
 
+    /**
+     * Register the given closeable resource to {@link CloseableRegistry}.
+     *
+     * @param closeableResource the given closeable resource
+     */
+    protected void registerResource(Closeable closeableResource) {
+        try {
+            resourceRegistry.registerCloseable(closeableResource);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Registry closeable resource " + closeableResource + " fail", e);
+        }
+    }
+
+    /**
+     * Unregister the given closeable resource from {@link CloseableRegistry}.
+     *
+     * @param closeableResource the given closeable resource
+     * @return true if the given resource register successful, otherwise false

Review comment:
       ```suggestion
        * @return true if the given resource was unregistered successfully, otherwise false
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/EndpointCloseableRegistryTest.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.util.TestLogger;
+
+import static org.junit.Assert.assertEquals;

Review comment:
       FYI: Recently, there was a discussion on the dev mailing list where we agreed on switching to JUnit 5 and AssertJ, at least for new tests. But I guess that would also require refactoring `FencedRpcEndpointTest` and `RpcEndpointTest`.

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/ThrowingScheduledFuture.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Completed {@link ScheduledFuture} implementation.
+ *
+ * @param <T> type of the {@link ScheduledFuture}
+ */
+public final class ThrowingScheduledFuture<T> implements ScheduledFuture<T> {
+    private static final ThrowingScheduledFuture instance = new ThrowingScheduledFuture();
+
+    private ThrowingScheduledFuture() {}
+
+    @Override
+    public long getDelay(@Nonnull TimeUnit unit) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return true;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return true;
+    }
+
+    @Override
+    public boolean isDone() {
+        return true;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public T get(long timeout, @Nonnull TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        throw new UnsupportedOperationException();
+    }
+
+    public static <T> ThrowingScheduledFuture<T> create() {

Review comment:
       ```suggestion
       public static <T> ThrowingScheduledFuture<T> getInstance() {
   ```
   nit: The name `create` is a bit misleading here. We're actually implementing a singleton that always returns the same instance. Renaming the method might improve code readability.
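
For reference, a minimal sketch of the generic singleton pattern this refers to. `Placeholder` is a hypothetical class, not `ThrowingScheduledFuture` itself; the unchecked cast is only safe under the assumption that the instance holds no state of type `T`.

```java
/** Hypothetical stateless singleton; the name is illustrative only. */
final class Placeholder<T> {
    // A single shared instance, typed with a wildcard to avoid a raw type.
    private static final Placeholder<?> INSTANCE = new Placeholder<>();

    private Placeholder() {}

    @SuppressWarnings("unchecked")
    static <T> Placeholder<T> getInstance() {
        // Safe only because the instance never stores or returns a value of type T.
        return (Placeholder<T>) INSTANCE;
    }
}
```

Every call returns the same object regardless of the requested type argument, which is why `getInstance` describes the behavior more accurately than `create`.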

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/EndpointCloseableRegistryTest.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.util.TestLogger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Validate the given {@link CloseableRegistry} in test case. */
+public abstract class EndpointCloseableRegistryTest extends TestLogger {
+    /**
+     * Validate the registered closeable resource count of the given {@link CloseableRegistry}.
+     *
+     * @param registeredCount registered closeable resource count in given {@link CloseableRegistry}
+     * @param closeableRegistry the given {@link CloseableRegistry}
+     */
+    protected void validateRegisteredResourceCount(
+            int registeredCount, CloseableRegistry closeableRegistry) {

Review comment:
       ```suggestion
       protected void assertRegisteredResourceCount(
               int expectedRegisteredCount, CloseableRegistry closeableRegistry) {
   ```
   nit: Just an idea - we could align the names of custom methods and parameters a bit more with the JUnit naming scheme to make it more obvious what this method does when going over the test code.

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/ScheduledFutureAdapter.java
##########
@@ -44,6 +45,12 @@
     /** The uid sequence generator. */
     private static final AtomicLong SEQUENCE_GEN = new AtomicLong();
 
+    /**
+     * The encapsulated scheduled future to which task scheduled by {@link
+     * ScheduledThreadPoolExecutor}.
+     */
+    @Nonnull private final ScheduledFuture<?> scheduledFuture;

Review comment:
       Could we remove the `@Nonnull` annotations in this class entirely? Right now, we're using them inconsistently (the constructor parameter `scheduledFuture` is not annotated with it, but the member variable is). Removing the annotation entirely would leave us in a consistent state again.

##########
File path: flink-rpc/flink-rpc-core/src/test/java/org/apache/flink/runtime/concurrent/ScheduledFutureAdapterTest.java
##########
@@ -27,19 +28,23 @@
 import javax.annotation.Nonnull;
 
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /** Unit tests for {@link ScheduledFutureAdapter}. */
 public class ScheduledFutureAdapterTest extends TestLogger {
 
+    private ScheduledFuture<?> scheduledFuture;
     private ScheduledFutureAdapter<Integer> objectUnderTest;
     private TestFuture innerDelegate;
 
     @Before
     public void before() {
+        this.scheduledFuture = CompletedScheduledFuture.create(null);
         this.innerDelegate = new TestFuture();
         this.objectUnderTest =
-                new ScheduledFutureAdapter<>(innerDelegate, 4200000321L, TimeUnit.NANOSECONDS);
+                new ScheduledFutureAdapter<>(
+                        scheduledFuture, innerDelegate, 4200000321L, TimeUnit.NANOSECONDS);
     }
 
     @Test

Review comment:
       Should we add test cases for the situation where the scheduled task is cancelled but the delegate has not run yet?
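
As a sketch of the scenario, here is the JDK behavior such a test could build on: a task scheduled with a long delay is cancelled before it can start. It uses a plain `ScheduledExecutorService` rather than `ScheduledFutureAdapter`, and the class and method names are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical example; exercises a plain JDK scheduler, not ScheduledFutureAdapter. */
final class CancelBeforeRunExample {
    private CancelBeforeRunExample() {}

    /** Returns {cancel() result, isCancelled(), isDone(), whether the task ran}. */
    static boolean[] cancelBeforeRun() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicBoolean ran = new AtomicBoolean(false);
            // The long delay guarantees that the task cannot start before cancel() is called.
            ScheduledFuture<?> future =
                    executor.schedule(
                            () -> {
                                ran.set(true);
                            },
                            1,
                            TimeUnit.HOURS);
            boolean cancelResult = future.cancel(false);
            return new boolean[] {cancelResult, future.isCancelled(), future.isDone(), ran.get()};
        } finally {
            executor.shutdownNow();
        }
    }
}
```

An adapter test along these lines would additionally assert that the wrapped delegate was never executed.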

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/ThrowingScheduledFuture.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Completed {@link ScheduledFuture} implementation.
+ *
+ * @param <T> type of the {@link ScheduledFuture}
+ */
+public final class ThrowingScheduledFuture<T> implements ScheduledFuture<T> {
+    private static final ThrowingScheduledFuture instance = new ThrowingScheduledFuture();

Review comment:
       ```suggestion
       private static final ThrowingScheduledFuture INSTANCE = new ThrowingScheduledFuture();
   ```
   nit

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/ScheduledFutureAdapter.java
##########
@@ -94,17 +108,19 @@ public int compareTo(@Nonnull Delayed o) {
 
     @Override
     public boolean cancel(boolean mayInterruptIfRunning) {
-        return delegate.cancel(mayInterruptIfRunning);
+        return (scheduledFuture.cancel(mayInterruptIfRunning) || scheduledFuture.isDone())

Review comment:
       ```suggestion
           return (scheduledFuture.isDone() || scheduledFuture.cancel(mayInterruptIfRunning))
   ```
   Would it make sense to swap the sub-conditions here to avoid calling `cancel` when the future is already done? I'm not sure whether it actually makes a difference. I just wanted to point it out...
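
To make the difference concrete, here is a small sketch of how `||` short-circuiting changes the number of `cancel` calls on an already completed future. It uses a JDK `CompletableFuture` and a hypothetical counting helper, not the adapter code from this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical example comparing the two orderings of the sub-conditions. */
final class CancelOrderExample {
    private CancelOrderExample() {}

    static boolean cancelFirst(Future<?> future, AtomicInteger cancelCalls) {
        return countedCancel(future, cancelCalls) || future.isDone();
    }

    static boolean isDoneFirst(Future<?> future, AtomicInteger cancelCalls) {
        // If the future is already done, cancel is never evaluated.
        return future.isDone() || countedCancel(future, cancelCalls);
    }

    private static boolean countedCancel(Future<?> future, AtomicInteger cancelCalls) {
        cancelCalls.incrementAndGet();
        return future.cancel(false);
    }

    /** Returns {cancel calls with cancel first, cancel calls with isDone first}. */
    static int[] demo() {
        AtomicInteger cancelFirstCalls = new AtomicInteger();
        AtomicInteger isDoneFirstCalls = new AtomicInteger();
        boolean a = cancelFirst(CompletableFuture.completedFuture(1), cancelFirstCalls);
        boolean b = isDoneFirst(CompletableFuture.completedFuture(1), isDoneFirstCalls);
        if (a != b) {
            throw new IllegalStateException("Both orderings should yield the same result");
        }
        return new int[] {cancelFirstCalls.get(), isDoneFirstCalls.get()};
    }
}
```

For a completed future both orderings evaluate to `true`; only the second one skips the `cancel` call. For a future that is still pending, both orderings call `cancel` exactly once.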

##########
File path: flink-rpc/flink-rpc-core/src/test/java/org/apache/flink/runtime/concurrent/MainScheduledExecutorTest.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+
+import org.junit.Test;

Review comment:
       We might want to use JUnit5 here.

##########
File path: flink-rpc/flink-rpc-core/src/test/java/org/apache/flink/runtime/concurrent/MainScheduledExecutorTest.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;

Review comment:
       We might want to use AssertJ here.



