You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/24 10:34:07 UTC
[flink] 02/03: [FLINK-10251][rpc] Refactor handling of over sized messages
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3667ea605ec9002df225493f0a0b5a6840b6f28d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 24 08:48:50 2019 +0100
[FLINK-10251][rpc] Refactor handling of over sized messages
This commit slightly refactors and cleans up the handling of oversized messages.
This closes #6876.
---
.../runtime/rpc/akka/AkkaInvocationHandler.java | 18 +-
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 31 ++-
.../flink/runtime/rpc/akka/AkkaRpcService.java | 6 +-
.../rpc/akka/AkkaRpcServiceConfiguration.java | 37 ++--
.../flink/runtime/rpc/akka/FencedAkkaRpcActor.java | 9 +-
.../AkkaRpcActorOversizedResponseMessageTest.java | 194 ++++++++++++++++++
.../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 131 +-----------
.../flink/runtime/rpc/akka/SyncCallsTest.java | 227 ---------------------
.../flink/runtime/rpc/akka/TestRpcEndpoint.java | 36 ++++
9 files changed, 287 insertions(+), 402 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 90e126c..e147cf6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
@@ -51,6 +52,7 @@ import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -208,12 +210,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
result = null;
} else {
// execute an asynchronous call
- CompletableFuture resultFuture = ask(rpcInvocation, futureTimeout);
+ CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
- CompletableFuture completableFuture = resultFuture.thenApply((Object o) -> {
+ CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
if (o instanceof SerializedValue) {
try {
- return ((SerializedValue) o).deserializeValue(getClass().getClassLoader());
+ return ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new CompletionException(
new RpcException("Could not deserialize the serialized payload of RPC method : "
@@ -224,10 +226,14 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
}
});
- if (!Objects.equals(returnType, CompletableFuture.class)) {
- result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
- } else {
+ if (Objects.equals(returnType, CompletableFuture.class)) {
result = completableFuture;
+ } else {
+ try {
+ result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
+ } catch (ExecutionException ee) {
+ throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(ee));
+ }
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 84d75fe..b17222e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -266,7 +266,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
return;
}
- boolean remoteSender = isRemoteSender();
+ final boolean isRemoteSender = isRemoteSender();
final String methodName = rpcMethod.getName();
if (result instanceof CompletableFuture) {
@@ -278,32 +278,32 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
if (throwable != null) {
promise.failure(throwable);
} else {
- if (!remoteSender) {
- promise.success(value);
- } else {
- Either<SerializedValue, AkkaRpcException> serializedResult =
- serializeRemoteResultAndVerifySize(value, methodName);
+ if (isRemoteSender) {
+ Either<SerializedValue<?>, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(value, methodName);
+
if (serializedResult.isLeft()) {
promise.success(serializedResult.left());
} else {
promise.failure(serializedResult.right());
}
+ } else {
+ promise.success(value);
}
}
});
Patterns.pipe(promise.future(), getContext().dispatcher()).to(getSender());
} else {
- if (!remoteSender) {
- getSender().tell(result, getSelf());
- } else {
- Either<SerializedValue, AkkaRpcException> serializedResult =
- serializeRemoteResultAndVerifySize(result, methodName);
+ if (isRemoteSender) {
+ Either<SerializedValue<?>, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(result, methodName);
+
if (serializedResult.isLeft()) {
getSender().tell(new Status.Success(serializedResult.left()), getSelf());
} else {
getSender().tell(new Status.Failure(serializedResult.right()), getSelf());
}
+ } else {
+ getSender().tell(new Status.Success(result), getSelf());
}
}
}
@@ -315,14 +315,13 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
}
}
- protected boolean isRemoteSender() {
+ private boolean isRemoteSender() {
return !getSender().path().address().hasLocalScope();
}
- private Either<SerializedValue, AkkaRpcException> serializeRemoteResultAndVerifySize(
- Object result, String methodName) {
+ private Either<SerializedValue<?>, AkkaRpcException> serializeRemoteResultAndVerifySize(Object result, String methodName) {
try {
- SerializedValue serializedResult = new SerializedValue(result);
+ SerializedValue<?> serializedResult = new SerializedValue<>(result);
long resultSize = serializedResult.getByteArray().length;
if (resultSize > maximumFramesize) {
@@ -334,7 +333,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
}
} catch (IOException e) {
return Either.Right(new AkkaRpcException(
- "Failed to serialize the result for RPC call : " + methodName + ".", e));
+ "Failed to serialize the result for RPC call : " + methodName + '.', e));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 57c4dee..6548513 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -204,7 +204,7 @@ public class AkkaRpcService implements RpcService {
configuration.getMaximumFramesize());
} else {
akkaRpcActorProps = Props.create(
- getAkkaRpcActorClass(),
+ AkkaRpcActor.class,
rpcEndpoint,
terminationFuture,
getVersion(),
@@ -390,10 +390,6 @@ public class AkkaRpcService implements RpcService {
return FutureUtils.toJava(scalaFuture);
}
- protected Class getAkkaRpcActorClass() {
- return AkkaRpcActor.class;
- }
-
// ---------------------------------------------------------------------------------------
// Private helper methods
// ---------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
index 35f464b..41f4c71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
@@ -20,28 +20,39 @@ package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+
+import javax.annotation.Nonnull;
+
import scala.concurrent.duration.FiniteDuration;
import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Configuration object for {@link AkkaRpcService}.
+ * Configuration for the {@link AkkaRpcService}.
*/
public class AkkaRpcServiceConfiguration {
+ @Nonnull
+ private final Configuration configuration;
+
+ @Nonnull
private final Time timeout;
+
private final long maximumFramesize;
- private final Configuration configuration;
- public AkkaRpcServiceConfiguration(Time timeout, long maximumFramesize, Configuration configuration) {
- checkNotNull(timeout);
- checkArgument(maximumFramesize > 0, "Maximum framesize must be positive.");
+ public AkkaRpcServiceConfiguration(@Nonnull Configuration configuration, @Nonnull Time timeout, long maximumFramesize) {
+ checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive.");
+ this.configuration = configuration;
this.timeout = timeout;
this.maximumFramesize = maximumFramesize;
- this.configuration = configuration;
}
+ @Nonnull
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Nonnull
public Time getTimeout() {
return timeout;
}
@@ -50,17 +61,13 @@ public class AkkaRpcServiceConfiguration {
return maximumFramesize;
}
- public Configuration getConfiguration() {
- return configuration;
- }
-
public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
- FiniteDuration duration = AkkaUtils.getTimeout(configuration);
- Time timeout = Time.of(duration.length(), duration.unit());
+ final FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+ final Time timeout = Time.of(duration.length(), duration.unit());
- long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
+ final long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
- return new AkkaRpcServiceConfiguration(timeout, maximumFramesize, configuration);
+ return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize);
}
public static AkkaRpcServiceConfiguration defaultConfiguration() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index 72c783b..64a2a05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -40,11 +40,10 @@ import java.util.concurrent.CompletableFuture;
public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
public FencedAkkaRpcActor(
- T rpcEndpoint,
- CompletableFuture<Boolean> terminationFuture,
- int version,
- final long maximumFramesize) {
-
+ T rpcEndpoint,
+ CompletableFuture<Boolean> terminationFuture,
+ int version,
+ final long maximumFramesize) {
super(rpcEndpoint, terminationFuture, version, maximumFramesize);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java
new file mode 100644
index 0000000..73de0a1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorOversizedResponseMessageTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the over sized response message handling of the {@link AkkaRpcActor}.
+ */
+public class AkkaRpcActorOversizedResponseMessageTest extends TestLogger {
+
+ private static final Time TIMEOUT = Time.seconds(10L);
+
+ private static final int FRAMESIZE = 32000;
+
+ private static final String OVERSIZED_PAYLOAD = new String(new byte[FRAMESIZE]);
+
+ private static final String PAYLOAD = "Hello";
+
+ private static RpcService rpcService1;
+
+ private static RpcService rpcService2;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.setString(AkkaOptions.FRAMESIZE, FRAMESIZE + " b");
+
+ rpcService1 = AkkaRpcServiceUtils.createRpcService("localhost", 0, configuration);
+ rpcService2 = AkkaRpcServiceUtils.createRpcService("localhost", 0, configuration);
+ }
+
+ @AfterClass
+ public static void teardownClass() throws Exception {
+ RpcUtils.terminateRpcServices(TIMEOUT, rpcService1, rpcService2);
+ }
+
+ @Test
+ public void testOverSizedResponseMsgAsync() throws Exception {
+ try {
+ runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync);
+
+ fail("Expected the RPC to fail.");
+ } catch (ExecutionException e) {
+ assertThat(ExceptionUtils.findThrowable(e, AkkaRpcException.class).isPresent(), is(true));
+ }
+ }
+
+ @Test
+ public void testNormalSizedResponseMsgAsync() throws Exception {
+ final String message = runRemoteMessageResponseTest(PAYLOAD, this::requestMessageAsync);
+ assertThat(message, is(equalTo(PAYLOAD)));
+ }
+
+ @Test
+ public void testNormalSizedResponseMsgSync() throws Exception {
+ final String message = runRemoteMessageResponseTest(PAYLOAD, MessageRpcGateway::messageSync);
+ assertThat(message, is(equalTo(PAYLOAD)));
+ }
+
+ @Test
+ public void testOverSizedResponseMsgSync() throws Exception {
+ try {
+ runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync);
+
+ fail("Expected the RPC to fail.");
+ } catch (RpcException e) {
+ assertThat(ExceptionUtils.findThrowable(e, AkkaRpcException.class).isPresent(), is(true));
+ }
+ }
+
+ /**
+ * Tests that we can send arbitrarily large objects when communicating locally with
+ * the rpc endpoint.
+ */
+ @Test
+ public void testLocalOverSizedResponseMsgSync() throws Exception {
+ final String message = runLocalMessageResponseTest(OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync);
+ assertThat(message, is(equalTo(OVERSIZED_PAYLOAD)));
+ }
+
+ /**
+ * Tests that we can send arbitrarily large objects when communicating locally with
+ * the rpc endpoint.
+ */
+ @Test
+ public void testLocalOverSizedResponseMsgAsync() throws Exception {
+ final String message = runLocalMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync);
+ assertThat(message, is(equalTo(OVERSIZED_PAYLOAD)));
+ }
+
+ private String requestMessageAsync(MessageRpcGateway messageRpcGateway) throws Exception {
+ CompletableFuture<String> messageFuture = messageRpcGateway.messageAsync();
+ return messageFuture.get();
+ }
+
+ private <T> T runRemoteMessageResponseTest(String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall) throws Exception {
+ final MessageRpcEndpoint rpcEndpoint = new MessageRpcEndpoint(rpcService1, payload);
+
+ try {
+ rpcEndpoint.start();
+
+ MessageRpcGateway rpcGateway = rpcService2.connect(rpcEndpoint.getAddress(), MessageRpcGateway.class).get();
+
+ return rpcCall.apply(rpcGateway);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(rpcEndpoint, TIMEOUT);
+ }
+ }
+
+ private <T> T runLocalMessageResponseTest(String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall) throws Exception {
+ final MessageRpcEndpoint rpcEndpoint = new MessageRpcEndpoint(rpcService1, payload);
+
+ try {
+ rpcEndpoint.start();
+
+ MessageRpcGateway rpcGateway = rpcService1.connect(rpcEndpoint.getAddress(), MessageRpcGateway.class).get();
+
+ return rpcCall.apply(rpcGateway);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(rpcEndpoint, TIMEOUT);
+ }
+ }
+
+ // -------------------------------------------------------------------------
+
+ interface MessageRpcGateway extends RpcGateway {
+ CompletableFuture<String> messageAsync();
+
+ String messageSync() throws RpcException;
+ }
+
+ static class MessageRpcEndpoint extends TestRpcEndpoint implements MessageRpcGateway {
+
+ @Nonnull
+ private final String message;
+
+ MessageRpcEndpoint(RpcService rpcService, @Nonnull String message) {
+ super(rpcService);
+ this.message = message;
+ }
+
+ @Override
+ public CompletableFuture<String> messageAsync() {
+ return CompletableFuture.completedFuture(message);
+ }
+
+ @Override
+ public String messageSync() throws RpcException {
+ return message;
+ }
+
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index fd62305..040e949 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -19,8 +19,6 @@
package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -40,22 +38,19 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import scala.concurrent.Await;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import scala.concurrent.Await;
-
public class AkkaRpcActorTest extends TestLogger {
// ------------------------------------------------------------------------
@@ -149,99 +144,13 @@ public class AkkaRpcActorTest extends TestLogger {
rpcEndpoint.shutDown();
}
- @Test
- public void testOversizedResponseMsg() throws Exception {
- Configuration configuration = new Configuration();
- configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
- OversizedResponseRpcEndpoint rpcEndpoint = null;
-
- ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
- ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
- AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
- AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
- AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
-
- try {
- rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
-
- rpcEndpoint.start();
-
- CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(
- rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
- OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
- CompletableFuture<String> result = rpcGateway.calculate();
-
- result.get(timeout.getSize(), timeout.getUnit());
-
- fail("Expected an AkkaRpcException.");
- } catch (Exception e) {
- assertTrue(e.getCause() instanceof IOException);
- } finally {
- if (rpcEndpoint != null) {
- rpcEndpoint.shutDown();
- }
-
- final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
- terminationFutures.add(rpcService1.stopService());
- terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
- terminationFutures.add(rpcService2.stopService());
- terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
- FutureUtils
- .waitForAll(terminationFutures)
- .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- }
- }
-
- @Test
- public void testNonOversizedResponseMsg() throws Exception {
- Configuration configuration = new Configuration();
- configuration.setString(AkkaOptions.FRAMESIZE, "1000 kB");
- OversizedResponseRpcEndpoint rpcEndpoint = null;
-
- ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
- ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
- AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
- AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
- AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
-
- try {
- rpcEndpoint = new OversizedResponseRpcEndpoint(rpcService1, "hello world");
- rpcEndpoint.start();
-
- CompletableFuture<OversizedResponseMsgRpcGateway> future = rpcService2.connect(rpcEndpoint.getAddress(), OversizedResponseMsgRpcGateway.class);
- OversizedResponseMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
- CompletableFuture<String> result = rpcGateway.calculate();
-
- String actualTxt = result.get(timeout.getSize(), timeout.getUnit());
-
- assertEquals("hello world", actualTxt);
- } finally {
- if (rpcEndpoint != null) {
- rpcEndpoint.shutDown();
- }
-
- final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
- terminationFutures.add(rpcService1.stopService());
- terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
- terminationFutures.add(rpcService2.stopService());
- terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
- FutureUtils
- .waitForAll(terminationFutures)
- .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- }
- }
-
/**
* Tests that we can wait for a RpcEndpoint to terminate.
*
* @throws ExecutionException
* @throws InterruptedException
*/
- @Test(timeout=5000)
+ @Test(timeout = 5000)
public void testRpcEndpointTerminationFuture() throws Exception {
final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
rpcEndpoint.start();
@@ -393,18 +302,6 @@ public class AkkaRpcActorTest extends TestLogger {
CompletableFuture<Integer> foobar();
}
- private static class TestRpcEndpoint extends RpcEndpoint {
-
- protected TestRpcEndpoint(RpcService rpcService) {
- super(rpcService);
- }
-
- @Override
- public CompletableFuture<Void> postStop() {
- return CompletableFuture.completedFuture(null);
- }
- }
-
static class DummyRpcEndpoint extends TestRpcEndpoint implements DummyRpcGateway {
private volatile int _foobar = 42;
@@ -522,26 +419,4 @@ public class AkkaRpcActorTest extends TestLogger {
}
}
- // -------------------------------------------------------------------------
-
- interface OversizedResponseMsgRpcGateway extends RpcGateway {
- CompletableFuture<String> calculate();
- }
-
- static class OversizedResponseRpcEndpoint extends TestRpcEndpoint implements OversizedResponseMsgRpcGateway {
-
- private volatile String txt;
-
- public OversizedResponseRpcEndpoint(RpcService rpcService, String txt) {
- super(rpcService);
- this.txt = txt;
- }
-
- @Override
- public CompletableFuture<String> calculate() {
- return CompletableFuture.completedFuture(txt);
- }
-
- }
-
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
deleted file mode 100644
index 523ad73..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/SyncCallsTest.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka;
-
-import akka.actor.ActorSystem;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * RPC sync invoke test.
- */
-public class SyncCallsTest extends TestLogger {
-
- // ------------------------------------------------------------------------
- // shared test members
- // ------------------------------------------------------------------------
-
- private static final Time timeout = Time.seconds(10L);
-
- private static ActorSystem actorSystem1;
- private static ActorSystem actorSystem2;
- private static AkkaRpcService akkaRpcService1;
- private static AkkaRpcService akkaRpcService2;
-
- @BeforeClass
- public static void setup() {
- Configuration configuration = new Configuration();
-
- actorSystem1 = AkkaUtils.createDefaultActorSystem();
- actorSystem2 = AkkaUtils.createDefaultActorSystem();
-
- AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
- akkaRpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);
- akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);
- }
-
- @AfterClass
- public static void teardown() throws InterruptedException, ExecutionException, TimeoutException {
- final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
-
- terminationFutures.add(akkaRpcService1.stopService());
- terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
- terminationFutures.add(akkaRpcService2.stopService());
- terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
- FutureUtils
- .waitForAll(terminationFutures)
- .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- }
-
- @Test
- public void testSimpleLocalSyncCall() throws Exception {
- RpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
- rpcEndpoint.start();
-
- try {
- DummyRpcGateway gateway = rpcEndpoint.getSelfGateway(DummyRpcGateway.class);
-
- int actualResult = gateway.foobar();
-
- assertEquals(1234, actualResult);
- } finally {
- rpcEndpoint.shutDown();
- }
-
- }
-
- @Test
- public void testSimpleRemoteSyncCall() throws Exception {
- RpcEndpoint rpcEndpoint = null;
-
- try {
- rpcEndpoint = new DummyRpcEndpoint(akkaRpcService1);
- rpcEndpoint.start();
-
- CompletableFuture<DummyRpcGateway> future = akkaRpcService2.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
- DummyRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
- int actualResult = rpcGateway.foobar();
-
- assertEquals(1234, actualResult);
- } finally {
- if (rpcEndpoint != null) {
- rpcEndpoint.shutDown();
- }
- }
- }
-
- @Test
- public void testSimpleRemoteSyncCallWithOversizedMsg() throws Exception {
- Configuration configuration = new Configuration();
- configuration.setString(AkkaOptions.FRAMESIZE, "10 b");
- OversizedMsgRpcEndpoint rpcEndpoint = null;
-
- ActorSystem actorSystem1 = AkkaUtils.createDefaultActorSystem();
- ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
- AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
- AkkaRpcService rpcService1 = new AkkaRpcService(actorSystem1, akkaRpcServiceConfig);;
- AkkaRpcService rpcService2 = new AkkaRpcService(actorSystem2, akkaRpcServiceConfig);;
-
- try {
- rpcEndpoint = new OversizedMsgRpcEndpoint(rpcService1, "hello world");
-
- rpcEndpoint.start();
-
- CompletableFuture<OversizedMsgRpcGateway> future = rpcService2.connect(
- rpcEndpoint.getAddress(), OversizedMsgRpcGateway.class);
- OversizedMsgRpcGateway rpcGateway = future.get(10000, TimeUnit.SECONDS);
-
- String result = rpcGateway.response();
-
- fail("Expected an AkkaRpcException.");
- } catch (Exception e) {
- assertTrue(e.getCause() instanceof IOException);
- } finally {
- if (rpcEndpoint != null) {
- rpcEndpoint.shutDown();
- }
-
- final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
- terminationFutures.add(rpcService1.stopService());
- terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
- terminationFutures.add(rpcService2.stopService());
- terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
-
- FutureUtils
- .waitForAll(terminationFutures)
- .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- }
- }
-
- /**
- * A dummy rpc gateway.
- */
- public interface DummyRpcGateway extends RpcGateway {
- int foobar();
- }
-
- /**
- * A dummy rpc endpoint.
- */
- public static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {
-
- DummyRpcEndpoint(RpcService rpcService) {
- super(rpcService);
- }
-
- @Override
- public int foobar() {
- return 1234;
- }
-
- @Override
- public CompletableFuture<Void> postStop() {
- return CompletableFuture.completedFuture(null);
- }
- }
-
- /**
- * Oversized message rpc gateway.
- */
- private interface OversizedMsgRpcGateway extends RpcGateway {
- String response();
- }
-
- /**
- * Oversized message rpc endpoint.
- */
- private static class OversizedMsgRpcEndpoint extends RpcEndpoint implements OversizedMsgRpcGateway {
-
- private String txt;
-
- public OversizedMsgRpcEndpoint(RpcService rpcService, String txt) {
- super(rpcService);
- this.txt = txt;
- }
-
- @Override
- public CompletableFuture<Void> postStop() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public String response() {
- return this.txt;
- }
- }
-
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TestRpcEndpoint.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TestRpcEndpoint.java
new file mode 100644
index 0000000..073dabe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TestRpcEndpoint.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import java.util.concurrent.CompletableFuture;
+
+class TestRpcEndpoint extends RpcEndpoint {
+
+ protected TestRpcEndpoint(RpcService rpcService) {
+ super(rpcService);
+ }
+
+ @Override
+ public CompletableFuture<Void> postStop() {
+ return CompletableFuture.completedFuture(null);
+ }
+}