You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:59 UTC
[26/50] [abbrv] flink git commit: [FLINK-4382] [rpc] Buffer rpc calls
until the RpcEndpoint has been started
[FLINK-4382] [rpc] Buffer rpc calls until the RpcEndpoint has been started
This PR allows the AkkaRpcActor to stash messages until the corresponding RpcEndpoint
has been started. When receiving a Processing.START message, the AkkaRpcActor
unstashes all messages and starts processing rpcs. When receiving a Processing.STOP
message, it will stop processing messages and stash incoming messages again.
Add test case for message stashing
This closes #2358.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/613f5a7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/613f5a7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/613f5a7d
Branch: refs/heads/flip-6
Commit: 613f5a7dfe551725306a799ce6815a496f302813
Parents: 9159ad6
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 11 18:13:25 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 21 11:39:14 2016 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/rpc/RpcEndpoint.java | 15 ++-
.../flink/runtime/rpc/StartStoppable.java | 35 ++++++
.../runtime/rpc/akka/AkkaInvocationHandler.java | 21 +++-
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 39 ++++++-
.../flink/runtime/rpc/akka/AkkaRpcService.java | 8 +-
.../runtime/rpc/akka/messages/Processing.java | 27 +++++
.../flink/runtime/rpc/RpcCompletenessTest.java | 45 +++++++-
.../runtime/rpc/akka/AkkaRpcActorTest.java | 108 +++++++++++++++++++
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 3 +
.../flink/runtime/rpc/akka/AsyncCallsTest.java | 5 +-
.../rpc/akka/MainThreadValidationTest.java | 4 +-
.../rpc/akka/MessageSerializationTest.java | 4 +
.../rpc/taskexecutor/TaskExecutorTest.java | 18 ++++
13 files changed, 315 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index d36a283..67ac182 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -74,7 +74,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
/** The main thread execution context to be used to execute future callbacks in the main thread
* of the executing rpc server. */
- private final MainThreadExecutionContext mainThreadExecutionContext;
+ private final ExecutionContext mainThreadExecutionContext;
/** A reference to the endpoint's main thread, if the current method is called by the main thread */
final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null);
@@ -106,10 +106,21 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
}
// ------------------------------------------------------------------------
- // Shutdown
+ // Start & Shutdown
// ------------------------------------------------------------------------
/**
+ * Starts the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
+ * to process remote procedure calls.
+ *
+ * IMPORTANT: Whenever you override this method, call the parent implementation to enable
+ * rpc processing. It is advised to make the parent call last.
+ */
+ public void start() {
+ ((StartStoppable) self).start();
+ }
+
+ /**
* Shuts down the underlying RPC endpoint via the RPC service.
* After this method was called, the RPC endpoint will no longer be reachable, neither remotely,
nor via its {@link #getSelf() self gateway}. It will also not accept executions in main thread
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
new file mode 100644
index 0000000..dd5595f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+/**
+ * Interface to start and stop the processing of rpc calls in the rpc server.
+ */
+public interface StartStoppable {
+
+ /**
+ * Starts the processing of remote procedure calls.
+ */
+ void start();
+
+ /**
+ * Stops the processing of remote procedure calls.
+ */
+ void stop();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 297104b..524bf74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -24,8 +24,10 @@ import akka.util.Timeout;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rpc.MainThreadExecutor;
import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
@@ -50,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
* executed.
*/
-class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor {
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor, StartStoppable {
private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
private final ActorRef rpcEndpoint;
@@ -76,7 +78,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
Object result;
- if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) || declaringClass.equals(Object.class)) {
+ if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) ||
+ declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class)) {
result = method.invoke(this, args);
} else {
String methodName = method.getName();
@@ -171,6 +174,20 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
}
}
+ @Override
+ public void start() {
+ rpcEndpoint.tell(Processing.START, ActorRef.noSender());
+ }
+
+ @Override
+ public void stop() {
+ rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
+ }
+
+ // ------------------------------------------------------------------------
+ // Helper methods
+ // ------------------------------------------------------------------------
+
/**
* Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method
* arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index dfcbcc3..2373be9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -20,13 +20,15 @@ package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorRef;
import akka.actor.Status;
-import akka.actor.UntypedActor;
+import akka.actor.UntypedActorWithStash;
+import akka.japi.Procedure;
import akka.pattern.Patterns;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
@@ -45,18 +47,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Akka rpc actor which receives {@link LocalRpcInvocation}, {@link RunAsync} and {@link CallAsync}
- * messages.
+ * and {@link Processing} messages.
* <p>
* The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint}
* instance.
* <p>
* The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed
* in the context of the actor thread.
+ * <p>
+ * The {@link Processing} message controls the processing behaviour of the akka rpc actor. A
+ * {@link Processing#START} message unstashes all stashed messages and starts processing incoming
+ * messages. A {@link Processing#STOP} message stops processing messages and stashes incoming
+ * messages.
*
* @param <C> Type of the {@link RpcGateway} associated with the {@link RpcEndpoint}
* @param <T> Type of the {@link RpcEndpoint}
*/
-class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActor {
+class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActorWithStash {
private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class);
@@ -73,6 +80,27 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
@Override
public void onReceive(final Object message) {
+ if (message.equals(Processing.START)) {
+ unstashAll();
+ getContext().become(new Procedure<Object>() {
+ @Override
+ public void apply(Object message) throws Exception {
+ if (message.equals(Processing.STOP)) {
+ getContext().unbecome();
+ } else {
+ handleMessage(message);
+ }
+ }
+ });
+ } else {
+ LOG.info("The rpc endpoint {} has not been started yet. Stashing message {} until processing is started.",
+ rpcEndpoint.getClass().getName(),
+ message.getClass().getName());
+ stash();
+ }
+ }
+
+ private void handleMessage(Object message) {
mainThreadValidator.enterMainThread();
try {
if (message instanceof RunAsync) {
@@ -82,7 +110,10 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
} else if (message instanceof RpcInvocation) {
handleRpcInvocation((RpcInvocation) message);
} else {
- LOG.warn("Received message of unknown type {}. Dropping this message!", message.getClass());
+ LOG.warn(
+ "Received message of unknown type {} with value {}. Dropping this message!",
+ message.getClass().getName(),
+ message);
}
} finally {
mainThreadValidator.exitMainThread();
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index b963c53..7b33524 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.rpc.MainThreadExecutor;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
-
+import org.apache.flink.runtime.rpc.StartStoppable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,7 +136,11 @@ public class AkkaRpcService implements RpcService {
@SuppressWarnings("unchecked")
C self = (C) Proxy.newProxyInstance(
classLoader,
- new Class<?>[]{rpcEndpoint.getSelfGatewayType(), MainThreadExecutor.class, AkkaGateway.class},
+ new Class<?>[]{
+ rpcEndpoint.getSelfGatewayType(),
+ MainThreadExecutor.class,
+ StartStoppable.class,
+ AkkaGateway.class},
akkaInvocationHandler);
return self;
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
new file mode 100644
index 0000000..5c7df5d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+/**
+ * Control messages that switch the {@link org.apache.flink.runtime.rpc.akka.AkkaRpcActor} between stashing and processing incoming messages.
+ */
+public enum Processing {
+ START, // Unstashes all stashed messages and starts processing incoming messages
+ STOP // Stops processing messages and stashes all incoming messages
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index e50533e..97cf0cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.rpc;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.ReflectionUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -140,7 +142,7 @@ public class RpcCompletenessTest extends TestLogger {
int rpcTimeoutParameters = 0;
for (int i = 0; i < parameterAnnotations.length; i++) {
- if (isRpcTimeout(parameterAnnotations[i])) {
+ if (RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
assertTrue(
"The rpc timeout has to be of type " + FiniteDuration.class.getName() + ".",
parameterTypes[i].equals(FiniteDuration.class));
@@ -185,7 +187,7 @@ public class RpcCompletenessTest extends TestLogger {
// filter out the RpcTimeout parameters
for (int i = 0; i < gatewayParameterTypes.length; i++) {
- if (!isRpcTimeout(gatewayParameterAnnotations[i])) {
+ if (!RpcCompletenessTest.isRpcTimeout(gatewayParameterAnnotations[i])) {
filteredGatewayParameterTypes.add(gatewayParameterTypes[i]);
}
}
@@ -235,7 +237,22 @@ public class RpcCompletenessTest extends TestLogger {
}
private boolean checkType(Class<?> firstType, Class<?> secondType) {
- return firstType.equals(secondType);
+ Class<?> firstResolvedType;
+ Class<?> secondResolvedType;
+
+ if (firstType.isPrimitive()) {
+ firstResolvedType = RpcCompletenessTest.resolvePrimitiveType(firstType);
+ } else {
+ firstResolvedType = firstType;
+ }
+
+ if (secondType.isPrimitive()) {
+ secondResolvedType = RpcCompletenessTest.resolvePrimitiveType(secondType);
+ } else {
+ secondResolvedType = secondType;
+ }
+
+ return firstResolvedType.equals(secondResolvedType);
}
/**
@@ -279,7 +296,7 @@ public class RpcCompletenessTest extends TestLogger {
for (int i = 0; i < parameterTypes.length; i++) {
// filter out the RpcTimeout parameters
- if (!isRpcTimeout(parameterAnnotations[i])) {
+ if (!RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
builder.append(parameterTypes[i].getName());
if (i < parameterTypes.length -1) {
@@ -293,7 +310,7 @@ public class RpcCompletenessTest extends TestLogger {
return builder.toString();
}
- private boolean isRpcTimeout(Annotation[] annotations) {
+ private static boolean isRpcTimeout(Annotation[] annotations) {
for (Annotation annotation : annotations) {
if (annotation.annotationType().equals(RpcTimeout.class)) {
return true;
@@ -302,4 +319,22 @@ public class RpcCompletenessTest extends TestLogger {
return false;
}
+
+ /**
+ * Returns the boxed type for a primitive type.
+ *
+ * @param primitveType Primitive type to resolve
+ * @return Boxed type for the given primitive type
+ */
+ private static Class<?> resolvePrimitiveType(Class<?> primitveType) {
+ assert primitveType.isPrimitive();
+
+ TypeInformation<?> typeInformation = BasicTypeInfo.getInfoFor(primitveType);
+
+ if (typeInformation != null) {
+ return typeInformation.getTypeClass();
+ } else {
+ throw new RuntimeException("Could not retrive basic type information for primitive type " + primitveType + '.');
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
new file mode 100644
index 0000000..1653fac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.TestLogger;
+import org.hamcrest.core.Is;
+import org.junit.AfterClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThat;
+
+public class AkkaRpcActorTest extends TestLogger {
+
+ // ------------------------------------------------------------------------
+ // shared test members
+ // ------------------------------------------------------------------------
+
+ private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+ private static Timeout timeout = new Timeout(10000, TimeUnit.MILLISECONDS);
+
+ private static AkkaRpcService akkaRpcService =
+ new AkkaRpcService(actorSystem, timeout);
+
+ @AfterClass
+ public static void shutdown() {
+ akkaRpcService.stopService();
+ actorSystem.shutdown();
+ actorSystem.awaitTermination();
+ }
+
+ /**
+ * Tests that the {@link AkkaRpcActor} stashes messages until the corresponding
+ * {@link RpcEndpoint} has been started.
+ */
+ @Test
+ public void testMessageStashing() throws Exception {
+ int expectedValue = 1337;
+
+ DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
+
+ DummyRpcGateway rpcGateway = rpcEndpoint.getSelf();
+
+ // this message should not be processed until we've started the rpc endpoint
+ Future<Integer> result = rpcGateway.foobar();
+
+ // set a new value which we expect to be returned
+ rpcEndpoint.setFoobar(expectedValue);
+
+ // now process the rpc
+ rpcEndpoint.start();
+
+ Integer actualValue = Await.result(result, timeout.duration());
+
+ assertThat("The new foobar value should have been returned.", actualValue, Is.is(expectedValue));
+
+ rpcEndpoint.shutDown();
+ }
+
+ private interface DummyRpcGateway extends RpcGateway {
+ Future<Integer> foobar();
+ }
+
+ private static class DummyRpcEndpoint extends RpcEndpoint<DummyRpcGateway> {
+
+ private volatile int _foobar = 42;
+
+ protected DummyRpcEndpoint(RpcService rpcService) {
+ super(rpcService);
+ }
+
+ @RpcMethod
+ public int foobar() {
+ return _foobar;
+ }
+
+ public void setFoobar(int value) {
+ _foobar = value;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index f26b40b..fd55904 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -57,6 +57,9 @@ public class AkkaRpcServiceTest extends TestLogger {
ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService);
JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService);
+ resourceManager.start();
+ jobMaster.start();
+
ResourceManagerGateway rm = resourceManager.getSelf();
assertTrue(rm instanceof AkkaGateway);
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
index f2ce52d..d33987c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
@@ -42,7 +43,7 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.junit.Assert.*;
-public class AsyncCallsTest {
+public class AsyncCallsTest extends TestLogger {
// ------------------------------------------------------------------------
// shared test members
@@ -72,6 +73,7 @@ public class AsyncCallsTest {
final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+ testEndpoint.start();
TestGateway gateway = testEndpoint.getSelf();
// a bunch of gateway calls
@@ -127,6 +129,7 @@ public class AsyncCallsTest {
final long delay = 200;
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+ testEndpoint.start();
// run something asynchronously
testEndpoint.runAsync(new Runnable() {
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index b854143..9ffafda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -27,13 +27,14 @@ import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
-public class MainThreadValidationTest {
+public class MainThreadValidationTest extends TestLogger {
@Test
public void failIfNotInMainThread() {
@@ -51,6 +52,7 @@ public class MainThreadValidationTest {
try {
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);
+ testEndpoint.start();
// this works, because it is executed as an RPC call
testEndpoint.getSelf().someConcurrencyCriticalFunction();
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index ca8179c..9d2ed99 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -86,6 +86,7 @@ public class MessageSerializationTest extends TestLogger {
public void testNonSerializableLocalMessageTransfer() throws InterruptedException, IOException {
LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
+ testEndpoint.start();
TestGateway testGateway = testEndpoint.getSelf();
@@ -106,6 +107,7 @@ public class MessageSerializationTest extends TestLogger {
LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
+ testEndpoint.start();
String address = testEndpoint.getAddress();
@@ -126,6 +128,7 @@ public class MessageSerializationTest extends TestLogger {
LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
+ testEndpoint.start();
String address = testEndpoint.getAddress();
@@ -149,6 +152,7 @@ public class MessageSerializationTest extends TestLogger {
LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
+ testEndpoint.start();
String address = testEndpoint.getAddress();
http://git-wip-us.apache.org/repos/asf/flink/blob/613f5a7d/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
index 33c9cb6..c96f4f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -28,17 +28,26 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.cglib.proxy.InvocationHandler;
+import org.mockito.cglib.proxy.Proxy;
+import scala.concurrent.Future;
import java.net.URL;
import java.util.Collections;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TaskExecutorTest extends TestLogger {
@@ -48,8 +57,13 @@ public class TaskExecutorTest extends TestLogger {
@Test
public void testTaskExecution() throws Exception {
RpcService testingRpcService = mock(RpcService.class);
+ InvocationHandler invocationHandler = mock(InvocationHandler.class);
+ Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler);
+ when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
+
DirectExecutorService directExecutorService = new DirectExecutorService();
TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
+ taskExecutor.start();
TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
new JobID(),
@@ -82,8 +96,12 @@ public class TaskExecutorTest extends TestLogger {
@Test(expected=Exception.class)
public void testWrongTaskCancellation() throws Exception {
RpcService testingRpcService = mock(RpcService.class);
+ InvocationHandler invocationHandler = mock(InvocationHandler.class);
+ Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler);
+ when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
DirectExecutorService directExecutorService = null;
TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
+ taskExecutor.start();
taskExecutor.cancelTask(new ExecutionAttemptID());