You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 10:34:06 UTC

[flink] 01/03: [FLINK-10251][rpc] Handle oversized response messages in AkkaRpcActor

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 63641cd07a8afc9dcfdab04cc6dff7e1265b076c
Author: yanghua <ya...@gmail.com>
AuthorDate: Thu Oct 18 11:26:18 2018 +0800

    [FLINK-10251][rpc] Handle oversized response messages in AkkaRpcActor
---
 .../metrics/MetricRegistryConfiguration.java       |   9 +-
 .../flink/runtime/minicluster/MiniCluster.java     |  24 +--
 .../runtime/rpc/akka/AkkaInvocationHandler.java    |  30 ++-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  65 +++++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |  60 +++---
 .../rpc/akka/AkkaRpcServiceConfiguration.java      |  70 +++++++
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java      |  24 ++-
 .../flink/runtime/rpc/akka/FencedAkkaRpcActor.java |   9 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   6 +-
 .../jobmaster/slotpool/SlotPoolRpcTest.java        |   3 +-
 .../apache/flink/runtime/rpc/AsyncCallsTest.java   |   3 +-
 .../flink/runtime/rpc/RpcConnectionTest.java       |   7 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  |   3 +-
 .../apache/flink/runtime/rpc/RpcSSLAuthITCase.java |   9 +-
 .../flink/runtime/rpc/TestingRpcService.java       |   5 +-
 .../rpc/akka/AkkaRpcActorHandshakeTest.java        |  12 +-
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 117 ++++++++++-
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java |   5 +-
 .../runtime/rpc/akka/MainThreadValidationTest.java |   3 +-
 .../runtime/rpc/akka/MessageSerializationTest.java |  17 +-
 .../flink/runtime/rpc/akka/SyncCallsTest.java      | 227 +++++++++++++++++++++
 21 files changed, 613 insertions(+), 95 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 244a1ed..c739170 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -19,16 +19,14 @@
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.util.Preconditions;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -171,10 +169,7 @@ public class MetricRegistryConfiguration {
 			}
 		}
 
-		final String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
-		final String akkaConfigStr = String.format("akka {remote {netty.tcp {maximum-frame-size = %s}}}", maxFrameSizeStr);
-		final Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
-		final long maximumFrameSize = akkaConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size");
+		final long maximumFrameSize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
 
 		// padding to account for serialization overhead
 		final long messageSizeLimitPadding = 256;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index d0b4219..f25c73c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
@@ -262,8 +263,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 				// bring up all the RPC services
 				LOG.info("Starting RPC Service(s)");
 
+				AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
 				// we always need the 'commonRpcService' for auxiliary calls
-				commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+				commonRpcService = createRpcService(akkaRpcServiceConfig, false, null);
 
 				// TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
 				metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(
@@ -290,12 +292,11 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
 					final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();
 
-					jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
-					resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);
+					jobManagerRpcService = createRpcService(akkaRpcServiceConfig, true, jobManagerBindAddress);
+					resourceManagerRpcService = createRpcService(akkaRpcServiceConfig, true, resourceManagerBindAddress);
 
 					for (int i = 0; i < numTaskManagers; i++) {
-						taskManagerRpcServices[i] = createRpcService(
-								configuration, rpcTimeout, true, taskManagerBindAddress);
+						taskManagerRpcServices[i] = createRpcService(akkaRpcServiceConfig, true, taskManagerBindAddress);
 					}
 
 					this.jobManagerRpcService = jobManagerRpcService;
