You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/09/22 08:34:11 UTC
[flink] branch master updated: [FLINK-29377][rpc] Reusable timeout extraction
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b9e3dfe0ae7 [FLINK-29377][rpc] Reusable timeout extraction
b9e3dfe0ae7 is described below
commit b9e3dfe0ae78aaedf5cfa645c307cc3e52168db5
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Sep 21 10:41:56 2022 +0200
[FLINK-29377][rpc] Reusable timeout extraction
---
.../runtime/rpc/akka/AkkaInvocationHandler.java | 62 +--------------
.../apache/flink/runtime/rpc/RpcGatewayUtils.java | 87 ++++++++++++++++++++++
2 files changed, 90 insertions(+), 59 deletions(-)
diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index ac78dbe28b8..a510f2d9409 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -18,15 +18,14 @@
package org.apache.flink.runtime.rpc.akka;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.Local;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcGatewayUtils;
import org.apache.flink.runtime.rpc.RpcServer;
-import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
@@ -37,7 +36,6 @@ import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TimeUtils;
import akka.actor.ActorRef;
import akka.pattern.Patterns;
@@ -220,7 +218,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
Class<?>[] parameterTypes = method.getParameterTypes();
final boolean isLocalRpcInvocation = method.getAnnotation(Local.class) != null;
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
- Duration futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
+ Duration futureTimeout =
+ RpcGatewayUtils.extractRpcTimeout(parameterAnnotations, args, timeout);
final RpcInvocation rpcInvocation =
createRpcInvocationMessage(
@@ -320,61 +319,6 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
// Helper methods
// ------------------------------------------------------------------------
- /**
- * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method
- * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default
- * timeout is returned.
- *
- * @param parameterAnnotations Parameter annotations
- * @param args Array of arguments
- * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter
- * has been found
- * @return Timeout extracted from the array of arguments or the default timeout
- */
- private static Duration extractRpcTimeout(
- Annotation[][] parameterAnnotations, Object[] args, Duration defaultTimeout) {
- if (args != null) {
- Preconditions.checkArgument(parameterAnnotations.length == args.length);
-
- for (int i = 0; i < parameterAnnotations.length; i++) {
- if (isRpcTimeout(parameterAnnotations[i])) {
- if (args[i] instanceof Time) {
- return TimeUtils.toDuration((Time) args[i]);
- } else if (args[i] instanceof Duration) {
- return (Duration) args[i];
- } else {
- throw new RuntimeException(
- "The rpc timeout parameter must be of type "
- + Time.class.getName()
- + " or "
- + Duration.class.getName()
- + ". The type "
- + args[i].getClass().getName()
- + " is not supported.");
- }
- }
- }
- }
-
- return defaultTimeout;
- }
-
- /**
- * Checks whether any of the annotations is of type {@link RpcTimeout}.
- *
- * @param annotations Array of annotations
- * @return True if {@link RpcTimeout} was found; otherwise false
- */
- private static boolean isRpcTimeout(Annotation[] annotations) {
- for (Annotation annotation : annotations) {
- if (annotation.annotationType().equals(RpcTimeout.class)) {
- return true;
- }
- }
-
- return false;
- }
-
/**
* Sends the message to the RPC endpoint.
*
diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcGatewayUtils.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcGatewayUtils.java
new file mode 100644
index 00000000000..3996fc980fe
--- /dev/null
+++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcGatewayUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TimeUtils;
+
+import java.lang.annotation.Annotation;
+import java.time.Duration;
+
+/** Utils for {@link RpcGateway} implementations. */
+public class RpcGatewayUtils {
+
+    /**
+     * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method
+     * arguments. If no {@link RpcTimeout} annotated parameter could be found, or if {@code args}
+     * is {@code null}, then the default timeout is returned. Only the first annotated parameter
+     * is considered.
+     *
+     * @param parameterAnnotations Parameter annotations; must have the same length as {@code
+     *     args} (checked via {@link Preconditions#checkArgument(boolean)})
+     * @param args Array of arguments, may be {@code null}
+     * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter
+     *     has been found
+     * @return Timeout extracted from the array of arguments or the default timeout
+     * @throws RuntimeException if the {@link RpcTimeout} annotated argument is {@code null} or is
+     *     neither a {@link Time} nor a {@link Duration}
+     */
+    public static Duration extractRpcTimeout(
+            Annotation[][] parameterAnnotations, Object[] args, Duration defaultTimeout) {
+        if (args != null) {
+            Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+            for (int i = 0; i < parameterAnnotations.length; i++) {
+                if (isRpcTimeout(parameterAnnotations[i])) {
+                    if (args[i] instanceof Time) {
+                        return TimeUtils.toDuration((Time) args[i]);
+                    } else if (args[i] instanceof Duration) {
+                        return (Duration) args[i];
+                    } else {
+                        // A null argument fails both instanceof checks and has no class to
+                        // report; guard it so the descriptive error below is thrown instead of
+                        // a NullPointerException raised while building the message.
+                        final String actualType =
+                                args[i] == null ? "null" : args[i].getClass().getName();
+                        throw new RuntimeException(
+                                "The rpc timeout parameter must be of type "
+                                        + Time.class.getName()
+                                        + " or "
+                                        + Duration.class.getName()
+                                        + ". The type "
+                                        + actualType
+                                        + " is not supported.");
+                    }
+                }
+            }
+        }
+
+        return defaultTimeout;
+    }
+
+    /**
+     * Checks whether any of the annotations is of type {@link RpcTimeout}.
+     *
+     * @param annotations Array of annotations declared on a single parameter
+     * @return True if {@link RpcTimeout} was found; otherwise false
+     */
+    private static boolean isRpcTimeout(Annotation[] annotations) {
+        for (Annotation annotation : annotations) {
+            if (annotation.annotationType().equals(RpcTimeout.class)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /** Not instantiable; this class only exposes static helpers. */
+    private RpcGatewayUtils() {}
+}