You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 10:34:05 UTC

[flink] branch master updated (2104335 -> 35779f2)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2104335  [FLINK-11353][tests] Port JobManagerHAJobGraphRecoveryITCase to new code base
     new 63641cd  [FLINK-10251][rpc] Handle oversized response messages in AkkaRpcActor
     new 3667ea6  [FLINK-10251][rpc] Refactor handling of over sized messages
     new 35779f2  [hotfix] Fix checkstyle violations in AkkaRpcActorTest

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../metrics/MetricRegistryConfiguration.java       |   9 +-
 .../flink/runtime/minicluster/MiniCluster.java     |  24 ++-
 .../runtime/rpc/akka/AkkaInvocationHandler.java    |  38 +++-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  64 ++++++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |  56 +++---
 .../rpc/akka/AkkaRpcServiceConfiguration.java      |  77 ++++++++
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java      |  24 ++-
 .../flink/runtime/rpc/akka/FencedAkkaRpcActor.java |   8 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   6 +-
 .../jobmaster/slotpool/SlotPoolRpcTest.java        |   3 +-
 .../apache/flink/runtime/rpc/AsyncCallsTest.java   |   3 +-
 .../flink/runtime/rpc/RpcConnectionTest.java       |   7 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  |   3 +-
 .../apache/flink/runtime/rpc/RpcSSLAuthITCase.java |   9 +-
 .../flink/runtime/rpc/TestingRpcService.java       |   5 +-
 .../rpc/akka/AkkaRpcActorHandshakeTest.java        |  12 +-
 .../AkkaRpcActorOversizedResponseMessageTest.java  | 194 +++++++++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   |  32 ++--
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java |   5 +-
 .../runtime/rpc/akka/MainThreadValidationTest.java |   3 +-
 .../runtime/rpc/akka/MessageSerializationTest.java |  17 +-
 .../flink/runtime/rpc/akka/TestRpcEndpoint.java}   |  24 +--
 22 files changed, 496 insertions(+), 127 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java
 copy flink-runtime/src/{main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java => test/java/org/apache/flink/runtime/rpc/akka/TestRpcEndpoint.java} (69%)


[flink] 01/03: [FLINK-10251][rpc] Handle oversized response messages in AkkaRpcActor

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 63641cd07a8afc9dcfdab04cc6dff7e1265b076c
Author: yanghua <ya...@gmail.com>
AuthorDate: Thu Oct 18 11:26:18 2018 +0800

    [FLINK-10251][rpc] Handle oversized response messages in AkkaRpcActor
---
 .../metrics/MetricRegistryConfiguration.java       |   9 +-
 .../flink/runtime/minicluster/MiniCluster.java     |  24 +--
 .../runtime/rpc/akka/AkkaInvocationHandler.java    |  30 ++-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  65 +++++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |  60 +++---
 .../rpc/akka/AkkaRpcServiceConfiguration.java      |  70 +++++++
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java      |  24 ++-
 .../flink/runtime/rpc/akka/FencedAkkaRpcActor.java |   9 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   6 +-
 .../jobmaster/slotpool/SlotPoolRpcTest.java        |   3 +-
 .../apache/flink/runtime/rpc/AsyncCallsTest.java   |   3 +-
 .../flink/runtime/rpc/RpcConnectionTest.java       |   7 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  |   3 +-
 .../apache/flink/runtime/rpc/RpcSSLAuthITCase.java |   9 +-
 .../flink/runtime/rpc/TestingRpcService.java       |   5 +-
 .../rpc/akka/AkkaRpcActorHandshakeTest.java        |  12 +-
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 117 ++++++++++-
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java |   5 +-
 .../runtime/rpc/akka/MainThreadValidationTest.java |   3 +-
 .../runtime/rpc/akka/MessageSerializationTest.java |  17 +-
 .../flink/runtime/rpc/akka/SyncCallsTest.java      | 227 +++++++++++++++++++++
 21 files changed, 613 insertions(+), 95 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 244a1ed..c739170 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -19,16 +19,14 @@
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.util.Preconditions;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -171,10 +169,7 @@ public class MetricRegistryConfiguration {
 			}
 		}
 
-		final String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
-		final String akkaConfigStr = String.format("akka {remote {netty.tcp {maximum-frame-size = %s}}}", maxFrameSizeStr);
-		final Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
-		final long maximumFrameSize = akkaConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size");
+		final long maximumFrameSize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
 
 		// padding to account for serialization overhead
 		final long messageSizeLimitPadding = 256;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index d0b4219..f25c73c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
@@ -262,8 +263,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 				// bring up all the RPC services
 				LOG.info("Starting RPC Service(s)");
 
+				AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
 				// we always need the 'commonRpcService' for auxiliary calls
-				commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+				commonRpcService = createRpcService(akkaRpcServiceConfig, false, null);
 
 				// TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
 				metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(
@@ -290,12 +292,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
 					final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();
 
-					jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
-					resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);
+					jobManagerRpcService = createRpcService(akkaRpcServiceConfig, true, jobManagerBindAddress);
+					resourceManagerRpcService = createRpcService(akkaRpcServiceConfig, true, resourceManagerBindAddress);
 
 					for (int i = 0; i < numTaskManagers; i++) {
-						taskManagerRpcServices[i] = createRpcService(
-								configuration, rpcTimeout, true, taskManagerBindAddress);
+						taskManagerRpcServices[i] = createRpcService(akkaRpcServiceConfig, true, taskManagerBindAddress);
 					}
 
 					this.jobManagerRpcService = jobManagerRpcService;
@@ -742,9 +743,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	/**
 	 * Factory method to instantiate the RPC service.
 	 *
-	 * @param configuration
-	 *            The configuration of the mini cluster
-	 * @param askTimeout
+	 * @param akkaRpcServiceConfig
 	 *            The default RPC timeout for asynchronous "ask" requests.
 	 * @param remoteEnabled
 	 *            True, if the RPC service should be reachable from other (remote) RPC services.
@@ -754,24 +753,23 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	 * @return The instantiated RPC service
 	 */
 	protected RpcService createRpcService(
-			Configuration configuration,
-			Time askTimeout,
+			AkkaRpcServiceConfiguration akkaRpcServiceConfig,
 			boolean remoteEnabled,
 			String bindAddress) {
 
 		final Config akkaConfig;
 
 		if (remoteEnabled) {
-			akkaConfig = AkkaUtils.getAkkaConfig(configuration, bindAddress, 0);
+			akkaConfig = AkkaUtils.getAkkaConfig(akkaRpcServiceConfig.getConfiguration(), bindAddress, 0);
 		} else {
-			akkaConfig = AkkaUtils.getAkkaConfig(configuration);
+			akkaConfig = AkkaUtils.getAkkaConfig(akkaRpcServiceConfig.getConfiguration());
 		}
 
 		final Config effectiveAkkaConfig = AkkaUtils.testDispatcherConfig().withFallback(akkaConfig);
 
 		final ActorSystem actorSystem = AkkaUtils.createActorSystem(effectiveAkkaConfig);
 
-		return new AkkaRpcService(actorSystem, askTimeout);
+		return new AkkaRpcService(actorSystem, akkaRpcServiceConfig);
 	}
 
 	protected ResourceManagerRunner startResourceManager(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index cc54f2e..90e126c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -27,12 +27,14 @@ import org.apache.flink.runtime.rpc.RpcServer;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.akka.messages.Processing;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
 import org.apache.flink.runtime.rpc.messages.CallAsync;
 import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
@@ -48,6 +50,7 @@ import java.lang.reflect.Method;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -203,14 +206,29 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 			tell(rpcInvocation);
 
 			result = null;
-		} else if (Objects.equals(returnType, CompletableFuture.class)) {
-			// execute an asynchronous call
-			result = ask(rpcInvocation, futureTimeout);
 		} else {
-			// execute a synchronous call
-			CompletableFuture<?> futureResult = ask(rpcInvocation, futureTimeout);
+			// execute an asynchronous call
+			CompletableFuture resultFuture = ask(rpcInvocation, futureTimeout);
+
+			CompletableFuture completableFuture = resultFuture.thenApply((Object o) -> {
+				if (o instanceof SerializedValue) {
+					try {
+						return  ((SerializedValue) o).deserializeValue(getClass().getClassLoader());
+					} catch (IOException | ClassNotFoundException e) {
+						throw new CompletionException(
+							new RpcException("Could not deserialize the serialized payload of RPC method : "
+								+ methodName, e));
+					}
+				} else {
+					return o;
+				}
+			});
 
-			result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
+			if (!Objects.equals(returnType, CompletableFuture.class)) {
+				result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
+			} else {
+				result = completableFuture;
+			}
 		}
 
 		return result;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 8471d7e..84d75fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -33,7 +33,9 @@ import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
 import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