@@ -742,9 +743,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	/**
 	 * Factory method to instantiate the RPC service.
 	 *
-	 * @param configuration
-	 *            The configuration of the mini cluster
-	 * @param askTimeout
+	 * @param akkaRpcServiceConfig RPC service configuration (Flink configuration, maximum framesize); it also carries:
 	 *            The default RPC timeout for asynchronous "ask" requests.
 	 * @param remoteEnabled
 	 *            True, if the RPC service should be reachable from other (remote) RPC services.
@@ -754,24 +753,23 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	 * @return The instantiated RPC service
 	 */
 	protected RpcService createRpcService(
-			Configuration configuration,
-			Time askTimeout,
+			AkkaRpcServiceConfiguration akkaRpcServiceConfig,
 			boolean remoteEnabled,
 			String bindAddress) {
 
 		final Config akkaConfig;
 
 		if (remoteEnabled) {
-			akkaConfig = AkkaUtils.getAkkaConfig(configuration, bindAddress, 0);
+			akkaConfig = AkkaUtils.getAkkaConfig(akkaRpcServiceConfig.getConfiguration(), bindAddress, 0);
 		} else {
-			akkaConfig = AkkaUtils.getAkkaConfig(configuration);
+			akkaConfig = AkkaUtils.getAkkaConfig(akkaRpcServiceConfig.getConfiguration());
 		}
 
 		final Config effectiveAkkaConfig = AkkaUtils.testDispatcherConfig().withFallback(akkaConfig);
 
 		final ActorSystem actorSystem = AkkaUtils.createActorSystem(effectiveAkkaConfig);
 
-		return new AkkaRpcService(actorSystem, askTimeout);
+		return new AkkaRpcService(actorSystem, akkaRpcServiceConfig);
 	}
 
 	protected ResourceManagerRunner startResourceManager(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index cc54f2e..90e126c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -27,12 +27,14 @@ import org.apache.flink.runtime.rpc.RpcServer;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.akka.messages.Processing;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
 import org.apache.flink.runtime.rpc.messages.CallAsync;
 import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
@@ -48,6 +50,7 @@ import java.lang.reflect.Method;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -203,14 +206,29 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 			tell(rpcInvocation);
 
 			result = null;
-		} else if (Objects.equals(returnType, CompletableFuture.class)) {
-			// execute an asynchronous call
-			result = ask(rpcInvocation, futureTimeout);
 		} else {
-			// execute a synchronous call
-			CompletableFuture<?> futureResult = ask(rpcInvocation, futureTimeout);
+			// execute the call via ask; a synchronous caller blocks on the resulting future further below
+			CompletableFuture resultFuture = ask(rpcInvocation, futureTimeout);
+
+			CompletableFuture completableFuture = resultFuture.thenApply((Object o) -> {
+				if (o instanceof SerializedValue) {
+					try {
+						return  ((SerializedValue) o).deserializeValue(getClass().getClassLoader());
+					} catch (IOException | ClassNotFoundException e) {
+						throw new CompletionException(
+							new RpcException("Could not deserialize the serialized payload of RPC method : "
+								+ methodName, e));
+					}
+				} else {
+					return o;
+				}
+			});
 
-			result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
+			if (!Objects.equals(returnType, CompletableFuture.class)) {
+				result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
+			} else {
+				result = completableFuture;
+			}
 		}
 
 		return result;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 8471d7e..84d75fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -33,7 +33,9 @@ import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
 import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
@@ -52,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -85,13 +88,22 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 
 	private final int version;
 
+	private final long maximumFramesize;
+
 	private State state;
 
-	AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture, final int version) {
+	AkkaRpcActor(
+			final T rpcEndpoint,
+			final CompletableFuture<Boolean> terminationFuture,
+			final int version,
+			final long maximumFramesize) {
+
+		checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
 		this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
 		this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
 		this.terminationFuture = checkNotNull(terminationFuture);
 		this.version = version;
+		this.maximumFramesize = maximumFramesize;
 		this.state = State.STOPPED;
 	}
 
@@ -254,6 +266,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 						return;
 					}
 
+					boolean remoteSender = isRemoteSender();
+					final String methodName = rpcMethod.getName();
+
 					if (result instanceof CompletableFuture) {
 						final CompletableFuture<?> future = (CompletableFuture<?>) result;
 						Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>();
@@ -263,14 +278,33 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 								if (throwable != null) {
 									promise.failure(throwable);
 								} else {
-									promise.success(value);
+									if (!remoteSender) {
+										promise.success(value);
+									} else {
+										Either<SerializedValue, AkkaRpcException> serializedResult =
+											serializeRemoteResultAndVerifySize(value, methodName);
+										if (serializedResult.isLeft()) {
+											promise.success(serializedResult.left());
+										} else {
+											promise.failure(serializedResult.right());
+										}
+									}
 								}
 							});
 
 						Patterns.pipe(promise.future(), getContext().dispatcher()).to(getSender());
 					} else {
-						// tell the sender the result of the computation
-						getSender().tell(new Status.Success(result), getSelf());
+						if (!remoteSender) {
+							getSender().tell(result, getSelf());
+						} else {
+							Either<SerializedValue, AkkaRpcException> serializedResult =
+								serializeRemoteResultAndVerifySize(result, methodName);
+							if (serializedResult.isLeft()) {
+								getSender().tell(new Status.Success(serializedResult.left()), getSelf());
+							} else {
+								getSender().tell(new Status.Failure(serializedResult.right()), getSelf());
+							}
+						}
 					}
 				}
 			} catch (Throwable e) {
@@ -281,6 +315,29 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 		}
 	}
 
