You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 10:34:06 UTC
[flink] 01/03: [FLINK-10251][rpc] Handle oversized response
messages in AkkaRpcActor
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 63641cd07a8afc9dcfdab04cc6dff7e1265b076c
Author: yanghua <ya...@gmail.com>
AuthorDate: Thu Oct 18 11:26:18 2018 +0800
[FLINK-10251][rpc] Handle oversized response messages in AkkaRpcActor
---
.../metrics/MetricRegistryConfiguration.java | 9 +-
.../flink/runtime/minicluster/MiniCluster.java | 24 +--
.../runtime/rpc/akka/AkkaInvocationHandler.java | 30 ++-
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 65 +++++-
.../flink/runtime/rpc/akka/AkkaRpcService.java | 60 +++---
.../rpc/akka/AkkaRpcServiceConfiguration.java | 70 +++++++
.../runtime/rpc/akka/AkkaRpcServiceUtils.java | 24 ++-
.../flink/runtime/rpc/akka/FencedAkkaRpcActor.java | 9 +-
.../flink/runtime/jobmaster/JobMasterTest.java | 6 +-
.../jobmaster/slotpool/SlotPoolRpcTest.java | 3 +-
.../apache/flink/runtime/rpc/AsyncCallsTest.java | 3 +-
.../flink/runtime/rpc/RpcConnectionTest.java | 7 +-
.../apache/flink/runtime/rpc/RpcEndpointTest.java | 3 +-
.../apache/flink/runtime/rpc/RpcSSLAuthITCase.java | 9 +-
.../flink/runtime/rpc/TestingRpcService.java | 5 +-
.../rpc/akka/AkkaRpcActorHandshakeTest.java | 12 +-
.../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 117 ++++++++++-
.../flink/runtime/rpc/akka/AkkaRpcServiceTest.java | 5 +-
.../runtime/rpc/akka/MainThreadValidationTest.java | 3 +-
.../runtime/rpc/akka/MessageSerializationTest.java | 17 +-
.../flink/runtime/rpc/akka/SyncCallsTest.java | 227 +++++++++++++++++++++
21 files changed, 613 insertions(+), 95 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 244a1ed..c739170 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -19,16 +19,14 @@
package org.apache.flink.runtime.metrics;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.util.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -171,10 +169,7 @@ public class MetricRegistryConfiguration {
}
}
- final String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
- final String akkaConfigStr = String.format("akka {remote {netty.tcp {maximum-frame-size = %s}}}", maxFrameSizeStr);
- final Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
- final long maximumFrameSize = akkaConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size");
+ final long maximumFrameSize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
// padding to account for serialization overhead
final long messageSizeLimitPadding = 256;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index d0b4219..f25c73c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
@@ -262,8 +263,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
// bring up all the RPC services
LOG.info("Starting RPC Service(s)");
+ AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
// we always need the 'commonRpcService' for auxiliary calls
- commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+ commonRpcService = createRpcService(akkaRpcServiceConfig, false, null);
// TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(
@@ -290,12 +292,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();
- jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
- resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);
+ jobManagerRpcService = createRpcService(akkaRpcServiceConfig, true, jobManagerBindAddress);
+ resourceManagerRpcService = createRpcService(akkaRpcServiceConfig, true, resourceManagerBindAddress);
for (int i = 0; i < numTaskManagers; i++) {
- taskManagerRpcServices[i] = createRpcService(
- configuration, rpcTimeout, true, taskManagerBindAddress);
+ taskManagerRpcServices[i] = createRpcService(akkaRpcServiceConfig, true, taskManagerBindAddress);
}
this.jobManagerRpcService = jobManagerRpcService;
@@ -742,9 +743,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
/**
* Factory method to instantiate the RPC service.
*
- * @param configuration
- * The configuration of the mini cluster
- * @param askTimeout
+ * @param akkaRpcServiceConfig
* The default RPC timeout for asynchronous "ask" requests.
* @param remoteEnabled
* True, if the RPC service should be reachable from other (remote) RPC services.
@@ -754,24 +753,23 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
* @return The instantiated RPC service
*/
protected RpcService createRpcService(
- Configuration configuration,
- Time askTimeout,
+ AkkaRpcServiceConfiguration akkaRpcServiceConfig,
boolean remoteEnabled,
String bindAddress) {
final Config akkaConfig;
if (remoteEnabled) {
- akkaConfig = AkkaUtils.getAkkaConfig(configuration, bindAddress, 0);
+ akkaConfig = AkkaUtils.getAkkaConfig(akkaRpcServiceConfig.getConfiguration(), bindAddress, 0);
} else {
- akkaConfig = AkkaUtils.getAkkaConfig(configuration);
+ akkaConfig = AkkaUtils.getAkkaConfig(akkaRpcServiceConfig.getConfiguration());
}
final Config effectiveAkkaConfig = AkkaUtils.testDispatcherConfig().withFallback(akkaConfig);
final ActorSystem actorSystem = AkkaUtils.createActorSystem(effectiveAkkaConfig);
- return new AkkaRpcService(actorSystem, askTimeout);
+ return new AkkaRpcService(actorSystem, akkaRpcServiceConfig);
}
protected ResourceManagerRunner startResourceManager(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index cc54f2e..90e126c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -27,12 +27,14 @@ import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.akka.messages.Processing;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
import akka.actor.ActorRef;
import akka.pattern.Patterns;
@@ -48,6 +50,7 @@ import java.lang.reflect.Method;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -203,14 +206,29 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
tell(rpcInvocation);
result = null;
- } else if (Objects.equals(returnType, CompletableFuture.class)) {
- // execute an asynchronous call
- result = ask(rpcInvocation, futureTimeout);
} else {
- // execute a synchronous call
- CompletableFuture<?> futureResult = ask(rpcInvocation, futureTimeout);
+ // execute an asynchronous call
+ CompletableFuture resultFuture = ask(rpcInvocation, futureTimeout);
+
+ CompletableFuture completableFuture = resultFuture.thenApply((Object o) -> {
+ if (o instanceof SerializedValue) {
+ try {
+ return ((SerializedValue) o).deserializeValue(getClass().getClassLoader());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new CompletionException(
+ new RpcException("Could not deserialize the serialized payload of RPC method : "
+ + methodName, e));
+ }
+ } else {
+ return o;
+ }
+ });
- result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
+ if (!Objects.equals(returnType, CompletableFuture.class)) {
+ result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
+ } else {
+ result = completableFuture;
+ }
}
return result;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 8471d7e..84d75fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -33,7 +33,9 @@ import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
+import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
import akka.actor.ActorRef;
import akka.actor.Status;
@@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -85,13 +88,22 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
private final int version;
+ private final long maximumFramesize;
+
private State state;
- AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture, final int version) {
+ AkkaRpcActor(
+ final T rpcEndpoint,
+ final CompletableFuture<Boolean> terminationFuture,
+ final int version,
+ final long maximumFramesize) {
+
+ checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
this.terminationFuture = checkNotNull(terminationFuture);
this.version = version;
+ this.maximumFramesize = maximumFramesize;
this.state = State.STOPPED;
}
@@ -254,6 +266,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
return;
}
+ boolean remoteSender = isRemoteSender();
+ final String methodName = rpcMethod.getName();
+
if (result instanceof CompletableFuture) {
final CompletableFuture<?> future = (CompletableFuture<?>) result;
Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>();
@@ -263,14 +278,33 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
if (throwable != null) {
promise.failure(throwable);
} else {
- promise.success(value);
+ if (!remoteSender) {
+ promise.success(value);
+ } else {
+ Either<SerializedValue, AkkaRpcException> serializedResult =
+ serializeRemoteResultAndVerifySize(value, methodName);
+ if (serializedResult.isLeft()) {
+ promise.success(serializedResult.left());
+ } else {
+ promise.failure(serializedResult.right());
+ }
+ }
}
});
Patterns.pipe(promise.future(), getContext().dispatcher()).to(getSender());
} else {
- // tell the sender the result of the computation
- getSender().tell(new Status.Success(result), getSelf());
+ if (!remoteSender) {
+ getSender().tell(result, getSelf());
+ } else {
+ Either<SerializedValue, AkkaRpcException> serializedResult =
+ serializeRemoteResultAndVerifySize(result, methodName);
+ if (serializedResult.isLeft()) {
+ getSender().tell(new Status.Success(serializedResult.left()), getSelf());
+ } else {
+ getSender().tell(new Status.Failure(serializedResult.right()), getSelf());
+ }
+ }
}
}
} catch (Throwable e) {
@@ -281,6 +315,29 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
}
}
+ protected boolean isRemoteSender() {
+ return !getSender().path().address().hasLocalScope();
+ }
+
+ private Either<SerializedValue, AkkaRpcException> serializeRemoteResultAndVerifySize(
+ Object result, String methodName) {
+ try {
+ SerializedValue serializedResult = new SerializedValue(result);
+
+ long resultSize = serializedResult.getByteArray().length;
+ if (resultSize > maximumFramesize) {
+ return Either.Right(new AkkaRpcException(
+ "The method " + methodName + "'s result size " + resultSize
+ + " exceeds the maximum size " + maximumFramesize + " ."));
+ } else {
+ return Either.Left(serializedResult);
+ }
+ } catch (IOException e) {
+ return Either.Right(new AkkaRpcException(
+ "Failed to serialize the result for RPC call : " + methodName + ".", e));
+ }
+ }
+
/**
* Handle asynchronous {@link Callable}. This method simply executes the given {@link Callable}
* in the context of the actor thread.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 519ac9e..57c4dee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rpc.akka;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -87,18 +86,14 @@ public class AkkaRpcService implements RpcService {
static final int VERSION = 1;
- static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";
-
private final Object lock = new Object();
private final ActorSystem actorSystem;
- private final Time timeout;
+ private final AkkaRpcServiceConfiguration configuration;
@GuardedBy("lock")
private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4);
- private final long maximumFramesize;
-
private final String address;
private final int port;
@@ -108,16 +103,9 @@ public class AkkaRpcService implements RpcService {
private volatile boolean stopped;
- public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
+ public AkkaRpcService(final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) {
this.actorSystem = checkNotNull(actorSystem, "actor system");
- this.timeout = checkNotNull(timeout, "timeout");
-
- if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
- maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
- } else {
- // only local communication
- maximumFramesize = Long.MAX_VALUE;
- }
+ this.configuration = checkNotNull(configuration, "akka rpc service configuration");
Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
@@ -174,8 +162,8 @@ public class AkkaRpcService implements RpcService {
addressHostname.f0,
addressHostname.f1,
actorRef,
- timeout,
- maximumFramesize,
+ configuration.getTimeout(),
+ configuration.getMaximumFramesize(),
null);
});
}
@@ -193,8 +181,8 @@ public class AkkaRpcService implements RpcService {
addressHostname.f0,
addressHostname.f1,
actorRef,
- timeout,
- maximumFramesize,
+ configuration.getTimeout(),
+ configuration.getMaximumFramesize(),
null,
() -> fencingToken);
});
@@ -208,9 +196,19 @@ public class AkkaRpcService implements RpcService {
final Props akkaRpcActorProps;
if (rpcEndpoint instanceof FencedRpcEndpoint) {
- akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
+ akkaRpcActorProps = Props.create(
+ FencedAkkaRpcActor.class,
+ rpcEndpoint,
+ terminationFuture,
+ getVersion(),
+ configuration.getMaximumFramesize());
} else {
- akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
+ akkaRpcActorProps = Props.create(
+ getAkkaRpcActorClass(),
+ rpcEndpoint,
+ terminationFuture,
+ getVersion(),
+ configuration.getMaximumFramesize());
}
ActorRef actorRef;
@@ -245,8 +243,8 @@ public class AkkaRpcService implements RpcService {
akkaAddress,
hostname,
actorRef,
- timeout,
- maximumFramesize,
+ configuration.getTimeout(),
+ configuration.getMaximumFramesize(),
terminationFuture,
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
@@ -256,8 +254,8 @@ public class AkkaRpcService implements RpcService {
akkaAddress,
hostname,
actorRef,
- timeout,
- maximumFramesize,
+ configuration.getTimeout(),
+ configuration.getMaximumFramesize(),
terminationFuture);
}
@@ -283,8 +281,8 @@ public class AkkaRpcService implements RpcService {
rpcServer.getAddress(),
rpcServer.getHostname(),
((AkkaBasedEndpoint) rpcServer).getActorRef(),
- timeout,
- maximumFramesize,
+ configuration.getTimeout(),
+ configuration.getMaximumFramesize(),
null,
() -> fencingToken);
@@ -392,6 +390,10 @@ public class AkkaRpcService implements RpcService {
return FutureUtils.toJava(scalaFuture);
}
+ protected Class getAkkaRpcActorClass() {
+ return AkkaRpcActor.class;
+ }
+
// ---------------------------------------------------------------------------------------
// Private helper methods
// ---------------------------------------------------------------------------------------
@@ -421,7 +423,7 @@ public class AkkaRpcService implements RpcService {
final ActorSelection actorSel = actorSystem.actorSelection(address);
final Future<ActorIdentity> identify = Patterns
- .ask(actorSel, new Identify(42), timeout.toMilliseconds())
+ .ask(actorSel, new Identify(42), configuration.getTimeout().toMilliseconds())
.<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class));
final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);
@@ -438,7 +440,7 @@ public class AkkaRpcService implements RpcService {
final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
(ActorRef actorRef) -> FutureUtils.toJava(
Patterns
- .ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), timeout.toMilliseconds())
+ .ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds())
.<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));
return actorRefFuture.thenCombineAsync(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
new file mode 100644
index 0000000..35f464b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Configuration object for {@link AkkaRpcService}.
+ */
+public class AkkaRpcServiceConfiguration {
+
+ private final Time timeout;
+ private final long maximumFramesize;
+ private final Configuration configuration;
+
+ public AkkaRpcServiceConfiguration(Time timeout, long maximumFramesize, Configuration configuration) {
+ checkNotNull(timeout);
+ checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
+ this.timeout = timeout;
+ this.maximumFramesize = maximumFramesize;
+ this.configuration = configuration;
+ }
+
+ public Time getTimeout() {
+ return timeout;
+ }
+
+ public long getMaximumFramesize() {
+ return maximumFramesize;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
+ FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+ Time timeout = Time.of(duration.length(), duration.unit());
+
+ long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
+
+ return new AkkaRpcServiceConfiguration(timeout, maximumFramesize, configuration);
+ }
+
+ public static AkkaRpcServiceConfiguration defaultConfiguration() {
+ return fromConfiguration(new Configuration());
+ }
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 43a52bb..746bc34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -18,10 +18,10 @@
package org.apache.flink.runtime.rpc.akka;
-import org.apache.flink.api.common.time.Time;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
@@ -55,6 +55,12 @@ public class AkkaRpcServiceUtils {
private static final String AKKA_TCP = "akka.tcp";
private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
+ private static final String SIMPLE_AKKA_CONFIG_TEMPLATE =
+ "akka {remote {netty.tcp {maximum-frame-size = %s}}}";
+
+ private static final String MAXIMUM_FRAME_SIZE_PATH =
+ "akka.remote.netty.tcp.maximum-frame-size";
+
private static final AtomicLong nextNameOffset = new AtomicLong(0L);
// ------------------------------------------------------------------------
@@ -99,8 +105,7 @@ public class AkkaRpcServiceUtils {
@Nonnull
private static RpcService instantiateAkkaRpcService(Configuration configuration, ActorSystem actorSystem) {
- final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
- return new AkkaRpcService(actorSystem, timeout);
+ return new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
}
// ------------------------------------------------------------------------
@@ -200,6 +205,17 @@ public class AkkaRpcServiceUtils {
}
// ------------------------------------------------------------------------
+ // RPC service configuration
+ // ------------------------------------------------------------------------
+
+ public static long extractMaximumFramesize(Configuration configuration) {
+ String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
+ String akkaConfigStr = String.format(SIMPLE_AKKA_CONFIG_TEMPLATE, maxFrameSizeStr);
+ Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
+ return akkaConfig.getBytes(MAXIMUM_FRAME_SIZE_PATH);
+ }
+
+ // ------------------------------------------------------------------------
/** This class is not meant to be instantiated. */
private AkkaRpcServiceUtils() {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index 872effd..72c783b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -39,8 +39,13 @@ import java.util.concurrent.CompletableFuture;
*/
public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
- public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture, int version) {
- super(rpcEndpoint, terminationFuture, version);
+ public FencedAkkaRpcActor(
+ T rpcEndpoint,
+ CompletableFuture<Boolean> terminationFuture,
+ int version,
+ final long maximumFramesize) {
+
+ super(rpcEndpoint, terminationFuture, version, maximumFramesize);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index de3f787..56d5044 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -96,6 +96,7 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
@@ -265,8 +266,9 @@ public class JobMasterTest extends TestLogger {
final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
- rpcService1 = new AkkaRpcService(actorSystem1, testingTimeout);
- rpcService2 = new AkkaRpcService(actorSystem2, testingTimeout);
+ AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+ rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+ rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
final CompletableFuture<Throwable> declineCheckpointMessageFuture = new CompletableFuture<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
index 4fb2ada..33104c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -85,7 +86,7 @@ public class SlotPoolRpcTest extends TestLogger {
@BeforeClass
public static void setup() {
ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
- rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
+ rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
}
@AfterClass
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 1331bb7..1233306 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -61,7 +62,7 @@ public class AsyncCallsTest extends TestLogger {
private static final Time timeout = Time.seconds(10L);
private static final AkkaRpcService akkaRpcService =
- new AkkaRpcService(actorSystem, Time.milliseconds(10000L));
+ new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
@AfterClass
public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index cf3e651..e93de88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -20,11 +20,12 @@ package org.apache.flink.runtime.rpc;
import akka.actor.ActorSystem;
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.util.TestLogger;
@@ -59,7 +60,9 @@ public class RpcConnectionTest extends TestLogger {
// we start the RPC service with a very long timeout to ensure that the test
// can only pass if the connection problem is not recognized merely via a timeout
- rpcService = new AkkaRpcService(actorSystem, Time.of(10000000, TimeUnit.SECONDS));
+ Configuration configuration = new Configuration();
+ configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s");
+ rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
CompletableFuture<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 9aa4520..0081d58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.util.TestLogger;
import akka.actor.ActorSystem;
@@ -49,7 +50,7 @@ public class RpcEndpointTest extends TestLogger {
@BeforeClass
public static void setup() {
actorSystem = AkkaUtils.createDefaultActorSystem();
- rpcService = new AkkaRpcService(actorSystem, TIMEOUT);
+ rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
}
@AfterClass
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
index 2aa38fa..138cf98 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
@@ -18,13 +18,13 @@
package org.apache.flink.runtime.rpc;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.TestLogger;
@@ -91,8 +91,11 @@ public class RpcSSLAuthITCase extends TestLogger {
// we start the RPC service with a very long timeout to ensure that the test
// can only pass if the connection problem is not recognized merely via a timeout
- rpcService1 = new AkkaRpcService(actorSystem1, Time.of(10000000, TimeUnit.SECONDS));
- rpcService2 = new AkkaRpcService(actorSystem2, Time.of(10000000, TimeUnit.SECONDS));
+ Configuration configuration = new Configuration();
+ configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s");
+ AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+ rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+ rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
TestEndpoint endpoint = new TestEndpoint(rpcService1);
endpoint.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index db70a0f..85c5707 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -18,11 +18,11 @@
package org.apache.flink.runtime.rpc;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
@@ -66,7 +66,8 @@ public class TestingRpcService extends AkkaRpcService {
* Creates a new {@code TestingRpcService}, using the given configuration.
*/
public TestingRpcService(Configuration configuration) {
- super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
+ super(AkkaUtils.createLocalActorSystem(configuration),
+ AkkaRpcServiceConfiguration.fromConfiguration(configuration));
this.registeredConnections = new ConcurrentHashMap<>();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
index ed7a3bd..88aa95b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
@@ -60,9 +60,11 @@ public class AkkaRpcActorHandshakeTest extends TestLogger {
final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
final ActorSystem wrongVersionActorSystem = AkkaUtils.createDefaultActorSystem();
- akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout);
- akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
- wrongVersionAkkaRpcService = new WrongVersionAkkaRpcService(wrongVersionActorSystem, timeout);
+ AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.defaultConfiguration();
+ akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+ akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
+ wrongVersionAkkaRpcService = new WrongVersionAkkaRpcService(
+ wrongVersionActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
}
@AfterClass
@@ -135,8 +137,8 @@ public class AkkaRpcActorHandshakeTest extends TestLogger {
private static class WrongVersionAkkaRpcService extends AkkaRpcService {
- WrongVersionAkkaRpcService(ActorSystem actorSystem, Time timeout) {
- super(actorSystem, timeout);
+ WrongVersionAkkaRpcService(ActorSystem actorSystem, AkkaRpcServiceConfiguration configuration) {
+ super(actorSystem, configuration);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index a32c1f6..fd62305 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -38,6 +40,9 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -144,6 +149,92 @@ public class AkkaRpcActorTest extends TestLogger {
rpcEndpoint.shutDown();
}
+ @Test
+ public void testOversizedResponseMsg() throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
+ OversizedResponseRpcEndpoint rpcEndpoint = null;
+
+ ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+ ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+ AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+ AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
+ AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
+
+ try {
+ rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
+
+ rpcEndpoint.start();
+
+ CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(
+ rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
+ OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
+
+ CompletableFuture<String> result = rpcGateway.calculate();
+
+ result.get(timeout.getSize(), timeout.getUnit());
+
+ fail("Expected an AkkaRpcException.");
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof IOException);
+ } finally {
+ if (rpcEndpoint != null) {
+ rpcEndpoint.shutDown();
+ }
+
+ final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
+ terminationFutures.add(rpcService1.stopService());
+ terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+ terminationFutures.add(rpcService2.stopService());
+ terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
+
+ FutureUtils
+ .waitForAll(terminationFutures)
+ .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Test
+ public void testNonOversizedResponseMsg() throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.setString(AkkaOptions.FRAMESIZE, "1000 kB");
+ OversizedResponseRpcEndpoint rpcEndpoint = null;
+
+ ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+ ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+ AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+ AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+ AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
+
+ try {
+ rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
+ rpcEndpoint.start();
+
+ CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
+ OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
+
+ CompletableFuture<String> result = rpcGateway.calculate();
+
+ String actualTxt = result.get(timeout.getSize(), timeout.getUnit());
+
+ assertEquals("hello world", actualTxt);
+ } finally {
+ if (rpcEndpoint != null) {
+ rpcEndpoint.shutDown();
+ }
+
+ final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
+ terminationFutures.add(rpcService1.stopService());
+ terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+ terminationFutures.add(rpcService2.stopService());
+ terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
+
+ FutureUtils
+ .waitForAll(terminationFutures)
+ .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+ }
+
/**
* Tests that we can wait for a RpcEndpoint to terminate.
*
@@ -248,7 +339,8 @@ public class AkkaRpcActorTest extends TestLogger {
@Test
public void testActorTerminationWhenServiceShutdown() throws Exception {
final ActorSystem rpcActorSystem = AkkaUtils.createDefaultActorSystem();
- final RpcService rpcService = new AkkaRpcService(rpcActorSystem, timeout);
+ final RpcService rpcService = new AkkaRpcService(
+ rpcActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
try {
SimpleRpcEndpoint rpcEndpoint = new SimpleRpcEndpoint(rpcService, SimpleRpcEndpoint.class.getSimpleName());
@@ -429,4 +521,27 @@ public class AkkaRpcActorTest extends TestLogger {
return postStopFuture;
}
}
+
+ // -------------------------------------------------------------------------
+
+ interface OversizedResponseMsgRpcGateway extends RpcGateway {
+ CompletableFuture<String> calculate();
+ }
+
+ static class OversizedResponseRpcEndpoint extends TestRpcEndpoint implements OversizedResponseMsgRpcGateway {
+
+ private volatile String txt;
+
+ public OversizedResponseRpcEndpoint(RpcService rpcService, String txt) {
+ super(rpcService);
+ this.txt = txt;
+ }
+
+ @Override
+ public CompletableFuture<String> calculate() {
+ return CompletableFuture.completedFuture(txt);
+ }
+
+ }
+
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index caf22f4..1b81331 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -56,7 +56,7 @@ public class AkkaRpcServiceTest extends TestLogger {
private static final Time TIMEOUT = Time.milliseconds(10000L);
- private static final AkkaRpcService AKKA_RPC_SERVICE = new AkkaRpcService(ACTOR_SYSTEM, TIMEOUT);
+ private static final AkkaRpcService AKKA_RPC_SERVICE = new AkkaRpcService(ACTOR_SYSTEM, AkkaRpcServiceConfiguration.defaultConfiguration());
@AfterClass
public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
@@ -136,7 +136,8 @@ public class AkkaRpcServiceTest extends TestLogger {
@Test(timeout = 60000)
public void testTerminationFuture() throws Exception {
final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
- final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000));
+ final AkkaRpcService rpcService = new AkkaRpcService(
+ actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
CompletableFuture<Void> terminationFuture = rpcService.getTerminationFuture();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 2ce2905..d190e9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rpc.akka;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -46,7 +45,7 @@ public class MainThreadValidationTest extends TestLogger {
// actual test
AkkaRpcService akkaRpcService = new AkkaRpcService(
AkkaUtils.createDefaultActorSystem(),
- Time.milliseconds(10000));
+ AkkaRpcServiceConfiguration.defaultConfiguration());
try {
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 9a2a1fe..e60c675 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -27,8 +29,6 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.TestLogger;
import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -60,14 +60,15 @@ public class MessageSerializationTest extends TestLogger {
@BeforeClass
public static void setup() {
- Config akkaConfig = AkkaUtils.getDefaultAkkaConfig();
- Config modifiedAkkaConfig = akkaConfig.withValue(AkkaRpcService.MAXIMUM_FRAME_SIZE_PATH, ConfigValueFactory.fromAnyRef(maxFrameSize + "b"));
+ Configuration configuration = new Configuration();
+ configuration.setString(AkkaOptions.FRAMESIZE, maxFrameSize + "b");
- actorSystem1 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
- actorSystem2 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
+ actorSystem1 = AkkaUtils.createDefaultActorSystem();
+ actorSystem2 = AkkaUtils.createDefaultActorSystem();
- akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout);
- akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
+ AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+ akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+ akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
}
@AfterClass
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
new file mode 100644
index 0000000..523ad73
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * RPC sync invoke test.
+ */
+public class SyncCallsTest extends TestLogger {
+
+ // ------------------------------------------------------------------------
+ // shared test members
+ // ------------------------------------------------------------------------
+
+ private static final Time timeout = Time.seconds(10L);
+
+ private static ActorSystem actorSystem1;
+ private static ActorSystem actorSystem2;
+ private static AkkaRpcService akkaRpcService1;
+ private static AkkaRpcService akkaRpcService2;
+
+ @BeforeClass
+ public static void setup() {
+ Configuration configuration = new Configuration();
+
+ actorSystem1 = AkkaUtils.createDefaultActorSystem();
+ actorSystem2 = AkkaUtils.createDefaultActorSystem();
+
+ AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+ akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+ akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
+ }
+
+ @AfterClass
+ public static void teardown() throws InterruptedException, ExecutionException, TimeoutException {
+ final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
+
+ terminationFutures.add(akkaRpcService1.stopService());
+ terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+ terminationFutures.add(akkaRpcService2.stopService());
+ terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
+
+ FutureUtils
+ .waitForAll(terminationFutures)
+ .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
+ @Test
+ public void testSimpleLocalSyncCall() throws Exception {
+ RpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
+ rpcEndpoint.start();
+
+ try {
+ DummyRpcGateway gateway = rpcEndpoint.getSelfGateway(DummyRpcGateway.class);
+
+ int actualResult = gateway.foobar();
+
+ assertEquals(1234, actualResult);
+ } finally {
+ rpcEndpoint.shutDown();
+ }
+
+ }
+
+ @Test
+ public void testSimpleRemoteSyncCall() throws Exception {
+ RpcEndpoint rpcEndpoint = null;
+
+ try {
+ rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
+ rpcEndpoint.start();
+
+ CompletableFuture<DummyRpcGateway> future = akkaRpcService2.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
+ DummyRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
+
+ int actualResult = rpcGateway.foobar();
+
+ assertEquals(1234, actualResult);
+ } finally {
+ if (rpcEndpoint != null) {
+ rpcEndpoint.shutDown();
+ }
+ }
+ }
+
+ @Test
+ public void testSimpleRemoteSyncCallWithOversizedMsg() throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
+ OversizedMsgRpcEndpoint rpcEndpoint = null;
+
+ ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+ ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+ AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+ AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
+ AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
+
+ try {
+ rpcEndpoint = new OversizedMsgRpcEndpoint(rpcService1, "hello world");
+
+ rpcEndpoint.start();
+
+ CompletableFuture<OversizedMsgRpcGateway> future = rpcService2.connect(
+ rpcEndpoint.getAddress(), OversizedMsgRpcGateway.class);
+ OversizedMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
+
+ String result = rpcGateway.response();
+
+ fail("Expected an AkkaRpcException.");
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof IOException);
+ } finally {
+ if (rpcEndpoint != null) {
+ rpcEndpoint.shutDown();
+ }
+
+ final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
+ terminationFutures.add(rpcService1.stopService());
+ terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+ terminationFutures.add(rpcService2.stopService());
+ terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
+
+ FutureUtils
+ .waitForAll(terminationFutures)
+ .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * A dummy rpc gateway.
+ */
+ public interface DummyRpcGateway extends RpcGateway {
+ int foobar();
+ }
+
+ /**
+ * A dummy rpc endpoint.
+ */
+ public static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {
+
+ DummyRpcEndpoint(RpcService rpcService) {
+ super(rpcService);
+ }
+
+ @Override
+ public int foobar() {
+ return 1234;
+ }
+
+ @Override
+ public CompletableFuture<Void> postStop() {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ /**
+ * Oversized message rpc gateway.
+ */
+ private interface OversizedMsgRpcGateway extends RpcGateway {
+ String response();
+ }
+
+ /**
+ * Oversized message rpc endpoint.
+ */
+ private static class OversizedMsgRpcEndpoint extends RpcEndpoint implements OversizedMsgRpcGateway {
+
+ private String txt;
+
+ public OversizedMsgRpcEndpoint(RpcService rpcService, String txt) {
+ super(rpcService);
+ this.txt = txt;
+ }
+
+ @Override
+ public CompletableFuture<Void> postStop() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public String response() {
+ return this.txt;
+ }
+ }
+
+}