@@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -85,13 +88,22 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 
 	private final int version;
 
+	private final long maximumFramesize;
+
 	private State state;
 
-	AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture, final int version) {
+	AkkaRpcActor(
+			final T rpcEndpoint,
+			final CompletableFuture<Boolean> terminationFuture,
+			final int version,
+			final long maximumFramesize) {
+
+		checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
 		this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
 		this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
 		this.terminationFuture = checkNotNull(terminationFuture);
 		this.version = version;
+		this.maximumFramesize = maximumFramesize;
 		this.state = State.STOPPED;
 	}
 
@@ -254,6 +266,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 						return;
 					}
 
+					boolean remoteSender = isRemoteSender();
+					final String methodName = rpcMethod.getName();
+
 					if (result instanceof CompletableFuture) {
 						final CompletableFuture<?> future = (CompletableFuture<?>) result;
 						Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>();
@@ -263,14 +278,33 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 								if (throwable != null) {
 									promise.failure(throwable);
 								} else {
-									promise.success(value);
+									if (!remoteSender) {
+										promise.success(value);
+									} else {
+										Either<SerializedValue, AkkaRpcException> serializedResult =
+											serializeRemoteResultAndVerifySize(value, methodName);
+										if (serializedResult.isLeft()) {
+											promise.success(serializedResult.left());
+										} else {
+											promise.failure(serializedResult.right());
+										}
+									}
 								}
 							});
 
 						Patterns.pipe(promise.future(), getContext().dispatcher()).to(getSender());
 					} else {
-						// tell the sender the result of the computation
-						getSender().tell(new Status.Success(result), getSelf());
+						if (!remoteSender) {
+							getSender().tell(result, getSelf());
+						} else {
+							Either<SerializedValue, AkkaRpcException> serializedResult =
+								serializeRemoteResultAndVerifySize(result, methodName);
+							if (serializedResult.isLeft()) {
+								getSender().tell(new Status.Success(serializedResult.left()), getSelf());
+							} else {
+								getSender().tell(new Status.Failure(serializedResult.right()), getSelf());
+							}
+						}
 					}
 				}
 			} catch (Throwable e) {
@@ -281,6 +315,29 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 		}
 	}
 