+	protected boolean isRemoteSender() {
+		return !getSender().path().address().hasLocalScope();
+	}
+
+	private Either<SerializedValue, AkkaRpcException> serializeRemoteResultAndVerifySize(
+		Object result, String methodName) {
+		try {
+			SerializedValue serializedResult = new SerializedValue(result);
+
+			long resultSize = serializedResult.getByteArray().length;
+			if (resultSize > maximumFramesize) {
+				return Either.Right(new AkkaRpcException(
+					"The method " + methodName + "'s result size " + resultSize
+						+ " exceeds the maximum size " + maximumFramesize + " ."));
+			} else {
+				return Either.Left(serializedResult);
+			}
+		} catch (IOException e) {
+			return Either.Right(new AkkaRpcException(
+				"Failed to serialize the result for RPC call : " + methodName + ".", e));
+		}
+	}
+
 	/**
 	 * Handle asynchronous {@link Callable}. This method simply executes the given {@link Callable}
 	 * in the context of the actor thread.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 519ac9e..57c4dee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -87,18 +86,14 @@ public class AkkaRpcService implements RpcService {
 
 	static final int VERSION = 1;
 
-	static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";
-
 	private final Object lock = new Object();
 
 	private final ActorSystem actorSystem;
-	private final Time timeout;
+	private final AkkaRpcServiceConfiguration configuration;
 
 	@GuardedBy("lock")
 	private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4);
 
-	private final long maximumFramesize;
-
 	private final String address;
 	private final int port;
 
@@ -108,16 +103,9 @@ public class AkkaRpcService implements RpcService {
 
 	private volatile boolean stopped;
 
-	public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
+	public AkkaRpcService(final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) {
 		this.actorSystem = checkNotNull(actorSystem, "actor system");
-		this.timeout = checkNotNull(timeout, "timeout");
-
-		if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
-			maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
-		} else {
-			// only local communication
-			maximumFramesize = Long.MAX_VALUE;
-		}
+		this.configuration = checkNotNull(configuration, "akka rpc service configuration");
 
 		Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
 
@@ -174,8 +162,8 @@ public class AkkaRpcService implements RpcService {
 					addressHostname.f0,
 					addressHostname.f1,
 					actorRef,
-					timeout,
-					maximumFramesize,
+					configuration.getTimeout(),
+					configuration.getMaximumFramesize(),
 					null);
 			});
 	}
@@ -193,8 +181,8 @@ public class AkkaRpcService implements RpcService {
 					addressHostname.f0,
 					addressHostname.f1,
 					actorRef,
-					timeout,
-					maximumFramesize,
+					configuration.getTimeout(),
+					configuration.getMaximumFramesize(),
 					null,
 					() -> fencingToken);
 			});
@@ -208,9 +196,19 @@ public class AkkaRpcService implements RpcService {
 		final Props akkaRpcActorProps;
 
 		if (rpcEndpoint instanceof FencedRpcEndpoint) {
-			akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
+			akkaRpcActorProps = Props.create(
+				FencedAkkaRpcActor.class,
+				rpcEndpoint,
+				terminationFuture,
+				getVersion(),
+				configuration.getMaximumFramesize());
 		} else {
-			akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
+			akkaRpcActorProps = Props.create(
+				getAkkaRpcActorClass(),
+				rpcEndpoint,
+				terminationFuture,
+				getVersion(),
+				configuration.getMaximumFramesize());
 		}
 
 		ActorRef actorRef;
@@ -245,8 +243,8 @@ public class AkkaRpcService implements RpcService {
 				akkaAddress,
 				hostname,
 				actorRef,
-				timeout,
-				maximumFramesize,
+				configuration.getTimeout(),
+				configuration.getMaximumFramesize(),
 				terminationFuture,
 				((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
 
@@ -256,8 +254,8 @@ public class AkkaRpcService implements RpcService {
 				akkaAddress,
 				hostname,
 				actorRef,
-				timeout,
-				maximumFramesize,
+				configuration.getTimeout(),
+				configuration.getMaximumFramesize(),
 				terminationFuture);
 		}
 
@@ -283,8 +281,8 @@ public class AkkaRpcService implements RpcService {
 				rpcServer.getAddress(),
 				rpcServer.getHostname(),
 				((AkkaBasedEndpoint) rpcServer).getActorRef(),
-				timeout,
-				maximumFramesize,
+				configuration.getTimeout(),
+				configuration.getMaximumFramesize(),
 				null,
 				() -> fencingToken);
 
@@ -392,6 +390,10 @@ public class AkkaRpcService implements RpcService {
 		return FutureUtils.toJava(scalaFuture);
 	}
 
+	protected Class getAkkaRpcActorClass() {
+		return AkkaRpcActor.class;
+	}
+
 	// ---------------------------------------------------------------------------------------
 	// Private helper methods
 	// ---------------------------------------------------------------------------------------
@@ -421,7 +423,7 @@ public class AkkaRpcService implements RpcService {
 		final ActorSelection actorSel = actorSystem.actorSelection(address);
 
 		final Future<ActorIdentity> identify = Patterns
-			.ask(actorSel, new Identify(42), timeout.toMilliseconds())
+			.ask(actorSel, new Identify(42), configuration.getTimeout().toMilliseconds())
 			.<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class));
 
 		final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);
@@ -438,7 +440,7 @@ public class AkkaRpcService implements RpcService {
 		final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
 			(ActorRef actorRef) -> FutureUtils.toJava(
 				Patterns
-					.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), timeout.toMilliseconds())
+					.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds())
 					.<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));
 
 		return actorRefFuture.thenCombineAsync(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
new file mode 100644
index 0000000..35f464b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Configuration object for {@link AkkaRpcService}.
+ */
+public class AkkaRpcServiceConfiguration {
+
+	private final Time timeout;
+	private final long maximumFramesize;
+	private final Configuration configuration;
+
+	public AkkaRpcServiceConfiguration(Time timeout, long maximumFramesize, Configuration configuration) {
+		checkNotNull(timeout);
+		checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
+		this.timeout = timeout;
+		this.maximumFramesize = maximumFramesize;
+		this.configuration = configuration;
+	}
+
+	public Time getTimeout() {
+		return timeout;
+	}
+
+	public long getMaximumFramesize() {
+		return maximumFramesize;
+	}
+
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+
+	public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
+		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+		Time timeout = Time.of(duration.length(), duration.unit());
+
+		long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
+
+		return new AkkaRpcServiceConfiguration(timeout, maximumFramesize, configuration);
+	}
+
+	public static AkkaRpcServiceConfiguration defaultConfiguration() {
+		return fromConfiguration(new Configuration());
+	}
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 43a52bb..746bc34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import org.apache.flink.api.common.time.Time;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
@@ -55,6 +55,12 @@ public class AkkaRpcServiceUtils {
 	private static final String AKKA_TCP = "akka.tcp";
 	private static final String AKKA_SSL_TCP = "akka.ssl.tcp";
 
+	private static final String SIMPLE_AKKA_CONFIG_TEMPLATE =
+		"akka {remote {netty.tcp {maximum-frame-size = %s}}}";
+
+	private static final String MAXIMUM_FRAME_SIZE_PATH =
+		"akka.remote.netty.tcp.maximum-frame-size";
+
 	private static final AtomicLong nextNameOffset = new AtomicLong(0L);
 
 	// ------------------------------------------------------------------------
@@ -99,8 +105,7 @@ public class AkkaRpcServiceUtils {
 
 	@Nonnull
 	private static RpcService instantiateAkkaRpcService(Configuration configuration, ActorSystem actorSystem) {
-		final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
-		return new AkkaRpcService(actorSystem, timeout);
+		return new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
 	}
 
 	// ------------------------------------------------------------------------
@@ -200,6 +205,17 @@ public class AkkaRpcServiceUtils {
 	}
 
 	// ------------------------------------------------------------------------
+	//  RPC service configuration
+	// ------------------------------------------------------------------------
+
+	public static long extractMaximumFramesize(Configuration configuration) {
+		String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
+		String akkaConfigStr = String.format(SIMPLE_AKKA_CONFIG_TEMPLATE, maxFrameSizeStr);
+		Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
+		return akkaConfig.getBytes(MAXIMUM_FRAME_SIZE_PATH);
+	}
+
+	// ------------------------------------------------------------------------
 
 	/** This class is not meant to be instantiated. */
 	private AkkaRpcServiceUtils() {}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index 872effd..72c783b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -39,8 +39,13 @@ import java.util.concurrent.CompletableFuture;
  */
 public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
 
-	public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture, int version) {
-		super(rpcEndpoint, terminationFuture, version);
+	public FencedAkkaRpcActor(
+		T rpcEndpoint,
+		CompletableFuture<Boolean> terminationFuture,
+		int version,
+		final long maximumFramesize) {
+
+		super(rpcEndpoint, terminationFuture, version, maximumFramesize);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index de3f787..56d5044 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -96,6 +96,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
@@ -265,8 +266,9 @@ public class JobMasterTest extends TestLogger {
 			final ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
 			final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
 
-			rpcService1 = new AkkaRpcService(actorSystem1, testingTimeout);
-			rpcService2 = new AkkaRpcService(actorSystem2, testingTimeout);
+			AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+			rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+			rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
 
 			final CompletableFuture<Throwable> declineCheckpointMessageFuture = new CompletableFuture<>();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
index 4fb2ada..33104c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -85,7 +86,7 @@ public class SlotPoolRpcTest extends TestLogger {
 	@BeforeClass
 	public static void setup() {
 		ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-		rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
+		rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 	}
 
 	@AfterClass
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 1331bb7..1233306 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -61,7 +62,7 @@ public class AsyncCallsTest extends TestLogger {
 	private static final Time timeout = Time.seconds(10L);
 
 	private static final AkkaRpcService akkaRpcService =
-			new AkkaRpcService(actorSystem, Time.milliseconds(10000L));
+			new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 
 	@AfterClass
 	public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index cf3e651..e93de88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -20,11 +20,12 @@ package org.apache.flink.runtime.rpc;
 
 import akka.actor.ActorSystem;
 
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.TestLogger;
@@ -59,7 +60,9 @@ public class RpcConnectionTest extends TestLogger {
 
 			// we start the RPC service with a very long timeout to ensure that the test
 			// can only pass if the connection problem is not recognized merely via a timeout
-			rpcService = new AkkaRpcService(actorSystem, Time.of(10000000, TimeUnit.SECONDS));
+			Configuration configuration = new Configuration();
+			configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s");
+			rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
 
 			CompletableFuture<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 9aa4520..0081d58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
@@ -49,7 +50,7 @@ public class RpcEndpointTest extends TestLogger {
 	@BeforeClass
 	public static void setup() {
 		actorSystem = AkkaUtils.createDefaultActorSystem();
-		rpcService = new AkkaRpcService(actorSystem, TIMEOUT);
+		rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 	}
 
 	@AfterClass
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
index 2aa38fa..138cf98 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.rpc;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.TestLogger;
 
@@ -91,8 +91,11 @@ public class RpcSSLAuthITCase extends TestLogger {
 
 			// we start the RPC service with a very long timeout to ensure that the test
 			// can only pass if the connection problem is not recognized merely via a timeout
-			rpcService1 = new AkkaRpcService(actorSystem1, Time.of(10000000, TimeUnit.SECONDS));
-			rpcService2 = new AkkaRpcService(actorSystem2, Time.of(10000000, TimeUnit.SECONDS));
+			Configuration configuration = new Configuration();
+			configuration.setString(AkkaOptions.ASK_TIMEOUT, "10000000 s");
+			AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+			rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+			rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
 
 			TestEndpoint endpoint = new TestEndpoint(rpcService1);
 			endpoint.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index db70a0f..85c5707 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.runtime.rpc;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
 
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
@@ -66,7 +66,8 @@ public class TestingRpcService extends AkkaRpcService {
 	 * Creates a new {@code TestingRpcService}, using the given configuration. 
 	 */
 	public TestingRpcService(Configuration configuration) {
-		super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
+		super(AkkaUtils.createLocalActorSystem(configuration),
+			AkkaRpcServiceConfiguration.fromConfiguration(configuration));
 
 		this.registeredConnections = new ConcurrentHashMap<>();
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
index ed7a3bd..88aa95b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorHandshakeTest.java
@@ -60,9 +60,11 @@ public class AkkaRpcActorHandshakeTest extends TestLogger {
 		final ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
 		final ActorSystem wrongVersionActorSystem = AkkaUtils.createDefaultActorSystem();
 
-		akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout);
-		akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
-		wrongVersionAkkaRpcService = new WrongVersionAkkaRpcService(wrongVersionActorSystem, timeout);
+		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.defaultConfiguration();
+		akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+		akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
+		wrongVersionAkkaRpcService = new WrongVersionAkkaRpcService(
+			wrongVersionActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 	}
 
 	@AfterClass
@@ -135,8 +137,8 @@ public class AkkaRpcActorHandshakeTest extends TestLogger {
 
 	private static class WrongVersionAkkaRpcService extends AkkaRpcService {
 
-		WrongVersionAkkaRpcService(ActorSystem actorSystem, Time timeout) {
-			super(actorSystem, timeout);
+		WrongVersionAkkaRpcService(ActorSystem actorSystem, AkkaRpcServiceConfiguration configuration) {
+			super(actorSystem, configuration);
 		}
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index a32c1f6..fd62305 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -38,6 +40,9 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -144,6 +149,92 @@ public class AkkaRpcActorTest extends TestLogger {
 		rpcEndpoint.shutDown();
 	}
 
+	/**
+	 * Tests that a call fails with an {@link IOException} cause if its response exceeds the frame size.
+	 */
+	@Test
+	public void testOversizedResponseMsg() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
+		OversizedResponseRpcEndpoint rpcEndpoint = null;
+
+		ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+		AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+		AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
+
+		try {
+			rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
+			rpcEndpoint.start();
+
+			CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
+			// bound the connect by the shared test timeout so that a hung connect fails fast
+			OversizedResponseMsgRpcGateway rpcGateway = future.get(timeout.getSize(), timeout.getUnit());
+
+			CompletableFuture<String> result = rpcGateway.calculate();
+			result.get(timeout.getSize(), timeout.getUnit());
+			fail("Expected an AkkaRpcException.");
+		} catch (Exception e) {
+			assertTrue(e.getCause() instanceof IOException);
+		} finally {
+			if (rpcEndpoint != null) {
+				rpcEndpoint.shutDown();
+			}
+
+			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
+			terminationFutures.add(rpcService1.stopService());
+			terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+			terminationFutures.add(rpcService2.stopService());
+			terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
+
+			FutureUtils
+				.waitForAll(terminationFutures)
+				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		}
+	}
+
+	/**
+	 * Tests that a response which fits into the frame size is returned to the remote caller unchanged.
+	 */
+	@Test
+	public void testNonOversizedResponseMsg() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setString(AkkaOptions.FRAMESIZE, "1000 kB");
+		OversizedResponseRpcEndpoint rpcEndpoint = null;
+
+		ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
+		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+		AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+		AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
+
+		try {
+			rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
+			rpcEndpoint.start();
+			CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
+			OversizedResponseMsgRpcGateway rpcGateway = future.get(timeout.getSize(), timeout.getUnit());
+
+			CompletableFuture<String> result = rpcGateway.calculate();
+			String actualTxt = result.get(timeout.getSize(), timeout.getUnit());
+			assertEquals("hello world", actualTxt);
+		} finally {
+			if (rpcEndpoint != null) {
+				rpcEndpoint.shutDown();
+			}
+
+			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
+			terminationFutures.add(rpcService1.stopService());
+			terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+			terminationFutures.add(rpcService2.stopService());
+			terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
+
+			FutureUtils
+				.waitForAll(terminationFutures)
+				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		}
+	}
+
 	/**
 	 * Tests that we can wait for a RpcEndpoint to terminate.
 	 *
@@ -248,7 +339,8 @@ public class AkkaRpcActorTest extends TestLogger {
 	@Test
 	public void testActorTerminationWhenServiceShutdown() throws Exception {
 		final ActorSystem rpcActorSystem = AkkaUtils.createDefaultActorSystem();
-		final RpcService rpcService = new AkkaRpcService(rpcActorSystem, timeout);
+		final RpcService rpcService = new AkkaRpcService(
+			rpcActorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 
 		try {
 			SimpleRpcEndpoint rpcEndpoint = new SimpleRpcEndpoint(rpcService, SimpleRpcEndpoint.class.getSimpleName());
@@ -429,4 +521,27 @@ public class AkkaRpcActorTest extends TestLogger {
 			return postStopFuture;
 		}
 	}
+
+	// -------------------------------------------------------------------------
+
+	/** Gateway with a single future-returning RPC method, {@code calculate()}. */
+	interface OversizedResponseMsgRpcGateway extends RpcGateway {
+		CompletableFuture<String> calculate();
+	}
+
+	/** Endpoint that completes {@code calculate()} with the text passed to its constructor. */
+	static class OversizedResponseRpcEndpoint extends TestRpcEndpoint implements OversizedResponseMsgRpcGateway {
+		private final String txt;
+
+		public OversizedResponseRpcEndpoint(RpcService rpcService, String txt) {
+			super(rpcService);
+			this.txt = txt;
+		}
+
+		@Override
+		public CompletableFuture<String> calculate() {
+			return CompletableFuture.completedFuture(txt);
+		}
+	}
+
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index caf22f4..1b81331 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -56,7 +56,7 @@ public class AkkaRpcServiceTest extends TestLogger {
 
 	private static final Time TIMEOUT = Time.milliseconds(10000L);
 
-	private static final AkkaRpcService AKKA_RPC_SERVICE = new AkkaRpcService(ACTOR_SYSTEM, TIMEOUT);
+	private static final AkkaRpcService AKKA_RPC_SERVICE = new AkkaRpcService(ACTOR_SYSTEM, AkkaRpcServiceConfiguration.defaultConfiguration());
 
 	@AfterClass
 	public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
@@ -136,7 +136,8 @@ public class AkkaRpcServiceTest extends TestLogger {
 	@Test(timeout = 60000)
 	public void testTerminationFuture() throws Exception {
 		final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
-		final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000));
+		final AkkaRpcService rpcService = new AkkaRpcService(
+			actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
 
 		CompletableFuture<Void> terminationFuture = rpcService.getTerminationFuture();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 2ce2905..d190e9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -46,7 +45,7 @@ public class MainThreadValidationTest extends TestLogger {
 		// actual test
 		AkkaRpcService akkaRpcService = new AkkaRpcService(
 				AkkaUtils.createDefaultActorSystem(),
-				Time.milliseconds(10000));
+				AkkaRpcServiceConfiguration.defaultConfiguration());
 
 		try {
 			TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 9a2a1fe..e60c675 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -27,8 +29,6 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -60,14 +60,15 @@ public class MessageSerializationTest extends TestLogger {
 
 	@BeforeClass
 	public static void setup() {
-		Config akkaConfig = AkkaUtils.getDefaultAkkaConfig();
-		Config modifiedAkkaConfig = akkaConfig.withValue(AkkaRpcService.MAXIMUM_FRAME_SIZE_PATH, ConfigValueFactory.fromAnyRef(maxFrameSize + "b"));
+		Configuration configuration = new Configuration();
+		configuration.setString(AkkaOptions.FRAMESIZE, maxFrameSize + "b");
 
-		actorSystem1 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
-		actorSystem2 = AkkaUtils.createActorSystem(modifiedAkkaConfig);
+		actorSystem1 = AkkaUtils.createDefaultActorSystem();
+		actorSystem2 = AkkaUtils.createDefaultActorSystem();
 
-		akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout);
-		akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout);
+		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+		akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
+		akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
 	}
 
 	@AfterClass
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
new file mode 100644
index 0000000..523ad73
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * RPC sync invoke test.
+ */
+public class SyncCallsTest extends TestLogger {
+
+	// ------------------------------------------------------------------------
+	//  shared test members
+	// ------------------------------------------------------------------------
+
+	private static final Time timeout = Time.seconds(10L);
+
+	private static ActorSystem actorSystem1;
+	private static ActorSystem actorSystem2;
+	private static AkkaRpcService akkaRpcService1;
+	private static AkkaRpcService akkaRpcService2;
+
+	/** Creates two actor systems and an RPC service on each, configured from an empty {@link Configuration}. */
+	@BeforeClass
+	public static void setup() {
+		actorSystem1 = AkkaUtils.createDefaultActorSystem();
+		actorSystem2 = AkkaUtils.createDefaultActorSystem();
+
+		final AkkaRpcServiceConfiguration rpcServiceConfig =
+			AkkaRpcServiceConfiguration.fromConfiguration(new Configuration());
+		akkaRpcService1 = new AkkaRpcService(actorSystem1, rpcServiceConfig);
+		akkaRpcService2 = new AkkaRpcService(actorSystem2, rpcServiceConfig);
+	}
+
+	/** Stops both RPC services and terminates both actor systems, waiting up to {@code timeout} for completion. */
+	@AfterClass
+	public static void teardown()
+			throws InterruptedException, ExecutionException, TimeoutException {
+		final Collection<CompletableFuture<?>> shutdownFutures = new ArrayList<>(4);
+		shutdownFutures.add(akkaRpcService1.stopService());
+		shutdownFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+		shutdownFutures.add(akkaRpcService2.stopService());
+		shutdownFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
+
+		FutureUtils.waitForAll(shutdownFutures)
+			.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+
+	/** Calls a synchronous gateway method through the endpoint's self gateway and checks the returned value. */
+	@Test
+	public void testSimpleLocalSyncCall() throws Exception {
+		final RpcEndpoint endpoint = new DummyRpcEndpoint(akkaRpcService1);
+		endpoint.start();
+
+		try {
+			final DummyRpcGateway selfGateway = endpoint.getSelfGateway(DummyRpcGateway.class);
+
+			final int answer = selfGateway.foobar();
+
+			assertEquals(1234, answer);
+		} finally {
+			endpoint.shutDown();
+		}
+	}
+
+	/** Connects the second RPC service to an endpoint running on the first and checks a synchronous call. */
+	@Test
+	public void testSimpleRemoteSyncCall() throws Exception {
+		RpcEndpoint rpcEndpoint = null;
+		try {
+			rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
+			rpcEndpoint.start();
+
+			CompletableFuture<DummyRpcGateway> future = akkaRpcService2.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
+			// bound the connect by the shared test timeout so that a hung connect fails fast
+			DummyRpcGateway rpcGateway = future.get(timeout.getSize(), timeout.getUnit());
+
+			int actualResult = rpcGateway.foobar();
+			assertEquals(1234, actualResult);
+		} finally {
+			if (rpcEndpoint != null) {
+				rpcEndpoint.shutDown();
+			}
+		}
+	}
+
+	/**
+	 * Tests that a sync call fails with an {@link IOException} cause if its response exceeds the frame size.
+	 */
+	@Test
+	public void testSimpleRemoteSyncCallWithOversizedMsg() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
+		OversizedMsgRpcEndpoint rpcEndpoint = null;
+
+		// named differently from the static fields so that the shared actor systems are not shadowed
+		ActorSystem dedicatedActorSystem1 = AkkaUtils.createDefaultActorSystem();
+		ActorSystem dedicatedActorSystem2 = AkkaUtils.createDefaultActorSystem();
+		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
+		AkkaRpcService rpcService1 = new AkkaRpcService(dedicatedActorSystem1, akkaRpcServiceConfig);
+		AkkaRpcService rpcService2 = new AkkaRpcService(dedicatedActorSystem2, akkaRpcServiceConfig);
+
+		try {
+			rpcEndpoint = new OversizedMsgRpcEndpoint(rpcService1, "hello world");
+			rpcEndpoint.start();
+			CompletableFuture<OversizedMsgRpcGateway> future = rpcService2.connect(rpcEndpoint.getAddress(), OversizedMsgRpcGateway.class);
+			OversizedMsgRpcGateway rpcGateway = future.get(timeout.getSize(), timeout.getUnit());
+
+			rpcGateway.response(); // expected to throw, the returned value is irrelevant
+			fail("Expected an AkkaRpcException.");
+		} catch (Exception e) {
+			assertTrue(e.getCause() instanceof IOException);
+		} finally {
+			if (rpcEndpoint != null) {
+				rpcEndpoint.shutDown();
+			}
+
+			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
+			terminationFutures.add(rpcService1.stopService());
+			terminationFutures.add(FutureUtils.toJava(dedicatedActorSystem1.terminate()));
+			terminationFutures.add(rpcService2.stopService());
+			terminationFutures.add(FutureUtils.toJava(dedicatedActorSystem2.terminate()));
+
+			FutureUtils
+				.waitForAll(terminationFutures)
+				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		}
+	}
+
+	/**
+	 * Gateway exposing a single synchronous RPC method, i.e. one returning a plain value instead of a future.
+	 */
+	public interface DummyRpcGateway extends RpcGateway {
+		int foobar();
+	}
+
+	/**
+	 * Endpoint implementing {@link DummyRpcGateway}; {@code foobar()} always returns 1234.
+	 */
+	public static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {
+
+		DummyRpcEndpoint(RpcService rpcService) {
+			super(rpcService);
+		}
+
+		@Override
+		public int foobar() {
+			return 1234;
+		}
+
+		@Override
+		public CompletableFuture<Void> postStop() {
+			return CompletableFuture.completedFuture(null);
+		}
+	}
+
+	/**
+	 * Gateway with a synchronous method returning a {@code String}; used by the oversized message test.
+	 */
+	private interface OversizedMsgRpcGateway extends RpcGateway {
+		String response();
+	}
+
+	/**
+	 * Endpoint whose {@code response()} returns the text passed to its constructor.
+	 */
+	private static class OversizedMsgRpcEndpoint extends RpcEndpoint implements OversizedMsgRpcGateway {
+
+		private final String txt;
+
+		public OversizedMsgRpcEndpoint(RpcService rpcService, String txt) {
+			super(rpcService);
+			this.txt = txt;
+		}
+
+		@Override
+		public CompletableFuture<Void> postStop() {
+			return CompletableFuture.completedFuture(null);
+		}
+
+		@Override
+		public String response() {
+			return this.txt;
+		}
+	}
+
+}