You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/09/22 08:34:11 UTC

[flink] branch master updated: [FLINK-29377][rpc] Reusable timeout extraction

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b9e3dfe0ae7 [FLINK-29377][rpc] Reusable timeout extraction
b9e3dfe0ae7 is described below

commit b9e3dfe0ae78aaedf5cfa645c307cc3e52168db5
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Sep 21 10:41:56 2022 +0200

    [FLINK-29377][rpc] Reusable timeout extraction
---
 .../runtime/rpc/akka/AkkaInvocationHandler.java    | 62 +--------------
 .../apache/flink/runtime/rpc/RpcGatewayUtils.java  | 87 ++++++++++++++++++++++
 2 files changed, 90 insertions(+), 59 deletions(-)

diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index ac78dbe28b8..a510f2d9409 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.Local;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcGatewayUtils;
 import org.apache.flink.runtime.rpc.RpcServer;
-import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
 import org.apache.flink.runtime.rpc.exceptions.RpcException;
@@ -37,7 +36,6 @@ import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TimeUtils;
 
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
@@ -220,7 +218,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
         Class<?>[] parameterTypes = method.getParameterTypes();
         final boolean isLocalRpcInvocation = method.getAnnotation(Local.class) != null;
         Annotation[][] parameterAnnotations = method.getParameterAnnotations();
-        Duration futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
+        Duration futureTimeout =
+                RpcGatewayUtils.extractRpcTimeout(parameterAnnotations, args, timeout);
 
         final RpcInvocation rpcInvocation =
                 createRpcInvocationMessage(
@@ -320,61 +319,6 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
     //  Helper methods
     // ------------------------------------------------------------------------
 
-    /**
-     * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method
-     * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default
-     * timeout is returned.
-     *
-     * @param parameterAnnotations Parameter annotations
-     * @param args Array of arguments
-     * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter
-     *     has been found
-     * @return Timeout extracted from the array of arguments or the default timeout
-     */
-    private static Duration extractRpcTimeout(
-            Annotation[][] parameterAnnotations, Object[] args, Duration defaultTimeout) {
-        if (args != null) {
-            Preconditions.checkArgument(parameterAnnotations.length == args.length);
-
-            for (int i = 0; i < parameterAnnotations.length; i++) {
-                if (isRpcTimeout(parameterAnnotations[i])) {
-                    if (args[i] instanceof Time) {
-                        return TimeUtils.toDuration((Time) args[i]);
-                    } else if (args[i] instanceof Duration) {
-                        return (Duration) args[i];
-                    } else {
-                        throw new RuntimeException(
-                                "The rpc timeout parameter must be of type "
-                                        + Time.class.getName()
-                                        + " or "
-                                        + Duration.class.getName()
-                                        + ". The type "
-                                        + args[i].getClass().getName()
-                                        + " is not supported.");
-                    }
-                }
-            }
-        }
-
-        return defaultTimeout;
-    }
-
-    /**
-     * Checks whether any of the annotations is of type {@link RpcTimeout}.
-     *
-     * @param annotations Array of annotations
-     * @return True if {@link RpcTimeout} was found; otherwise false
-     */
-    private static boolean isRpcTimeout(Annotation[] annotations) {
-        for (Annotation annotation : annotations) {
-            if (annotation.annotationType().equals(RpcTimeout.class)) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
     /**
      * Sends the message to the RPC endpoint.
      *
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcGatewayUtils.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcGatewayUtils.java
new file mode 100644
index 00000000000..3996fc980fe
--- /dev/null
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcGatewayUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TimeUtils;
+
+import java.lang.annotation.Annotation;
+import java.time.Duration;
+
+/** Utils for {@link RpcGateway} implementations. */
+public class RpcGatewayUtils {
+
+    /**
+     * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method
+     * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default
+     * timeout is returned.
+     *
+     * @param parameterAnnotations Parameter annotations
+     * @param args Array of arguments
+     * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter
+     *     has been found
+     * @return Timeout extracted from the array of arguments or the default timeout
+     */
+    public static Duration extractRpcTimeout(
+            Annotation[][] parameterAnnotations, Object[] args, Duration defaultTimeout) {
+        if (args != null) {
+            Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+            for (int i = 0; i < parameterAnnotations.length; i++) {
+                if (isRpcTimeout(parameterAnnotations[i])) {
+                    if (args[i] instanceof Time) {
+                        return TimeUtils.toDuration((Time) args[i]);
+                    } else if (args[i] instanceof Duration) {
+                        return (Duration) args[i];
+                    } else {
+                        throw new RuntimeException(
+                                "The rpc timeout parameter must be of type "
+                                        + Time.class.getName()
+                                        + " or "
+                                        + Duration.class.getName()
+                                        + ". The type "
+                                        + args[i].getClass().getName()
+                                        + " is not supported.");
+                    }
+                }
+            }
+        }
+
+        return defaultTimeout;
+    }
+
+    /**
+     * Checks whether any of the annotations is of type {@link RpcTimeout}.
+     *
+     * @param annotations Array of annotations
+     * @return True if {@link RpcTimeout} was found; otherwise false
+     */
+    private static boolean isRpcTimeout(Annotation[] annotations) {
+        for (Annotation annotation : annotations) {
+            if (annotation.annotationType().equals(RpcTimeout.class)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private RpcGatewayUtils() {}
+}