You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 10:34:07 UTC

[flink] 02/03: [FLINK-10251][rpc] Refactor handling of over sized messages

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3667ea605ec9002df225493f0a0b5a6840b6f28d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 24 08:48:50 2019 +0100

    [FLINK-10251][rpc] Refactor handling of over sized messages
    
    This commit slightly refactors and cleans up the handling of oversized messages.
    
    This closes #6876.
---
 .../runtime/rpc/akka/AkkaInvocationHandler.java    |  18 +-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  31 ++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |   6 +-
 .../rpc/akka/AkkaRpcServiceConfiguration.java      |  37 ++--
 .../flink/runtime/rpc/akka/FencedAkkaRpcActor.java |   9 +-
 .../AkkaRpcActorOversizedResponseMessageTest.java  | 194 ++++++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 131 +-----------
 .../flink/runtime/rpc/akka/SyncCallsTest.java      | 227 ---------------------
 .../flink/runtime/rpc/akka/TestRpcEndpoint.java    |  36 ++++
 9 files changed, 287 insertions(+), 402 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 90e126c..e147cf6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.messages.RunAsync;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
@@ -51,6 +52,7 @@ import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -208,12 +210,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 			result = null;
 		} else {
 			// execute an asynchronous call
-			CompletableFuture resultFuture = ask(rpcInvocation, futureTimeout);
+			CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
 
-			CompletableFuture completableFuture = resultFuture.thenApply((Object o) -> {
+			CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
 				if (o instanceof SerializedValue) {
 					try {
-						return  ((SerializedValue) o).deserializeValue(getClass().getClassLoader());
+						return  ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
 					} catch (IOException | ClassNotFoundException e) {
 						throw new CompletionException(
 							new RpcException("Could not deserialize the serialized payload of RPC method : "
@@ -224,10 +226,14 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 				}
 			});
 
-			if (!Objects.equals(returnType, CompletableFuture.class)) {
-				result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
-			} else {
+			if (Objects.equals(returnType, CompletableFuture.class)) {
 				result = completableFuture;
+			} else {
+				try {
+					result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
+				} catch (ExecutionException ee) {
+					throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(ee));
+				}
 			}
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 84d75fe..b17222e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -266,7 +266,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 						return;
 					}
 
-					boolean remoteSender = isRemoteSender();
+					final boolean isRemoteSender = isRemoteSender();
 					final String methodName = rpcMethod.getName();
 
 					if (result instanceof CompletableFuture) {
@@ -278,32 +278,32 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 								if (throwable != null) {
 									promise.failure(throwable);
 								} else {
-									if (!remoteSender) {
-										promise.success(value);
-									} else {
-										Either<SerializedValue, AkkaRpcException> serializedResult =
-											serializeRemoteResultAndVerifySize(value, methodName);
+									if (isRemoteSender) {
+										Either<SerializedValue<?>, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(value, methodName);
+
 										if (serializedResult.isLeft()) {
 											promise.success(serializedResult.left());
 										} else {
 											promise.failure(serializedResult.right());
 										}
+									} else {
+										promise.success(value);
 									}
 								}
 							});
 
 						Patterns.pipe(promise.future(), getContext().dispatcher()).to(getSender());
 					} else {
-						if (!remoteSender) {
-							getSender().tell(result, getSelf());
-						} else {
-							Either<SerializedValue, AkkaRpcException> serializedResult =
-								serializeRemoteResultAndVerifySize(result, methodName);
+						if (isRemoteSender) {
+							Either<SerializedValue<?>, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(result, methodName);
+
 							if (serializedResult.isLeft()) {
 								getSender().tell(new Status.Success(serializedResult.left()), getSelf());
 							} else {
 								getSender().tell(new Status.Failure(serializedResult.right()), getSelf());
 							}
+						} else {
+							getSender().tell(new Status.Success(result), getSelf());
 						}
 					}
 				}
@@ -315,14 +315,13 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 		}
 	}
 