+	protected boolean isRemoteSender() {
+		return !getSender().path().address().hasLocalScope();
+	}
+
+	private Either<SerializedValue, AkkaRpcException> serializeRemoteResultAndVerifySize(
+		Object result, String methodName) {
+		try {
+			SerializedValue serializedResult = new SerializedValue(result);
+
+			long resultSize = serializedResult.getByteArray().length;
+			if (resultSize > maximumFramesize) {
+				return Either.Right(new AkkaRpcException(
+					"The method " + methodName + "'s result size " + resultSize
+						+ " exceeds the maximum size " + maximumFramesize + " ."));
+			} else {
+				return Either.Left(serializedResult);
+			}
+		} catch (IOException e) {
+			return Either.Right(new AkkaRpcException(
+				"Failed to serialize the result for RPC call : " + methodName + ".", e));
+		}
+	}
+
 	/**
 	 * Handle asynchronous {@link Callable}. This method simply executes the given {@link Callable}
 	 * in the context of the actor thread.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 519ac9e..57c4dee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -87,18 +86,14 @@ public class AkkaRpcService implements RpcService {
 
 	static final int VERSION = 1;
 
-	static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";
-
 	private final Object lock = new Object();
 
 	private final ActorSystem actorSystem;
-	private final Time timeout;
+	private final AkkaRpcServiceConfiguration configuration;
 
 	@GuardedBy("lock")
 	private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4);
 
-	private final long maximumFramesize;
-
 	private final String address;
 	private final int port;
 
@@ -108,16 +103,9 @@ public class AkkaRpcService implements RpcService {
 
 	private volatile boolean stopped;
 
-	public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
+	public AkkaRpcService(final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) {
 		this.actorSystem = checkNotNull(actorSystem, "actor system");
-		this.timeout = checkNotNull(timeout, "timeout");
-
-		if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
-			maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
-		} else {
-			// only local communication
-			maximumFramesize = Long.MAX_VALUE;
-		}
+		this.configuration = checkNotNull(configuration, "akka rpc service configuration");
 
 		Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
 
@@ -174,8 +162,8 @@ public class AkkaRpcService implements RpcService {
 					addressHostname.f0,
 					addressHostname.f1,
 					actorRef,
-					timeout,
-					maximumFramesize,
+					configuration.getTimeout(),
+					configuration.getMaximumFramesize(),
 					null);
 			});
 	}
@@ -193,8 +181,8 @@ public class AkkaRpcService implements RpcService {
 					addressHostname.f0,
 					addressHostname.f1,
 					actorRef,
-					timeout,
-					maximumFramesize,
+					configuration.getTimeout(),
+					configuration.getMaximumFramesize(),
 					null,
 					() -> fencingToken);
 			});
@@ -208,9 +196,19 @@ public class AkkaRpcService implements RpcService {
 		final Props akkaRpcActorProps;
 
 		if (rpcEndpoint instanceof FencedRpcEndpoint) {
-			akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
+			akkaRpcActorProps = Props.create(
+				FencedAkkaRpcActor.class,
+				rpcEndpoint,
+				terminationFuture,
+				getVersion(),
+				configuration.getMaximumFramesize());
 		} else {
-			akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
+			akkaRpcActorProps = Props.create(
+				getAkkaRpcActorClass(),
+				rpcEndpoint,
+				terminationFuture,
+				getVersion(),
+				configuration.getMaximumFramesize());
 		}
 
 		ActorRef actorRef;
@@ -245,8 +243,8 @@ public class AkkaRpcService implements RpcService {
 				akkaAddress,
 				hostname,
 				actorRef,
-				timeout,
-				maximumFramesize,
+				configuration.getTimeout(),
+				configuration.getMaximumFramesize(),
 				terminationFuture,
 				((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
 
@@ -256,8 +254,8 @@ public class AkkaRpcService implements RpcService {
 				akkaAddress,
 				hostname,
 				actorRef,
-				timeout,
-				maximumFramesize,
+				configuration.getTimeout(),
+				configuration.getMaximumFramesize(),
 				terminationFuture);
 		}
 
@@ -283,8 +281,8 @@ public class AkkaRpcService implements RpcService {
 				rpcServer.getAddress(),
 				rpcServer.getHostname(),
 				((AkkaBasedEndpoint) rpcServer).getActorRef(),
-				timeout,
-				maximumFramesize,
+				configuration.getTimeout(),
+				configuration.getMaximumFramesize(),
 				null,
 				() -> fencingToken);
 
@@ -392,6 +390,10 @@ public class AkkaRpcService implements RpcService {
 		return FutureUtils.toJava(scalaFuture);
 	}
 
+	protected Class getAkkaRpcActorClass() {
+		return AkkaRpcActor.class;
+	}
+
 	// ---------------------------------------------------------------------------------------
 	// Private helper methods
 	// ---------------------------------------------------------------------------------------
@@ -421,7 +423,7 @@ public class AkkaRpcService implements RpcService {
 		final ActorSelection actorSel = actorSystem.actorSelection(address);
 
 		final Future<ActorIdentity> identify = Patterns
-			.ask(actorSel, new Identify(42), timeout.toMilliseconds())
+			.ask(actorSel, new Identify(42), configuration.getTimeout().toMilliseconds())
 			.<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class));
 
 		final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);
@@ -438,7 +440,7 @@ public class AkkaRpcService implements RpcService {
 		final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
 			(ActorRef actorRef) -> FutureUtils.toJava(
 				Patterns
-					.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), timeout.toMilliseconds())
+					.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds())
 					.<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));
 
 		return actorRefFuture.thenCombineAsync(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
new file mode 100644
index 0000000..35f464b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Configuration object for {@link AkkaRpcService}.
+ */
+public class AkkaRpcServiceConfiguration {
+
+	private final Time timeout;
+	private final long maximumFramesize;
+	private final Configuration configuration;
+
+	public AkkaRpcServiceConfiguration(Time timeout, long maximumFramesize, Configuration configuration) {
+		checkNotNull(timeout);
+		checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
+		this.timeout = timeout;
+		this.maximumFramesize = maximumFramesize;
+		this.configuration = configuration;
+	}
+
+	public Time getTimeout() {
+		return timeout;
+	}
+
+	public long getMaximumFramesize() {
+		return maximumFramesize;
+	}
+
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+
+	public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
+		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+		Time timeout = Time.of(duration.length(), duration.unit());
+
+		long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
+
+		return new AkkaRpcServiceConfiguration(timeout, maximumFramesize, configuration);
+	}
+
+	public static AkkaRpcServiceConfiguration defaultConfiguration() {
+		return fromConfiguration(new Configuration());
+	}
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 43a52bb..746bc34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import org.apache.flink.api.common.time.Time;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
@@ -55,6 +55,12 @@ public class AkkaRpcServiceUtils {
 	private static final String AKKA_TCP = "akka.tcp";
 	private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
 
+	private static final String SIMPLE_AKKA_CONFIG_TEMPLATE =
+		"akka {remote {netty.tcp {maximum-frame-size = %s}}}";
+
+	private static final String MAXIMUM_FRAME_SIZE_PATH =
+		"akka.remote.netty.tcp.maximum-frame-size";
+
 	private static final AtomicLong nextNameOffset = new AtomicLong(0L);
 
 	// ------------------------------------------------------------------------
@@ -99,8 +105,7 @@ public class AkkaRpcServiceUtils {
 
 	@Nonnull
 	private static RpcService instantiateAkkaRpcService(Configuration configuration, ActorSystem actorSystem) {
-		final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
-		return new AkkaRpcService(actorSystem, timeout);
+		return new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
 	}
 
 	// ------------------------------------------------------------------------
@@ -200,6 +205,17 @@ public class AkkaRpcServiceUtils {
 	}
 
 	// ------------------------------------------------------------------------
+	//  RPC service configuration
+	// ------------------------------------------------------------------------
+
+	public static long extractMaximumFramesize(Configuration configuration) {
+		String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
+		String akkaConfigStr = String.format(SIMPLE_AKKA_CONFIG_TEMPLATE, maxFrameSizeStr);
+		Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
+		return akkaConfig.getBytes(MAXIMUM_FRAME_SIZE_PATH);
+	}
+
+	// ------------------------------------------------------------------------
 
 	/** This class is not meant to be instantiated. */
 	private AkkaRpcServiceUtils() {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index 872effd..72c783b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -39,8 +39,13 @@ import java.util.concurrent.CompletableFuture;
  */
 public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
 
-	public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture, int version) {
-		super(rpcEndpoint, terminationFuture, version);
+	public FencedAkkaRpcActor(
+		T rpcEndpoint,
+		CompletableFuture<Boolean> terminationFuture,
+		int version,
+		final long maximumFramesize) {
+
+		super(rpcEndpoint, terminationFuture, version, maximumFramesize);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index de3f787..56d5044 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -96,6 +96,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
@@ -265,8 +266,9 @@ public class JobMasterTest extends TestLogger {
 			final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
 			final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
 
-			rpcService1 = new AkkaRpcService(actorSystem1, testingTimeout);
-			rpcService2 = new AkkaRpcService(actorSystem2, testingTimeout);
+			AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+			rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+			rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
 
 			final CompletableFuture<Throwable> declineCheckpointMessageFuture = new CompletableFuture<>();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
index 4fb2ada..33104c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -85,7 +86,7 @@ public class SlotPoolRpcTest extends TestLogger {
 	@BeforeClass
 	public static void setup() {
 		ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-		rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
+		rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 	}
 
 	@AfterClass
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 1331bb7..1233306 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -61,7 +62,7 @@ public class AsyncCallsTest extends TestLogger {
 	private static final Time timeout = Time.seconds(10L);
 
 	private static final AkkaRpcService akkaRpcService =
-			new AkkaRpcService(actorSystem, Time.milliseconds(10000L));
+			new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 
 	@AfterClass
 	public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index cf3e651..e93de88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -20,11 +20,12 @@ package org.apache.flink.runtime.rpc;
 
 import akka.actor.ActorSystem;
 
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.TestLogger;
@@ -59,7 +60,9 @@ public class RpcConnectionTest extends TestLogger {
 
 			// we start the RPC service with a very long timeout to ensure that the test
 			// can only pass if the connection problem is not recognized merely via a timeout
-			rpcService = new AkkaRpcService(actorSystem, Time.of(10000000, TimeUnit.SECONDS));
+			Configuration configuration = new Configuration();
+			configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s");
+			rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
 
 			CompletableFuture<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 9aa4520..0081d58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
@@ -49,7 +50,7 @@ public class RpcEndpointTest extends TestLogger {
 	@BeforeClass
 	public static void setup() {
 		actorSystem = AkkaUtils.createDefaultActorSystem();
-		rpcService = new AkkaRpcService(actorSystem, TIMEOUT);
+		rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 	}
 
 	@AfterClass
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
index 2aa38fa..138cf98 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.rpc;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.TestLogger;
 
@@ -91,8 +91,11 @@ public class RpcSSLAuthITCase extends TestLogger {
 
 			// we start the RPC service with a very long timeout to ensure that the test
 			// can only pass if the connection problem is not recognized merely via a timeout
-			rpcService1 = new AkkaRpcService(actorSystem1, Time.of(10000000, TimeUnit.SECONDS));
-			rpcService2 = new AkkaRpcService(actorSystem2, Time.of(10000000, TimeUnit.SECONDS));
+			Configuration configuration = new Configuration();
+			configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s");
+			AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+			rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+			rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
 
 			TestEndpoint endpoint = new TestEndpoint(rpcService1);
 			endpoint.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index db70a0f..85c5707 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.runtime.rpc;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
@@ -66,7 +66,8 @@ public class TestingRpcService extends AkkaRpcService {
 	 * Creates a new {@code TestingRpcService}, using the given configuration. 
 	 */
 	public TestingRpcService(Configuration configuration) {
-		super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
+		super(AkkaUtils.createLocalActorSystem(configuration),
+			AkkaRpcServiceConfiguration.fromConfiguration(configuration));
 
 		this.registeredConnections = new ConcurrentHashMap<>();
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
index ed7a3bd..88aa95b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
@@ -60,9 +60,11 @@ public class AkkaRpcActorHandshakeTest extends TestLogger {
 		final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
 		final ActorSystem wrongVersionActorSystem = AkkaUtils.createDefaultActorSystem();
 
-		akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout);
-		akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
-		wrongVersionAkkaRpcService = new WrongVersionAkkaRpcService(wrongVersionActorSystem, timeout);
+		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.defaultConfiguration();
+		akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+		akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
+		wrongVersionAkkaRpcService = new WrongVersionAkkaRpcService(
+			wrongVersionActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 	}
 
 	@AfterClass
@@ -135,8 +137,8 @@ public class AkkaRpcActorHandshakeTest extends TestLogger {
 
 	private static class WrongVersionAkkaRpcService extends AkkaRpcService {
 
-		WrongVersionAkkaRpcService(ActorSystem actorSystem, Time timeout) {
-			super(actorSystem, timeout);
+		WrongVersionAkkaRpcService(ActorSystem actorSystem, AkkaRpcServiceConfiguration configuration) {
+			super(actorSystem, configuration);
 		}
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index a32c1f6..fd62305 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -38,6 +40,9 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -144,6 +149,92 @@ public class AkkaRpcActorTest extends TestLogger {
 		rpcEndpoint.shutDown();
 	}
 
+	@Test
+	public void testOversizedResponseMsg() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
+		OversizedResponseRpcEndpoint rpcEndpoint = null;
+
+		ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+		AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
+		AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
+
+		try {
+			rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
+
+			rpcEndpoint.start();
+
+			CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(
+				rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
+			OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
+
+			CompletableFuture<String> result = rpcGateway.calculate();
+
+			result.get(timeout.getSize(), timeout.getUnit());
+
+			fail("Expected an AkkaRpcException.");
+		} catch (Exception e) {
+			assertTrue(e.getCause() instanceof IOException);
+		} finally {
+			if (rpcEndpoint != null) {
+				rpcEndpoint.shutDown();
+			}
+
+			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
+			terminationFutures.add(rpcService1.stopService());
+			terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+			terminationFutures.add(rpcService2.stopService());
+			terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
+
+			FutureUtils
+				.waitForAll(terminationFutures)
+				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		}
+	}
+
+	@Test
+	public void testNonOversizedResponseMsg() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setString(AkkaOptions.FRAMESIZE, "1000 kB");
+		OversizedResponseRpcEndpoint rpcEndpoint = null;
+
+		ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+		AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+		AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
+
+		try {
+			rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
+			rpcEndpoint.start();
+
+			CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
+			OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
+
+			CompletableFuture<String> result = rpcGateway.calculate();
+
+			String actualTxt = result.get(timeout.getSize(), timeout.getUnit());
+
+			assertEquals("hello world", actualTxt);
+		} finally {
+			if (rpcEndpoint != null) {
+				rpcEndpoint.shutDown();
+			}
+
+			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
+			terminationFutures.add(rpcService1.stopService());
+			terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+			terminationFutures.add(rpcService2.stopService());
+			terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
+
+			FutureUtils
+				.waitForAll(terminationFutures)
+				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		}
+	}
+
 	/**
 	 * Tests that we can wait for a RpcEndpoint to terminate.
 	 *
@@ -248,7 +339,8 @@ public class AkkaRpcActorTest extends TestLogger {
 	@Test
 	public void testActorTerminationWhenServiceShutdown() throws Exception {
 		final ActorSystem rpcActorSystem = AkkaUtils.createDefaultActorSystem();
-		final RpcService rpcService = new AkkaRpcService(rpcActorSystem, timeout);
+		final RpcService rpcService = new AkkaRpcService(
+			rpcActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 
 		try {
 			SimpleRpcEndpoint rpcEndpoint = new SimpleRpcEndpoint(rpcService, SimpleRpcEndpoint.class.getSimpleName());
@@ -429,4 +521,27 @@ public class AkkaRpcActorTest extends TestLogger {
 			return postStopFuture;
 		}
 	}
+
+	// -------------------------------------------------------------------------
+
+	interface OversizedResponseMsgRpcGateway extends RpcGateway {
+		CompletableFuture<String> calculate();
+	}
+
+	static class OversizedResponseRpcEndpoint extends TestRpcEndpoint implements OversizedResponseMsgRpcGateway {
+
+		private volatile String txt;
+
+		public OversizedResponseRpcEndpoint(RpcService rpcService, String txt) {
+			super(rpcService);
+			this.txt = txt;
+		}
+
+		@Override
+		public CompletableFuture<String> calculate() {
+			return CompletableFuture.completedFuture(txt);
+		}
+
+	}
+
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index caf22f4..1b81331 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -56,7 +56,7 @@ public class AkkaRpcServiceTest extends TestLogger {
 
 	private static final Time TIMEOUT = Time.milliseconds(10000L);
 
-	private static final AkkaRpcService AKKA_RPC_SERVICE = new AkkaRpcService(ACTOR_SYSTEM, TIMEOUT);
+	private static final AkkaRpcService AKKA_RPC_SERVICE = new AkkaRpcService(ACTOR_SYSTEM, AkkaRpcServiceConfiguration.defaultConfiguration());
 
 	@AfterClass
 	public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
@@ -136,7 +136,8 @@ public class AkkaRpcServiceTest extends TestLogger {
 	@Test(timeout = 60000)
 	public void testTerminationFuture() throws Exception {
 		final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
-		final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000));
+		final AkkaRpcService rpcService = new AkkaRpcService(
+			actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 
 		CompletableFuture<Void> terminationFuture = rpcService.getTerminationFuture();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 2ce2905..d190e9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -46,7 +45,7 @@ public class MainThreadValidationTest extends TestLogger {
 		// actual test
 		AkkaRpcService akkaRpcService = new AkkaRpcService(
 				AkkaUtils.createDefaultActorSystem(),
-				Time.milliseconds(10000));
+				AkkaRpcServiceConfiguration.defaultConfiguration());
 
 		try {
 			TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 9a2a1fe..e60c675 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -27,8 +29,6 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -60,14 +60,15 @@ public class MessageSerializationTest extends TestLogger {
 
 	@BeforeClass
 	public static void setup() {
-		Config akkaConfig = AkkaUtils.getDefaultAkkaConfig();
-		Config modifiedAkkaConfig = akkaConfig.withValue(AkkaRpcService.MAXIMUM_FRAME_SIZE_PATH, ConfigValueFactory.fromAnyRef(maxFrameSize + "b"));
+		Configuration configuration = new Configuration();
+		configuration.setString(AkkaOptions.FRAMESIZE, maxFrameSize + "b");
 
-		actorSystem1 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
-		actorSystem2 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
+		actorSystem1 = AkkaUtils.createDefaultActorSystem();
+		actorSystem2 = AkkaUtils.createDefaultActorSystem();
 
-		akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout);
-		akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
+		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+		akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+		akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
 	}
 
 	@AfterClass
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
new file mode 100644
index 0000000..523ad73
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * RPC sync invoke test.
+ */
+public class SyncCallsTest extends TestLogger {
+
+	// ------------------------------------------------------------------------
+	//  shared test members
+	// ------------------------------------------------------------------------
+
+	private static final Time timeout = Time.seconds(10L);
+
+	private static ActorSystem actorSystem1;
+	private static ActorSystem actorSystem2;
+	private static AkkaRpcService akkaRpcService1;
+	private static AkkaRpcService akkaRpcService2;
+
+	@BeforeClass
+	public static void setup() {
+		Configuration configuration = new Configuration();
+
+		actorSystem1 = AkkaUtils.createDefaultActorSystem();
+		actorSystem2 = AkkaUtils.createDefaultActorSystem();
+
+		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+		akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+		akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
+	}
+
+	@AfterClass
+	public static void teardown() throws InterruptedException, ExecutionException, TimeoutException {
+		final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
+
+		terminationFutures.add(akkaRpcService1.stopService());
+		terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+		terminationFutures.add(akkaRpcService2.stopService());
+		terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
+
+		FutureUtils
+			.waitForAll(terminationFutures)
+			.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+
+	@Test
+	public void testSimpleLocalSyncCall() throws Exception {
+		RpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
+		rpcEndpoint.start();
+
+		try {
+			DummyRpcGateway gateway = rpcEndpoint.getSelfGateway(DummyRpcGateway.class);
+
+			int actualResult = gateway.foobar();
+
+			assertEquals(1234, actualResult);
+		} finally {
+			rpcEndpoint.shutDown();
+		}
+
+	}
+
+	@Test
+	public void testSimpleRemoteSyncCall() throws Exception {
+		RpcEndpoint rpcEndpoint = null;
+
+		try {
+			rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
+			rpcEndpoint.start();
+
+			CompletableFuture<DummyRpcGateway> future = akkaRpcService2.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
+			DummyRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
+
+			int actualResult = rpcGateway.foobar();
+
+			assertEquals(1234, actualResult);
+		} finally {
+			if (rpcEndpoint != null) {
+				rpcEndpoint.shutDown();
+			}
+		}
+	}
+
+	@Test
+	public void testSimpleRemoteSyncCallWithOversizedMsg() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
+		OversizedMsgRpcEndpoint rpcEndpoint = null;
+
+		ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+		AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
+		AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
+
+		try {
+			rpcEndpoint = new OversizedMsgRpcEndpoint(rpcService1, "hello world");
+
+			rpcEndpoint.start();
+
+			CompletableFuture<OversizedMsgRpcGateway> future = rpcService2.connect(
+				rpcEndpoint.getAddress(), OversizedMsgRpcGateway.class);
+			OversizedMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
+
+			String result = rpcGateway.response();
+
+			fail("Expected an AkkaRpcException.");
+		} catch (Exception e) {
+			assertTrue(e.getCause() instanceof IOException);
+		} finally {
+			if (rpcEndpoint != null) {
+				rpcEndpoint.shutDown();
+			}
+
+			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
+			terminationFutures.add(rpcService1.stopService());
+			terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+			terminationFutures.add(rpcService2.stopService());
+			terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
+
+			FutureUtils
+				.waitForAll(terminationFutures)
+				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		}
+	}
+
+	/**
+	 * A dummy rpc gateway.
+	 */
+	public interface DummyRpcGateway extends RpcGateway {
+		int foobar();
+	}
+
+	/**
+	 * A dummy rpc endpoint.
+	 */
+	public static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {
+
+		DummyRpcEndpoint(RpcService rpcService) {
+			super(rpcService);
+		}
+
+		@Override
+		public int foobar() {
+			return 1234;
+		}
+
+		@Override
+		public CompletableFuture<Void> postStop() {
+			return CompletableFuture.completedFuture(null);
+		}
+	}
+
+	/**
+	 * Oversized message rpc gateway.
+	 */
+	private interface OversizedMsgRpcGateway extends RpcGateway {
+		String response();
+	}
+
+	/**
+	 * Oversized message rpc endpoint.
+	 */
+	private static class OversizedMsgRpcEndpoint extends RpcEndpoint implements OversizedMsgRpcGateway {
+
+		private String txt;
+
+		public OversizedMsgRpcEndpoint(RpcService rpcService, String txt) {
+			super(rpcService);
+			this.txt = txt;
+		}
+
+		@Override
+		public CompletableFuture<Void> postStop() {
+			return CompletableFuture.completedFuture(null);
+		}
+
+		@Override
+		public String response() {
+			return this.txt;
+		}
+	}
+
+}


[flink] 03/03: [hotfix] Fix checkstyle violations in AkkaRpcActorTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35779f22b1921646d7a858156fcafeb8238974f3
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 24 09:00:07 2019 +0100

    [hotfix] Fix checkstyle violations in AkkaRpcActorTest
---
 .../org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java    | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 040e949..27eca97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -51,6 +51,9 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link AkkaRpcActor}.
+ */
 public class AkkaRpcActorTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
@@ -61,7 +64,6 @@ public class AkkaRpcActorTest extends TestLogger {
 
 	private static AkkaRpcService akkaRpcService;
 
-
 	@BeforeClass
 	public static void setup() {
 		akkaRpcService = new TestingRpcService();
@@ -304,7 +306,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 	static class DummyRpcEndpoint extends TestRpcEndpoint implements DummyRpcGateway {
 
-		private volatile int _foobar = 42;
+		private volatile int foobar = 42;
 
 		protected DummyRpcEndpoint(RpcService rpcService) {
 			super(rpcService);
@@ -312,11 +314,11 @@ public class AkkaRpcActorTest extends TestLogger {
 
 		@Override
 		public CompletableFuture<Integer> foobar() {
-			return CompletableFuture.completedFuture(_foobar);
+			return CompletableFuture.completedFuture(foobar);
 		}
 
 		public void setFoobar(int value) {
-			_foobar = value;
+			foobar = value;
 		}
 	}
 


[flink] 02/03: [FLINK-10251][rpc] Refactor handling of over sized messages

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3667ea605ec9002df225493f0a0b5a6840b6f28d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 24 08:48:50 2019 +0100

    [FLINK-10251][rpc] Refactor handling of over sized messages
    
    This commit slightly refactors and cleans up the handling of over sized messages.
    
    This closes #6876.
---
 .../runtime/rpc/akka/AkkaInvocationHandler.java    |  18 +-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  31 ++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |   6 +-
 .../rpc/akka/AkkaRpcServiceConfiguration.java      |  37 ++--
 .../flink/runtime/rpc/akka/FencedAkkaRpcActor.java |   9 +-
 .../AkkaRpcActorOversizedResponseMessageTest.java  | 194 ++++++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 131 +-----------
 .../flink/runtime/rpc/akka/SyncCallsTest.java      | 227 ---------------------
 .../flink/runtime/rpc/akka/TestRpcEndpoint.java    |  36 ++++
 9 files changed, 287 insertions(+), 402 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 90e126c..e147cf6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
@@ -51,6 +52,7 @@ import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -208,12 +210,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 			result = null;
 		} else {
 			// execute an asynchronous call
-			CompletableFuture resultFuture = ask(rpcInvocation, futureTimeout);
+			CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
 
-			CompletableFuture completableFuture = resultFuture.thenApply((Object o) -> {
+			CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
 				if (o instanceof SerializedValue) {
 					try {
-						return  ((SerializedValue) o).deserializeValue(getClass().getClassLoader());
+						return  ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
 					} catch (IOException | ClassNotFoundException e) {
 						throw new CompletionException(
 							new RpcException("Could not deserialize the serialized payload of RPC method : "
@@ -224,10 +226,14 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 				}
 			});
 
-			if (!Objects.equals(returnType, CompletableFuture.class)) {
-				result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
-			} else {
+			if (Objects.equals(returnType, CompletableFuture.class)) {
 				result = completableFuture;
+			} else {
+				try {
+					result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
+				} catch (ExecutionException ee) {
+					throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(ee));
+				}
 			}
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 84d75fe..b17222e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -266,7 +266,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 						return;
 					}
 
-					boolean remoteSender = isRemoteSender();
+					final boolean isRemoteSender = isRemoteSender();
 					final String methodName = rpcMethod.getName();
 
 					if (result instanceof CompletableFuture) {
@@ -278,32 +278,32 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 								if (throwable != null) {
 									promise.failure(throwable);
 								} else {
-									if (!remoteSender) {
-										promise.success(value);
-									} else {
-										Either<SerializedValue, AkkaRpcException> serializedResult =
-											serializeRemoteResultAndVerifySize(value, methodName);
+									if (isRemoteSender) {
+										Either<SerializedValue<?>, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(value, methodName);
+
 										if (serializedResult.isLeft()) {
 											promise.success(serializedResult.left());
 										} else {
 											promise.failure(serializedResult.right());
 										}
+									} else {
+										promise.success(value);
 									}
 								}
 							});
 
 						Patterns.pipe(promise.future(), getContext().dispatcher()).to(getSender());
 					} else {
-						if (!remoteSender) {
-							getSender().tell(result, getSelf());
-						} else {
-							Either<SerializedValue, AkkaRpcException> serializedResult =
-								serializeRemoteResultAndVerifySize(result, methodName);
+						if (isRemoteSender) {
+							Either<SerializedValue<?>, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(result, methodName);
+
 							if (serializedResult.isLeft()) {
 								getSender().tell(new Status.Success(serializedResult.left()), getSelf());
 							} else {
 								getSender().tell(new Status.Failure(serializedResult.right()), getSelf());
 							}
+						} else {
+							getSender().tell(new Status.Success(result), getSelf());
 						}
 					}
 				}
@@ -315,14 +315,13 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 		}
 	}
 
-	protected boolean isRemoteSender() {
+	private boolean isRemoteSender() {
 		return !getSender().path().address().hasLocalScope();
 	}
 
-	private Either<SerializedValue, AkkaRpcException> serializeRemoteResultAndVerifySize(
-		Object result, String methodName) {
+	private Either<SerializedValue<?>, AkkaRpcException> serializeRemoteResultAndVerifySize(Object result, String methodName) {
 		try {
-			SerializedValue serializedResult = new SerializedValue(result);
+			SerializedValue<?> serializedResult = new SerializedValue<>(result);
 
 			long resultSize = serializedResult.getByteArray().length;
 			if (resultSize > maximumFramesize) {
@@ -334,7 +333,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 			}
 		} catch (IOException e) {
 			return Either.Right(new AkkaRpcException(
-				"Failed to serialize the result for RPC call : " + methodName + ".", e));
+				"Failed to serialize the result for RPC call : " + methodName + '.', e));
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 57c4dee..6548513 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -204,7 +204,7 @@ public class AkkaRpcService implements RpcService {
 				configuration.getMaximumFramesize());
 		} else {
 			akkaRpcActorProps = Props.create(
-				getAkkaRpcActorClass(),
+				AkkaRpcActor.class,
 				rpcEndpoint,
 				terminationFuture,
 				getVersion(),
@@ -390,10 +390,6 @@ public class AkkaRpcService implements RpcService {
 		return FutureUtils.toJava(scalaFuture);
 	}
 
-	protected Class getAkkaRpcActorClass() {
-		return AkkaRpcActor.class;
-	}
-
 	// ---------------------------------------------------------------------------------------
 	// Private helper methods
 	// ---------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
index 35f464b..41f4c71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
@@ -20,28 +20,39 @@ package org.apache.flink.runtime.rpc.akka;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+
+import javax.annotation.Nonnull;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Configuration object for {@link AkkaRpcService}.
+ * Configuration for the {@link AkkaRpcService}.
  */
 public class AkkaRpcServiceConfiguration {
 
+	@Nonnull
+	private final Configuration configuration;
+
+	@Nonnull
 	private final Time timeout;
+
 	private final long maximumFramesize;
-	private final Configuration configuration;
 
-	public AkkaRpcServiceConfiguration(Time timeout, long maximumFramesize, Configuration configuration) {
-		checkNotNull(timeout);
-		checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
+	public AkkaRpcServiceConfiguration(@Nonnull Configuration configuration, @Nonnull Time timeout, long maximumFramesize) {
+		checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive.");
+		this.configuration = configuration;
 		this.timeout = timeout;
 		this.maximumFramesize = maximumFramesize;
-		this.configuration = configuration;
 	}
 
+	@Nonnull
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+
+	@Nonnull
 	public Time getTimeout() {
 		return timeout;
 	}
@@ -50,17 +61,13 @@ public class AkkaRpcServiceConfiguration {
 		return maximumFramesize;
 	}
 
-	public Configuration getConfiguration() {
-		return configuration;
-	}
-
 	public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
-		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
-		Time timeout = Time.of(duration.length(), duration.unit());
+		final FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+		final Time timeout = Time.of(duration.length(), duration.unit());
 
-		long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
+		final long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
 
-		return new AkkaRpcServiceConfiguration(timeout, maximumFramesize, configuration);
+		return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize);
 	}
 
 	public static AkkaRpcServiceConfiguration defaultConfiguration() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index 72c783b..64a2a05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -40,11 +40,10 @@ import java.util.concurrent.CompletableFuture;
 public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
 
 	public FencedAkkaRpcActor(
-		T rpcEndpoint,
-		CompletableFuture<Boolean> terminationFuture,
-		int version,
-		final long maximumFramesize) {
-
+			T rpcEndpoint,
+			CompletableFuture<Boolean> terminationFuture,
+			int version,
+			final long maximumFramesize) {
 		super(rpcEndpoint, terminationFuture, version, maximumFramesize);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java
new file mode 100644
index 0000000..73de0a1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the over sized response message handling of the {@link AkkaRpcActor}.
+ */
+public class AkkaRpcActorOversizedResponseMessageTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.seconds(10L);
+
+	private static final int FRAMESIZE = 32000;
+
+	private static final String OVERSIZED_PAYLOAD = new String(new byte[FRAMESIZE]);
+
+	private static final String PAYLOAD = "Hello";
+
+	private static RpcService rpcService1;
+
+	private static RpcService rpcService2;
+
+	@BeforeClass
+	public static void setupClass() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setString(AkkaOptions.FRAMESIZE, FRAMESIZE + " b");
+
+		rpcService1 = AkkaRpcServiceUtils.createRpcService("localhost", 0, configuration);
+		rpcService2 = AkkaRpcServiceUtils.createRpcService("localhost", 0, configuration);
+	}
+
+	@AfterClass
+	public static void teardownClass() throws Exception {
+		RpcUtils.terminateRpcServices(TIMEOUT, rpcService1, rpcService2);
+	}
+
+	@Test
+	public void testOverSizedResponseMsgAsync() throws Exception {
+		try {
+			runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync);
+
+			fail("Expected the RPC to fail.");
+		} catch (ExecutionException e) {
+			assertThat(ExceptionUtils.findThrowable(e, AkkaRpcException.class).isPresent(), is(true));
+		}
+	}
+
+	@Test
+	public void testNormalSizedResponseMsgAsync() throws Exception {
+		final String message = runRemoteMessageResponseTest(PAYLOAD, this::requestMessageAsync);
+		assertThat(message, is(equalTo(PAYLOAD)));
+	}
+
+	@Test
+	public void testNormalSizedResponseMsgSync() throws Exception {
+		final String message = runRemoteMessageResponseTest(PAYLOAD, MessageRpcGateway::messageSync);
+		assertThat(message, is(equalTo(PAYLOAD)));
+	}
+
+	@Test
+	public void testOverSizedResponseMsgSync() throws Exception {
+		try {
+			runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync);
+
+			fail("Expected the RPC to fail.");
+		} catch (RpcException e) {
+			assertThat(ExceptionUtils.findThrowable(e, AkkaRpcException.class).isPresent(), is(true));
+		}
+	}
+
+	/**
+	 * Tests that we can send arbitrarily large objects when communicating locally with
+	 * the rpc endpoint.
+	 */
+	@Test
+	public void testLocalOverSizedResponseMsgSync() throws Exception {
+		final String message = runLocalMessageResponseTest(OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync);
+		assertThat(message, is(equalTo(OVERSIZED_PAYLOAD)));
+	}
+
+	/**
+	 * Tests that we can send arbitrarily large objects when communicating locally with
+	 * the rpc endpoint.
+	 */
+	@Test
+	public void testLocalOverSizedResponseMsgAsync() throws Exception {
+		final String message = runLocalMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync);
+		assertThat(message, is(equalTo(OVERSIZED_PAYLOAD)));
+	}
+
+	private String requestMessageAsync(MessageRpcGateway messageRpcGateway) throws Exception {
+		CompletableFuture<String> messageFuture = messageRpcGateway.messageAsync();
+		return messageFuture.get();
+	}
+
+	private <T> T runRemoteMessageResponseTest(String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall) throws Exception {
+		final MessageRpcEndpoint rpcEndpoint = new MessageRpcEndpoint(rpcService1, payload);
+
+		try {
+			rpcEndpoint.start();
+
+			MessageRpcGateway rpcGateway = rpcService2.connect(rpcEndpoint.getAddress(), MessageRpcGateway.class).get();
+
+			return rpcCall.apply(rpcGateway);
+		} finally {
+			RpcUtils.terminateRpcEndpoint(rpcEndpoint, TIMEOUT);
+		}
+	}
+
+	private <T> T runLocalMessageResponseTest(String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall) throws Exception {
+		final MessageRpcEndpoint rpcEndpoint = new MessageRpcEndpoint(rpcService1, payload);
+
+		try {
+			rpcEndpoint.start();
+
+			MessageRpcGateway rpcGateway = rpcService1.connect(rpcEndpoint.getAddress(), MessageRpcGateway.class).get();
+
+			return rpcCall.apply(rpcGateway);
+		} finally {
+			RpcUtils.terminateRpcEndpoint(rpcEndpoint, TIMEOUT);
+		}
+	}
+
+	// -------------------------------------------------------------------------
+
+	interface MessageRpcGateway extends RpcGateway {
+		CompletableFuture<String> messageAsync();
+
+		String messageSync() throws RpcException;
+	}
+
+	static class MessageRpcEndpoint extends TestRpcEndpoint implements MessageRpcGateway {
+
+		@Nonnull
+		private final String message;
+
+		MessageRpcEndpoint(RpcService rpcService, @Nonnull String message) {
+			super(rpcService);
+			this.message = message;
+		}
+
+		@Override
+		public CompletableFuture<String> messageAsync() {
+			return CompletableFuture.completedFuture(message);
+		}
+
+		@Override
+		public String messageSync() throws RpcException {
+			return message;
+		}
+
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index fd62305..040e949 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -19,8 +19,6 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -40,22 +38,19 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.Await;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import scala.concurrent.Await;
-
 public class AkkaRpcActorTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
@@ -149,99 +144,13 @@ public class AkkaRpcActorTest extends TestLogger {
 		rpcEndpoint.shutDown();
 	}
 
-	@Test
-	public void testOversizedResponseMsg() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
-		OversizedResponseRpcEndpoint rpcEndpoint = null;
-
-		ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
-		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
-		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
-		AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
-		AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
-
-		try {
-			rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
-
-			rpcEndpoint.start();
-
-			CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(
-				rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
-			OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
-			CompletableFuture<String> result = rpcGateway.calculate();
-
-			result.get(timeout.getSize(), timeout.getUnit());
-
-			fail("Expected an AkkaRpcException.");
-		} catch (Exception e) {
-			assertTrue(e.getCause() instanceof IOException);
-		} finally {
-			if (rpcEndpoint != null) {
-				rpcEndpoint.shutDown();
-			}
-
-			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
-			terminationFutures.add(rpcService1.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
-			terminationFutures.add(rpcService2.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
-			FutureUtils
-				.waitForAll(terminationFutures)
-				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-		}
-	}
-
-	@Test
-	public void testNonOversizedResponseMsg() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(AkkaOptions.FRAMESIZE, "1000 kB");
-		OversizedResponseRpcEndpoint rpcEndpoint = null;
-
-		ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
-		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
-		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
-		AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
-		AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
-
-		try {
-			rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
-			rpcEndpoint.start();
-
-			CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
-			OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
-			CompletableFuture<String> result = rpcGateway.calculate();
-
-			String actualTxt = result.get(timeout.getSize(), timeout.getUnit());
-
-			assertEquals("hello world", actualTxt);
-		} finally {
-			if (rpcEndpoint != null) {
-				rpcEndpoint.shutDown();
-			}
-
-			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
-			terminationFutures.add(rpcService1.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
-			terminationFutures.add(rpcService2.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
-			FutureUtils
-				.waitForAll(terminationFutures)
-				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-		}
-	}
-
 	/**
 	 * Tests that we can wait for a RpcEndpoint to terminate.
 	 *
 	 * @throws ExecutionException
 	 * @throws InterruptedException
 	 */
-	@Test(timeout=5000)
+	@Test(timeout = 5000)
 	public void testRpcEndpointTerminationFuture() throws Exception {
 		final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
 		rpcEndpoint.start();
@@ -393,18 +302,6 @@ public class AkkaRpcActorTest extends TestLogger {
 		CompletableFuture<Integer> foobar();
 	}
 
-	private static class TestRpcEndpoint extends RpcEndpoint {
-
-		protected TestRpcEndpoint(RpcService rpcService) {
-			super(rpcService);
-		}
-
-		@Override
-		public CompletableFuture<Void> postStop() {
-			return CompletableFuture.completedFuture(null);
-		}
-	}
-
 	static class DummyRpcEndpoint extends TestRpcEndpoint implements DummyRpcGateway {
 
 		private volatile int _foobar = 42;
@@ -522,26 +419,4 @@ public class AkkaRpcActorTest extends TestLogger {
 		}
 	}
 
-	// -------------------------------------------------------------------------
-
-	interface OversizedResponseMsgRpcGateway extends RpcGateway {
-		CompletableFuture<String> calculate();
-	}
-
-	static class OversizedResponseRpcEndpoint extends TestRpcEndpoint implements OversizedResponseMsgRpcGateway {
-
-		private volatile String txt;
-
-		public OversizedResponseRpcEndpoint(RpcService rpcService, String txt) {
-			super(rpcService);
-			this.txt = txt;
-		}
-
-		@Override
-		public CompletableFuture<String> calculate() {
-			return CompletableFuture.completedFuture(txt);
-		}
-
-	}
-
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
deleted file mode 100644
index 523ad73..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka;
-
-import akka.actor.ActorSystem;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * RPC sync invoke test.
- */
-public class SyncCallsTest extends TestLogger {
-
-	// ------------------------------------------------------------------------
-	//  shared test members
-	// ------------------------------------------------------------------------
-
-	private static final Time timeout = Time.seconds(10L);
-
-	private static ActorSystem actorSystem1;
-	private static ActorSystem actorSystem2;
-	private static AkkaRpcService akkaRpcService1;
-	private static AkkaRpcService akkaRpcService2;
-
-	@BeforeClass
-	public static void setup() {
-		Configuration configuration = new Configuration();
-
-		actorSystem1 = AkkaUtils.createDefaultActorSystem();
-		actorSystem2 = AkkaUtils.createDefaultActorSystem();
-
-		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
-		akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
-		akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
-	}
-
-	@AfterClass
-	public static void teardown() throws InterruptedException, ExecutionException, TimeoutException {
-		final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
-
-		terminationFutures.add(akkaRpcService1.stopService());
-		terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
-		terminationFutures.add(akkaRpcService2.stopService());
-		terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
-		FutureUtils
-			.waitForAll(terminationFutures)
-			.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-	}
-
-	@Test
-	public void testSimpleLocalSyncCall() throws Exception {
-		RpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
-		rpcEndpoint.start();
-
-		try {
-			DummyRpcGateway gateway = rpcEndpoint.getSelfGateway(DummyRpcGateway.class);
-
-			int actualResult = gateway.foobar();
-
-			assertEquals(1234, actualResult);
-		} finally {
-			rpcEndpoint.shutDown();
-		}
-
-	}
-
-	@Test
-	public void testSimpleRemoteSyncCall() throws Exception {
-		RpcEndpoint rpcEndpoint = null;
-
-		try {
-			rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
-			rpcEndpoint.start();
-
-			CompletableFuture<DummyRpcGateway> future = akkaRpcService2.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
-			DummyRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
-			int actualResult = rpcGateway.foobar();
-
-			assertEquals(1234, actualResult);
-		} finally {
-			if (rpcEndpoint != null) {
-				rpcEndpoint.shutDown();
-			}
-		}
-	}
-
-	@Test
-	public void testSimpleRemoteSyncCallWithOversizedMsg() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
-		OversizedMsgRpcEndpoint rpcEndpoint = null;
-
-		ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
-		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
-		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
-		AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
-		AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
-
-		try {
-			rpcEndpoint = new OversizedMsgRpcEndpoint(rpcService1, "hello world");
-
-			rpcEndpoint.start();
-
-			CompletableFuture<OversizedMsgRpcGateway> future = rpcService2.connect(
-				rpcEndpoint.getAddress(), OversizedMsgRpcGateway.class);
-			OversizedMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
-			String result = rpcGateway.response();
-
-			fail("Expected an AkkaRpcException.");
-		} catch (Exception e) {
-			assertTrue(e.getCause() instanceof IOException);
-		} finally {
-			if (rpcEndpoint != null) {
-				rpcEndpoint.shutDown();
-			}
-
-			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
-			terminationFutures.add(rpcService1.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
-			terminationFutures.add(rpcService2.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
-			FutureUtils
-				.waitForAll(terminationFutures)
-				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-		}
-	}
-
-	/**
-	 * A dummy rpc gateway.
-	 */
-	public interface DummyRpcGateway extends RpcGateway {
-		int foobar();
-	}
-
-	/**
-	 * A dummy rpc endpoint.
-	 */
-	public static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {
-
-		DummyRpcEndpoint(RpcService rpcService) {
-			super(rpcService);
-		}
-
-		@Override
-		public int foobar() {
-			return 1234;
-		}
-
-		@Override
-		public CompletableFuture<Void> postStop() {
-			return CompletableFuture.completedFuture(null);
-		}
-	}
-
-	/**
-	 * Oversized message rpc gateway.
-	 */
-	private interface OversizedMsgRpcGateway extends RpcGateway {
-		String response();
-	}
-
-	/**
-	 * Oversized message rpc endpoint.
-	 */
-	private static class OversizedMsgRpcEndpoint extends RpcEndpoint implements OversizedMsgRpcGateway {
-
-		private String txt;
-
-		public OversizedMsgRpcEndpoint(RpcService rpcService, String txt) {
-			super(rpcService);
-			this.txt = txt;
-		}
-
-		@Override
-		public CompletableFuture<Void> postStop() {
-			return CompletableFuture.completedFuture(null);
-		}
-
-		@Override
-		public String response() {
-			return this.txt;
-		}
-	}
-
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TestRpcEndpoint.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TestRpcEndpoint.java
new file mode 100644
index 0000000..073dabe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TestRpcEndpoint.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import java.util.concurrent.CompletableFuture;
+
+class TestRpcEndpoint extends RpcEndpoint {
+
+	protected TestRpcEndpoint(RpcService rpcService) {
+		super(rpcService);
+	}
+
+	@Override
+	public CompletableFuture<Void> postStop() {
+		return CompletableFuture.completedFuture(null);
+	}
+}