You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/27 10:25:33 UTC

[GitHub] [flink] zentol opened a new pull request, #19592: [FLINK-27427][rpc] Remove timeouts from RpcUtils#terminate*

zentol opened a new pull request, #19592:
URL: https://github.com/apache/flink/pull/19592

   These methods are only used in tests. We could remove the timeouts (easing [FLINK-27426](https://issues.apache.org/jira/browse/FLINK-27426)), and should annotate them with @VisibleForTesting.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #19592: [FLINK-27427][rpc] Remove timeouts from RpcUtils#terminate*

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19592:
URL: https://github.com/apache/flink/pull/19592#discussion_r860582562


##########
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java:
##########
@@ -76,64 +76,63 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
      * Shuts the given {@link RpcEndpoint} down and awaits its termination.
      *
      * @param rpcEndpoint to terminate
-     * @param timeout for this operation
      * @throws ExecutionException if a problem occurred
      * @throws InterruptedException if the operation has been interrupted
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint)

Review Comment:
   I had the same feeling



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on a diff in pull request #19592: [FLINK-27427][rpc] Remove timeouts from RpcUtils#terminate*

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #19592:
URL: https://github.com/apache/flink/pull/19592#discussion_r860544081


##########
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java:
##########
@@ -76,64 +76,63 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
      * Shuts the given {@link RpcEndpoint} down and awaits its termination.
      *
      * @param rpcEndpoint to terminate
-     * @param timeout for this operation
      * @throws ExecutionException if a problem occurred
      * @throws InterruptedException if the operation has been interrupted
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint)
             throws ExecutionException, InterruptedException, TimeoutException {
-        rpcEndpoint.closeAsync().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        rpcEndpoint.closeAsync().get();
     }
 
     /**
      * Shuts the given {@link RpcEndpoint RpcEndpoints} down and waits for their termination.
      *
      * @param rpcEndpoints to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoints(Time timeout, RpcEndpoint... rpcEndpoints)
+    @VisibleForTesting
+    public static void terminateRpcEndpoints(RpcEndpoint... rpcEndpoints)
             throws InterruptedException, ExecutionException, TimeoutException {
-        terminateAsyncCloseables(Arrays.asList(rpcEndpoints), timeout);
+        terminateAsyncCloseables(Arrays.asList(rpcEndpoints));
     }
 
     /**
      * Shuts the given rpc service down and waits for its termination.
      *
      * @param rpcService to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcService(RpcService rpcService, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcService(RpcService rpcService)

Review Comment:
   I would argue that this method is obsolete because we can do the same with `terminateRpcServices(RpcService...)`. WDYT?



##########
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java:
##########
@@ -76,64 +76,63 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
      * Shuts the given {@link RpcEndpoint} down and awaits its termination.
      *
      * @param rpcEndpoint to terminate
-     * @param timeout for this operation
      * @throws ExecutionException if a problem occurred
      * @throws InterruptedException if the operation has been interrupted
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint)
             throws ExecutionException, InterruptedException, TimeoutException {
-        rpcEndpoint.closeAsync().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        rpcEndpoint.closeAsync().get();
     }
 
     /**
      * Shuts the given {@link RpcEndpoint RpcEndpoints} down and waits for their termination.
      *
      * @param rpcEndpoints to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoints(Time timeout, RpcEndpoint... rpcEndpoints)
+    @VisibleForTesting
+    public static void terminateRpcEndpoints(RpcEndpoint... rpcEndpoints)
             throws InterruptedException, ExecutionException, TimeoutException {
-        terminateAsyncCloseables(Arrays.asList(rpcEndpoints), timeout);
+        terminateAsyncCloseables(Arrays.asList(rpcEndpoints));
     }
 
     /**
      * Shuts the given rpc service down and waits for its termination.
      *
      * @param rpcService to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcService(RpcService rpcService, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcService(RpcService rpcService)
             throws InterruptedException, ExecutionException, TimeoutException {
-        rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        rpcService.stopService().get();
     }
 
     /**
      * Shuts the given rpc services down and waits for their termination.
      *
      * @param rpcServices to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred

Review Comment:
   ```suggestion
   ```
   This applies to the JavaDoc of all public methods that were touched in this class in this PR



##########
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java:
##########
@@ -76,64 +76,63 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
      * Shuts the given {@link RpcEndpoint} down and awaits its termination.
      *
      * @param rpcEndpoint to terminate
-     * @param timeout for this operation
      * @throws ExecutionException if a problem occurred
      * @throws InterruptedException if the operation has been interrupted
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint)
             throws ExecutionException, InterruptedException, TimeoutException {
-        rpcEndpoint.closeAsync().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        rpcEndpoint.closeAsync().get();
     }
 
     /**
      * Shuts the given {@link RpcEndpoint RpcEndpoints} down and waits for their termination.
      *
      * @param rpcEndpoints to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoints(Time timeout, RpcEndpoint... rpcEndpoints)
+    @VisibleForTesting
+    public static void terminateRpcEndpoints(RpcEndpoint... rpcEndpoints)
             throws InterruptedException, ExecutionException, TimeoutException {
-        terminateAsyncCloseables(Arrays.asList(rpcEndpoints), timeout);
+        terminateAsyncCloseables(Arrays.asList(rpcEndpoints));
     }
 
     /**
      * Shuts the given rpc service down and waits for its termination.
      *
      * @param rpcService to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcService(RpcService rpcService, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcService(RpcService rpcService)
             throws InterruptedException, ExecutionException, TimeoutException {
-        rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+        rpcService.stopService().get();
     }
 
     /**
      * Shuts the given rpc services down and waits for their termination.
      *
      * @param rpcServices to shut down
-     * @param timeout for this operation
      * @throws InterruptedException if the operation has been interrupted
      * @throws ExecutionException if a problem occurred
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcServices(Time timeout, RpcService... rpcServices)
+    @VisibleForTesting
+    public static void terminateRpcServices(RpcService... rpcServices)
             throws InterruptedException, ExecutionException, TimeoutException {
         terminateAsyncCloseables(
                 Arrays.stream(rpcServices)
                         .map(rpcService -> (AutoCloseableAsync) rpcService::stopService)
-                        .collect(Collectors.toList()),
-                timeout);
+                        .collect(Collectors.toList()));
     }
 
     private static void terminateAsyncCloseables(
-            Collection<? extends AutoCloseableAsync> closeables, Time timeout)
+            Collection<? extends AutoCloseableAsync> closeables)
             throws InterruptedException, ExecutionException, TimeoutException {

Review Comment:
   ```suggestion
               throws InterruptedException, ExecutionException {
   ```
   This applies to all methods that were touched in this class in this PR



##########
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java:
##########
@@ -76,64 +76,63 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
      * Shuts the given {@link RpcEndpoint} down and awaits its termination.
      *
      * @param rpcEndpoint to terminate
-     * @param timeout for this operation
      * @throws ExecutionException if a problem occurred
      * @throws InterruptedException if the operation has been interrupted
      * @throws TimeoutException if a timeout occurred
      */
-    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout)
+    @VisibleForTesting
+    public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint)

Review Comment:
   Same here: I would argue that this method is obsolete because we can do the same with terminateRpcEndpoints(RpcEndpoint...). WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol merged pull request #19592: [FLINK-27427][rpc] Remove timeouts from RpcUtils#terminate*

Posted by GitBox <gi...@apache.org>.
zentol merged PR #19592:
URL: https://github.com/apache/flink/pull/19592


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #19592: [FLINK-27427][rpc] Remove timeouts from RpcUtils#terminate*

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19592:
URL: https://github.com/apache/flink/pull/19592#issuecomment-1110837442

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "76b2e6ebaa40ca16e610725a93657abc3bd171df",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "76b2e6ebaa40ca16e610725a93657abc3bd171df",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 76b2e6ebaa40ca16e610725a93657abc3bd171df UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org