-	protected boolean isRemoteSender() {
+	private boolean isRemoteSender() {
 		return !getSender().path().address().hasLocalScope();
 	}
 
-	private Either<SerializedValue, AkkaRpcException> serializeRemoteResultAndVerifySize(
-		Object result, String methodName) {
+	private Either<SerializedValue<?>, AkkaRpcException> serializeRemoteResultAndVerifySize(Object result, String methodName) {
 		try {
-			SerializedValue serializedResult = new SerializedValue(result);
+			SerializedValue<?> serializedResult = new SerializedValue<>(result);
 
 			long resultSize = serializedResult.getByteArray().length;
 			if (resultSize > maximumFramesize) {
@@ -334,7 +333,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 			}
 		} catch (IOException e) {
 			return Either.Right(new AkkaRpcException(
-				"Failed to serialize the result for RPC call : " + methodName + ".", e));
+				"Failed to serialize the result for RPC call : " + methodName + '.', e));
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 57c4dee..6548513 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -204,7 +204,7 @@ public class AkkaRpcService implements RpcService {
 				configuration.getMaximumFramesize());
 		} else {
 			akkaRpcActorProps = Props.create(
-				getAkkaRpcActorClass(),
+				AkkaRpcActor.class,
 				rpcEndpoint,
 				terminationFuture,
 				getVersion(),
@@ -390,10 +390,6 @@ public class AkkaRpcService implements RpcService {
 		return FutureUtils.toJava(scalaFuture);
 	}
 
-	protected Class getAkkaRpcActorClass() {
-		return AkkaRpcActor.class;
-	}
-
 	// ---------------------------------------------------------------------------------------
 	// Private helper methods
 	// ---------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
index 35f464b..41f4c71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
@@ -20,28 +20,39 @@ package org.apache.flink.runtime.rpc.akka;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+
+import javax.annotation.Nonnull;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Configuration object for {@link AkkaRpcService}.
+ * Configuration for the {@link AkkaRpcService}.
  */
 public class AkkaRpcServiceConfiguration {
 
+	@Nonnull
+	private final Configuration configuration;
+
+	@Nonnull
 	private final Time timeout;
+
 	private final long maximumFramesize;
-	private final Configuration configuration;
 
-	public AkkaRpcServiceConfiguration(Time timeout, long maximumFramesize, Configuration configuration) {
-		checkNotNull(timeout);
-		checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
+	public AkkaRpcServiceConfiguration(@Nonnull Configuration configuration, @Nonnull Time timeout, long maximumFramesize) {
+		checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive.");
+		this.configuration = configuration;
 		this.timeout = timeout;
 		this.maximumFramesize = maximumFramesize;
-		this.configuration = configuration;
 	}
 
+	@Nonnull
+	public Configuration getConfiguration() {
+		return configuration;
+	}
+
+	@Nonnull
 	public Time getTimeout() {
 		return timeout;
 	}
@@ -50,17 +61,13 @@ public class AkkaRpcServiceConfiguration {
 		return maximumFramesize;
 	}
 
-	public Configuration getConfiguration() {
-		return configuration;
-	}
-
 	public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
-		FiniteDuration duration = AkkaUtils.getTimeout(configuration);
-		Time timeout = Time.of(duration.length(), duration.unit());
+		final FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+		final Time timeout = Time.of(duration.length(), duration.unit());
 
-		long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
+		final long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
 
-		return new AkkaRpcServiceConfiguration(timeout, maximumFramesize, configuration);
+		return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize);
 	}
 
 	public static AkkaRpcServiceConfiguration defaultConfiguration() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index 72c783b..64a2a05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -40,11 +40,10 @@ import java.util.concurrent.CompletableFuture;
 public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
 
 	public FencedAkkaRpcActor(
-		T rpcEndpoint,
-		CompletableFuture<Boolean> terminationFuture,
-		int version,
-		final long maximumFramesize) {
-
+			T rpcEndpoint,
+			CompletableFuture<Boolean> terminationFuture,
+			int version,
+			final long maximumFramesize) {
 		super(rpcEndpoint, terminationFuture, version, maximumFramesize);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java
new file mode 100644
index 0000000..73de0a1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the over sized response message handling of the {@link AkkaRpcActor}.
+ */
+public class AkkaRpcActorOversizedResponseMessageTest extends TestLogger {
+
+	private static final Time TIMEOUT = Time.seconds(10L);
+
+	private static final int FRAMESIZE = 32000;
+
+	private static final String OVERSIZED_PAYLOAD = new String(new byte[FRAMESIZE]);
+
+	private static final String PAYLOAD = "Hello";
+
+	private static RpcService rpcService1;
+
+	private static RpcService rpcService2;
+
+	@BeforeClass
+	public static void setupClass() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setString(AkkaOptions.FRAMESIZE, FRAMESIZE + " b");
+
+		rpcService1 = AkkaRpcServiceUtils.createRpcService("localhost", 0, configuration);
+		rpcService2 = AkkaRpcServiceUtils.createRpcService("localhost", 0, configuration);
+	}
+
+	@AfterClass
+	public static void teardownClass() throws Exception {
+		RpcUtils.terminateRpcServices(TIMEOUT, rpcService1, rpcService2);
+	}
+
+	@Test
+	public void testOverSizedResponseMsgAsync() throws Exception {
+		try {
+			runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync);
+
+			fail("Expected the RPC to fail.");
+		} catch (ExecutionException e) {
+			assertThat(ExceptionUtils.findThrowable(e, AkkaRpcException.class).isPresent(), is(true));
+		}
+	}
+
+	@Test
+	public void testNormalSizedResponseMsgAsync() throws Exception {
+		final String message = runRemoteMessageResponseTest(PAYLOAD, this::requestMessageAsync);
+		assertThat(message, is(equalTo(PAYLOAD)));
+	}
+
+	@Test
+	public void testNormalSizedResponseMsgSync() throws Exception {
+		final String message = runRemoteMessageResponseTest(PAYLOAD, MessageRpcGateway::messageSync);
+		assertThat(message, is(equalTo(PAYLOAD)));
+	}
+
+	@Test
+	public void testOverSizedResponseMsgSync() throws Exception {
+		try {
+			runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync);
+
+			fail("Expected the RPC to fail.");
+		} catch (RpcException e) {
+			assertThat(ExceptionUtils.findThrowable(e, AkkaRpcException.class).isPresent(), is(true));
+		}
+	}
+
+	/**
+	 * Tests that we can send arbitrarily large objects when communicating locally with
+	 * the rpc endpoint.
+	 */
+	@Test
+	public void testLocalOverSizedResponseMsgSync() throws Exception {
+		final String message = runLocalMessageResponseTest(OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync);
+		assertThat(message, is(equalTo(OVERSIZED_PAYLOAD)));
+	}
+
+	/**
+	 * Tests that we can send arbitrarily large objects when communicating locally with
+	 * the rpc endpoint.
+	 */
+	@Test
+	public void testLocalOverSizedResponseMsgAsync() throws Exception {
+		final String message = runLocalMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync);
+		assertThat(message, is(equalTo(OVERSIZED_PAYLOAD)));
+	}
+
+	private String requestMessageAsync(MessageRpcGateway messageRpcGateway) throws Exception {
+		CompletableFuture<String> messageFuture = messageRpcGateway.messageAsync();
+		return messageFuture.get();
+	}
+
+	private <T> T runRemoteMessageResponseTest(String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall) throws Exception {
+		final MessageRpcEndpoint rpcEndpoint = new MessageRpcEndpoint(rpcService1, payload);
+
+		try {
+			rpcEndpoint.start();
+
+			MessageRpcGateway rpcGateway = rpcService2.connect(rpcEndpoint.getAddress(), MessageRpcGateway.class).get();
+
+			return rpcCall.apply(rpcGateway);
+		} finally {
+			RpcUtils.terminateRpcEndpoint(rpcEndpoint, TIMEOUT);
+		}
+	}
+
+	private <T> T runLocalMessageResponseTest(String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall) throws Exception {
+		final MessageRpcEndpoint rpcEndpoint = new MessageRpcEndpoint(rpcService1, payload);
+
+		try {
+			rpcEndpoint.start();
+
+			MessageRpcGateway rpcGateway = rpcService1.connect(rpcEndpoint.getAddress(), MessageRpcGateway.class).get();
+
+			return rpcCall.apply(rpcGateway);
+		} finally {
+			RpcUtils.terminateRpcEndpoint(rpcEndpoint, TIMEOUT);
+		}
+	}
+
+	// -------------------------------------------------------------------------
+
+	interface MessageRpcGateway extends RpcGateway {
+		CompletableFuture<String> messageAsync();
+
+		String messageSync() throws RpcException;
+	}
+
+	static class MessageRpcEndpoint extends TestRpcEndpoint implements MessageRpcGateway {
+
+		@Nonnull
+		private final String message;
+
+		MessageRpcEndpoint(RpcService rpcService, @Nonnull String message) {
+			super(rpcService);
+			this.message = message;
+		}
+
+		@Override
+		public CompletableFuture<String> messageAsync() {
+			return CompletableFuture.completedFuture(message);
+		}
+
+		@Override
+		public String messageSync() throws RpcException {
+			return message;
+		}
+
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index fd62305..040e949 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -19,8 +19,6 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -40,22 +38,19 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.Await;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import scala.concurrent.Await;
-
 public class AkkaRpcActorTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
@@ -149,99 +144,13 @@ public class AkkaRpcActorTest extends TestLogger {
 		rpcEndpoint.shutDown();
 	}
 
-	@Test
-	public void testOversizedResponseMsg() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
-		OversizedResponseRpcEndpoint rpcEndpoint = null;
-
-		ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
-		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
-		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
-		AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
-		AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
-
-		try {
-			rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
-
-			rpcEndpoint.start();
-
-			CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(
-				rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
-			OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
-			CompletableFuture<String> result = rpcGateway.calculate();
-
-			result.get(timeout.getSize(), timeout.getUnit());
-
-			fail("Expected an AkkaRpcException.");
-		} catch (Exception e) {
-			assertTrue(e.getCause() instanceof IOException);
-		} finally {
-			if (rpcEndpoint != null) {
-				rpcEndpoint.shutDown();
-			}
-
-			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
-			terminationFutures.add(rpcService1.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
-			terminationFutures.add(rpcService2.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
-			FutureUtils
-				.waitForAll(terminationFutures)
-				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-		}
-	}
-
-	@Test
-	public void testNonOversizedResponseMsg() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(AkkaOptions.FRAMESIZE, "1000 kB");
-		OversizedResponseRpcEndpoint rpcEndpoint = null;
-
-		ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
-		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
-		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
-		AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
-		AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
-
-		try {
-			rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
-			rpcEndpoint.start();
-
-			CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
-			OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
-			CompletableFuture<String> result = rpcGateway.calculate();
-
-			String actualTxt = result.get(timeout.getSize(), timeout.getUnit());
-
-			assertEquals("hello world", actualTxt);
-		} finally {
-			if (rpcEndpoint != null) {
-				rpcEndpoint.shutDown();
-			}
-
-			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
-			terminationFutures.add(rpcService1.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
-			terminationFutures.add(rpcService2.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
-			FutureUtils
-				.waitForAll(terminationFutures)
-				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-		}
-	}
-
 	/**
 	 * Tests that we can wait for a RpcEndpoint to terminate.
 	 *
 	 * @throws ExecutionException
 	 * @throws InterruptedException
 	 */
-	@Test(timeout=5000)
+	@Test(timeout = 5000)
 	public void testRpcEndpointTerminationFuture() throws Exception {
 		final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
 		rpcEndpoint.start();
@@ -393,18 +302,6 @@ public class AkkaRpcActorTest extends TestLogger {
 		CompletableFuture<Integer> foobar();
 	}
 
-	private static class TestRpcEndpoint extends RpcEndpoint {
-
-		protected TestRpcEndpoint(RpcService rpcService) {
-			super(rpcService);
-		}
-
-		@Override
-		public CompletableFuture<Void> postStop() {
-			return CompletableFuture.completedFuture(null);
-		}
-	}
-
 	static class DummyRpcEndpoint extends TestRpcEndpoint implements DummyRpcGateway {
 
 		private volatile int _foobar = 42;
@@ -522,26 +419,4 @@ public class AkkaRpcActorTest extends TestLogger {
 		}
 	}
 
-	// -------------------------------------------------------------------------
-
-	interface OversizedResponseMsgRpcGateway extends RpcGateway {
-		CompletableFuture<String> calculate();
-	}
-
-	static class OversizedResponseRpcEndpoint extends TestRpcEndpoint implements OversizedResponseMsgRpcGateway {
-
-		private volatile String txt;
-
-		public OversizedResponseRpcEndpoint(RpcService rpcService, String txt) {
-			super(rpcService);
-			this.txt = txt;
-		}
-
-		@Override
-		public CompletableFuture<String> calculate() {
-			return CompletableFuture.completedFuture(txt);
-		}
-
-	}
-
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
deleted file mode 100644
index 523ad73..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka;
-
-import akka.actor.ActorSystem;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * RPC sync invoke test.
- */
-public class SyncCallsTest extends TestLogger {
-
-	// ------------------------------------------------------------------------
-	//  shared test members
-	// ------------------------------------------------------------------------
-
-	private static final Time timeout = Time.seconds(10L);
-
-	private static ActorSystem actorSystem1;
-	private static ActorSystem actorSystem2;
-	private static AkkaRpcService akkaRpcService1;
-	private static AkkaRpcService akkaRpcService2;
-
-	@BeforeClass
-	public static void setup() {
-		Configuration configuration = new Configuration();
-
-		actorSystem1 = AkkaUtils.createDefaultActorSystem();
-		actorSystem2 = AkkaUtils.createDefaultActorSystem();
-
-		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
-		akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
-		akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
-	}
-
-	@AfterClass
-	public static void teardown() throws InterruptedException, ExecutionException, TimeoutException {
-		final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
-
-		terminationFutures.add(akkaRpcService1.stopService());
-		terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
-		terminationFutures.add(akkaRpcService2.stopService());
-		terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
-		FutureUtils
-			.waitForAll(terminationFutures)
-			.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-	}
-
-	@Test
-	public void testSimpleLocalSyncCall() throws Exception {
-		RpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
-		rpcEndpoint.start();
-
-		try {
-			DummyRpcGateway gateway = rpcEndpoint.getSelfGateway(DummyRpcGateway.class);
-
-			int actualResult = gateway.foobar();
-
-			assertEquals(1234, actualResult);
-		} finally {
-			rpcEndpoint.shutDown();
-		}
-
-	}
-
-	@Test
-	public void testSimpleRemoteSyncCall() throws Exception {
-		RpcEndpoint rpcEndpoint = null;
-
-		try {
-			rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
-			rpcEndpoint.start();
-
-			CompletableFuture<DummyRpcGateway> future = akkaRpcService2.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
-			DummyRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
-			int actualResult = rpcGateway.foobar();
-
-			assertEquals(1234, actualResult);
-		} finally {
-			if (rpcEndpoint != null) {
-				rpcEndpoint.shutDown();
-			}
-		}
-	}
-
-	@Test
-	public void testSimpleRemoteSyncCallWithOversizedMsg() throws Exception {
-		Configuration configuration = new Configuration();
-		configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
-		OversizedMsgRpcEndpoint rpcEndpoint = null;
-
-		ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
-		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
-		AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
-		AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
-		AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
-
-		try {
-			rpcEndpoint = new OversizedMsgRpcEndpoint(rpcService1, "hello world");
-
-			rpcEndpoint.start();
-
-			CompletableFuture<OversizedMsgRpcGateway> future = rpcService2.connect(
-				rpcEndpoint.getAddress(), OversizedMsgRpcGateway.class);
-			OversizedMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
-			String result = rpcGateway.response();
-
-			fail("Expected an AkkaRpcException.");
-		} catch (Exception e) {
-			assertTrue(e.getCause() instanceof IOException);
-		} finally {
-			if (rpcEndpoint != null) {
-				rpcEndpoint.shutDown();
-			}
-
-			final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
-			terminationFutures.add(rpcService1.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
-			terminationFutures.add(rpcService2.stopService());
-			terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
-			FutureUtils
-				.waitForAll(terminationFutures)
-				.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-		}
-	}
-
-	/**
-	 * A dummy rpc gateway.
-	 */
-	public interface DummyRpcGateway extends RpcGateway {
-		int foobar();
-	}
-
-	/**
-	 * A dummy rpc endpoint.
-	 */
-	public static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {
-
-		DummyRpcEndpoint(RpcService rpcService) {
-			super(rpcService);
-		}
-
-		@Override
-		public int foobar() {
-			return 1234;
-		}
-
-		@Override
-		public CompletableFuture<Void> postStop() {
-			return CompletableFuture.completedFuture(null);
-		}
-	}
-
-	/**
-	 * Oversized message rpc gateway.
-	 */
-	private interface OversizedMsgRpcGateway extends RpcGateway {
-		String response();
-	}
-
-	/**
-	 * Oversized message rpc endpoint.
-	 */
-	private static class OversizedMsgRpcEndpoint extends RpcEndpoint implements OversizedMsgRpcGateway {
-
-		private String txt;
-
-		public OversizedMsgRpcEndpoint(RpcService rpcService, String txt) {
-			super(rpcService);
-			this.txt = txt;
-		}
-
-		@Override
-		public CompletableFuture<Void> postStop() {
-			return CompletableFuture.completedFuture(null);
-		}
-
-		@Override
-		public String response() {
-			return this.txt;
-		}
-	}
-
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TestRpcEndpoint.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TestRpcEndpoint.java
new file mode 100644
index 0000000..073dabe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TestRpcEndpoint.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import java.util.concurrent.CompletableFuture;
+
+class TestRpcEndpoint extends RpcEndpoint {
+
+	protected TestRpcEndpoint(RpcService rpcService) {
+		super(rpcService);
+	}
+
+	@Override
+	public CompletableFuture<Void> postStop() {
+		return CompletableFuture.completedFuture(null);
+	}
+}