Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/28 07:36:41 UTC

[GitHub] [flink] XComp commented on a diff in pull request #19592: [FLINK-27427][rpc] Remove timeouts from RpcUtils#terminate*

XComp commented on code in PR #19592:
URL: https://github.com/apache/flink/pull/19592#discussion_r860544081


##########
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java:
##########
@@ -76,64 +76,63 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
      * Shuts the given {@link RpcEndpoint} down and awaits its termination.
      *
      * @param rpcEndpoint to terminate
-     * @param timeout for this operation
      * @throws ExecutionException if a problem occurred
      * @throws InterruptedException if the operation has been interrupted
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint)
             throws ExecutionException, InterruptedException, TimeoutException {
-        rpcEndpoint.closeAsync().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        rpcEndpoint.closeAsync().get();
     }
 
     /**
      * Shuts the given {@link RpcEndpoint RpcEndpoints} down and waits for their termination.
      *
      * @param rpcEndpoints to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoints(Time timeout, RpcEndpoint... rpcEndpoints)
+    @VisibleForTesting
+    public static void terminateRpcEndpoints(RpcEndpoint... rpcEndpoints)
             throws InterruptedException, ExecutionException, TimeoutException {
-        terminateAsyncCloseables(Arrays.asList(rpcEndpoints), timeout);
+        terminateAsyncCloseables(Arrays.asList(rpcEndpoints));
     }
 
     /**
      * Shuts the given rpc service down and waits for its termination.
      *
      * @param rpcService to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcService(RpcService rpcService, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcService(RpcService rpcService)

Review Comment:
   I would argue that this method is obsolete because we can do the same with `terminateRpcServices(RpcService...)`. WDYT?
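
   To illustrate the point, here is a minimal, self-contained sketch of why a varargs method also covers the single-argument case. `StoppableService` and `terminateServices` are hypothetical stand-ins for `RpcService` and `terminateRpcServices(RpcService...)`; the waiting logic uses plain `CompletableFuture.allOf` and is an assumption, not the actual Flink implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class VarargsTerminateSketch {

    /** Hypothetical stand-in for RpcService; only the stop call is modeled. */
    interface StoppableService {
        CompletableFuture<Void> stopService();
    }

    /** Varargs variant: triggers the stop of every service, then blocks until all are done. */
    static void terminateServices(StoppableService... services)
            throws InterruptedException, ExecutionException {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (StoppableService service : services) {
            futures.add(service.stopService());
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger stopped = new AtomicInteger();
        StoppableService service = () -> {
            stopped.incrementAndGet();
            return CompletableFuture.completedFuture(null);
        };

        // A single argument is a valid varargs call, so no dedicated overload is needed.
        terminateServices(service);

        // The same method handles several services at once.
        terminateServices(service, service);

        System.out.println("stop calls: " + stopped.get());
    }
}
```

   With this shape, callers of the single-service method can switch to the varargs method without changing their arguments.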



##########
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java:
##########
@@ -76,64 +76,63 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
      * Shuts the given {@link RpcEndpoint} down and awaits its termination.
      *
      * @param rpcEndpoint to terminate
-     * @param timeout for this operation
      * @throws ExecutionException if a problem occurred
      * @throws InterruptedException if the operation has been interrupted
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint)
             throws ExecutionException, InterruptedException, TimeoutException {
-        rpcEndpoint.closeAsync().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        rpcEndpoint.closeAsync().get();
     }
 
     /**
      * Shuts the given {@link RpcEndpoint RpcEndpoints} down and waits for their termination.
      *
      * @param rpcEndpoints to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoints(Time timeout, RpcEndpoint... rpcEndpoints)
+    @VisibleForTesting
+    public static void terminateRpcEndpoints(RpcEndpoint... rpcEndpoints)
             throws InterruptedException, ExecutionException, TimeoutException {
-        terminateAsyncCloseables(Arrays.asList(rpcEndpoints), timeout);
+        terminateAsyncCloseables(Arrays.asList(rpcEndpoints));
     }
 
     /**
      * Shuts the given rpc service down and waits for its termination.
      *
      * @param rpcService to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcService(RpcService rpcService, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcService(RpcService rpcService)
             throws InterruptedException, ExecutionException, TimeoutException {
-        rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        rpcService.stopService().get();
     }
 
     /**
      * Shuts the given rpc services down and waits for their termination.
      *
      * @param rpcServices to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred

Review Comment:
   ```suggestion
   ```
   The empty suggestion removes the `@throws TimeoutException` line. The same applies to the Javadoc of every public method that this PR touches in this class.



##########
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java:
##########
@@ -76,64 +76,63 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
      * Shuts the given {@link RpcEndpoint} down and awaits its termination.
      *
      * @param rpcEndpoint to terminate
-     * @param timeout for this operation
      * @throws ExecutionException if a problem occurred
      * @throws InterruptedException if the operation has been interrupted
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint)
             throws ExecutionException, InterruptedException, TimeoutException {
-        rpcEndpoint.closeAsync().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        rpcEndpoint.closeAsync().get();
     }
 
     /**
      * Shuts the given {@link RpcEndpoint RpcEndpoints} down and waits for their termination.
      *
      * @param rpcEndpoints to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoints(Time timeout, RpcEndpoint... rpcEndpoints)
+    @VisibleForTesting
+    public static void terminateRpcEndpoints(RpcEndpoint... rpcEndpoints)
             throws InterruptedException, ExecutionException, TimeoutException {
-        terminateAsyncCloseables(Arrays.asList(rpcEndpoints), timeout);
+        terminateAsyncCloseables(Arrays.asList(rpcEndpoints));
     }
 
     /**
      * Shuts the given rpc service down and waits for its termination.
      *
      * @param rpcService to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcService(RpcService rpcService, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcService(RpcService rpcService)
             throws InterruptedException, ExecutionException, TimeoutException {
-        rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        rpcService.stopService().get();
     }
 
     /**
      * Shuts the given rpc services down and waits for their termination.
      *
      * @param rpcServices to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcServices(Time timeout, RpcService... rpcServices)
+    @VisibleForTesting
+    public static void terminateRpcServices(RpcService... rpcServices)
             throws InterruptedException, ExecutionException, TimeoutException {
         terminateAsyncCloseables(
                 Arrays.stream(rpcServices)
                         .map(rpcService -> (AutoCloseableAsync) rpcService::stopService)
-                        .collect(Collectors.toList()),
-                timeout);
+                        .collect(Collectors.toList()));
     }
 
     private static void terminateAsyncCloseables(
-            Collection<? extends AutoCloseableAsync> closeables, Time timeout)
+            Collection<? extends AutoCloseableAsync> closeables)
             throws InterruptedException, ExecutionException, TimeoutException {

Review Comment:
   ```suggestion
               throws InterruptedException, ExecutionException {
   ```
   The same applies to every method that this PR touches in this class.
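
   As a minimal sketch of the reasoning: the untimed `CompletableFuture#get()` declares only `InterruptedException` and `ExecutionException`, while the timed `get(long, TimeUnit)` additionally declares `TimeoutException`. The class and method names below (`UntimedGetSketch`, `awaitUntimed`, `awaitTimed`) are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class UntimedGetSketch {

    /** Untimed wait: get() cannot throw TimeoutException, so it is not declared. */
    static String awaitUntimed(CompletableFuture<String> future)
            throws InterruptedException, ExecutionException {
        return future.get();
    }

    /** Timed wait: TimeoutException is part of the contract of get(long, TimeUnit). */
    static String awaitTimed(CompletableFuture<String> future, long millis)
            throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(millis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Already completed, so the untimed wait returns immediately.
        System.out.println(awaitUntimed(CompletableFuture.completedFuture("done")));

        try {
            // Never completed, so the timed wait gives up after 10 ms.
            awaitTimed(new CompletableFuture<>(), 10);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

   Once the timed `get` is gone, keeping `TimeoutException` in the `throws` clause only forces callers to handle an exception that the method body can no longer raise.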



##########
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java:
##########
@@ -76,64 +76,63 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
      * Shuts the given {@link RpcEndpoint} down and awaits its termination.
      *
      * @param rpcEndpoint to terminate
-     * @param timeout for this operation
      * @throws ExecutionException if a problem occurred
      * @throws InterruptedException if the operation has been interrupted
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint)

Review Comment:
   Same here: I would argue that this method is obsolete because we can do the same with `terminateRpcEndpoints(RpcEndpoint...)`. WDYT?


