You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/08/08 13:27:26 UTC

flink git commit: Add runAsync method to execute runnables in the main rpc thread

Repository: flink
Updated Branches:
  refs/heads/flip-6 583911497 -> fee1bef80


Add runAsync method to execute runnables in the main rpc thread

Add CompletedFuture


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fee1bef8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fee1bef8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fee1bef8

Branch: refs/heads/flip-6
Commit: fee1bef804e4e4716f201283beb79e77ef8d36e6
Parents: 5839114
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 5 00:27:31 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Aug 8 15:19:20 2016 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  5 +++
 .../org/apache/flink/runtime/rpc/RpcServer.java | 35 ++++++++++++++---
 .../apache/flink/runtime/rpc/RpcService.java    |  4 +-
 .../flink/runtime/rpc/RunnableRpcGateway.java   | 23 +++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  8 ++--
 .../runtime/rpc/akka/RunnableAkkaActor.java     | 33 ++++++++++++++++
 .../runtime/rpc/akka/RunnableAkkaGateway.java   | 30 ++++++++++++++
 .../rpc/akka/jobmaster/JobMasterAkkaActor.java  |  6 +--
 .../akka/jobmaster/JobMasterAkkaGateway.java    |  4 +-
 .../rpc/akka/messages/RunnableMessage.java      | 31 +++++++++++++++
 .../ResourceManagerAkkaActor.java               |  6 +--
 .../ResourceManagerAkkaGateway.java             |  4 +-
 .../taskexecutor/TaskExecutorAkkaActor.java     |  6 +--
 .../taskexecutor/TaskExecutorAkkaGateway.java   |  4 +-
 .../flink/runtime/rpc/jobmaster/JobMaster.java  | 34 ++++++----------
 .../rpc/resourcemanager/ResourceManager.java    | 19 +--------
 .../runtime/rpc/taskexecutor/TaskExecutor.java  | 19 +--------
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 24 ++++++------
 .../runtime/util/DirectExecutorService.java     | 41 +++++++++++++++++++-
 pom.xml                                         | 14 +++----
 20 files changed, 245 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 5fea8fb..09c6fd0 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -189,6 +189,11 @@ under the License.
 			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
 		</dependency>
 
+		<dependency>
+			<groupId>org.reflections</groupId>
+			<artifactId>reflections</artifactId>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
index fba9250..c064c09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
@@ -18,12 +18,21 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaGateway;
+
 /**
- * Marker interface for rpc servers. Every rpc server should implement this interface.
+ * Base class for rpc servers. Every rpc server should implement this interface.
  *
- * @param <C> Rpc client counter part matching the RpcServer
+ * @param <C> Rpc gateway counter part matching the RpcServer
  */
-public interface RpcServer<C extends RpcGateway> {
+public abstract class RpcServer<C extends RpcGateway> {
+	private final RpcService rpcService;
+	private C self;
+
+	public RpcServer(RpcService rpcService) {
+		this.rpcService = rpcService;
+	}
+
 	/**
 	 * Get self-gateway which should be used to run asynchronous rpc calls on the server.
 	 *
@@ -32,9 +41,23 @@ public interface RpcServer<C extends RpcGateway> {
 	 *
 	 * @return Self gateway
 	 */
-	C getSelf();
+	public C getSelf() {
+		return self;
+	}
+
+	public void runAsync(Runnable runnable) {
+		((RunnableAkkaGateway) self).runAsync(runnable);
+	}
+
+	public RpcService getRpcService() {
+		return rpcService;
+	}
 
-	void start();
+	public void start() {
+		self = rpcService.startServer(this);
+	}
 
-	void shutDown();
+	public void shutDown() {
+		rpcService.stopServer(self);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index a12039c..fddcf9d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -23,9 +23,9 @@ import scala.concurrent.Future;
 public interface RpcService {
 	<C extends RpcGateway> Future<C> connect(String address, Class<C> clazz);
 
-	<S extends RpcServer, C extends RpcGateway> C startServer(S methodHandler, Class<C> rpcClientClass);
+	<S extends RpcServer, C extends RpcGateway> C startServer(S methodHandler);
 
-	<T> void stopServer(T server);
+	<C extends RpcGateway> void stopServer(C gateway);
 
 	void stopService();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RunnableRpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RunnableRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RunnableRpcGateway.java
new file mode 100644
index 0000000..c05c5fa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RunnableRpcGateway.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+public interface RunnableRpcGateway {
+	void runAsync(Runnable runnable);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index cb707bb..858e41a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -84,7 +84,7 @@ public class AkkaRpcService implements RpcService {
 	}
 
 	@Override
-	public <S extends RpcServer, C extends RpcGateway> C startServer(S methodHandler, Class<C> rpcClientClass) {
+	public <S extends RpcServer, C extends RpcGateway> C startServer(S methodHandler) {
 		ActorRef ref;
 		C self;
 		if (methodHandler instanceof TaskExecutor) {
@@ -115,9 +115,9 @@ public class AkkaRpcService implements RpcService {
 	}
 
 	@Override
-	public <T> void stopServer(T server) {
-		if (server instanceof AkkaGateway) {
-			AkkaGateway akkaClient = (AkkaGateway) server;
+	public <C extends RpcGateway> void stopServer(C selfGateway) {
+		if (selfGateway instanceof AkkaGateway) {
+			AkkaGateway akkaClient = (AkkaGateway) selfGateway;
 
 			if (actors.contains(akkaClient.getActorRef())) {
 				akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender());

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaActor.java
new file mode 100644
index 0000000..745f3ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaActor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
+
+public class RunnableAkkaActor extends UntypedActor {
+	@Override
+	public void onReceive(Object message) throws Exception {
+		if (message instanceof RunnableMessage) {
+			((RunnableMessage) message).getRunnable().run();
+		} else {
+			throw new RuntimeException("Unknown message " + message);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaGateway.java
new file mode 100644
index 0000000..b7c379d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/RunnableAkkaGateway.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.rpc.RunnableRpcGateway;
+import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
+
+public abstract class RunnableAkkaGateway implements RunnableRpcGateway, AkkaGateway {
+	@Override
+	public void runAsync(Runnable runnable) {
+		getActorRef().tell(new RunnableMessage(runnable), ActorRef.noSender());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
index a91d7d4..a1bff44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.rpc.akka.jobmaster;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
-import akka.actor.UntypedActor;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaActor;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.akka.messages.HandleRegistrationResponse;
 import org.apache.flink.runtime.rpc.akka.messages.TriggerResourceManagerRegistration;
 import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
 
-public class JobMasterAkkaActor extends UntypedActor {
+public class JobMasterAkkaActor extends RunnableAkkaActor {
 	private final JobMaster jobMaster;
 
 	public JobMasterAkkaActor(JobMaster jobMaster) {
@@ -57,7 +57,7 @@ public class JobMasterAkkaActor extends UntypedActor {
 
 			jobMaster.handleRegistrationResponse(registrationResponse.getRegistrationResponse(), registrationResponse.getResourceManagerGateway());
 		} else {
-			throw new RuntimeException("Unknown message type received: " + message.getClass());
+			super.onReceive(message);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
index fe4b0c1..15dad2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.rpc.akka.jobmaster;
 import akka.actor.ActorRef;
 import akka.pattern.AskableActorRef;
 import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaGateway;
 import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.akka.AkkaGateway;
 import org.apache.flink.runtime.rpc.akka.messages.HandleRegistrationResponse;
 import org.apache.flink.runtime.rpc.akka.messages.TriggerResourceManagerRegistration;
 import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import scala.concurrent.Future;
 import scala.reflect.ClassTag$;
 
-public class JobMasterAkkaGateway implements JobMasterGateway, AkkaGateway {
+public class JobMasterAkkaGateway extends RunnableAkkaGateway implements JobMasterGateway {
 	private final AskableActorRef actorRef;
 	private final Timeout timeout;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
new file mode 100644
index 0000000..3556738
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+public class RunnableMessage {
+	private final Runnable runnable;
+
+	public RunnableMessage(Runnable runnable) {
+		this.runnable = runnable;
+	}
+
+	public Runnable getRunnable() {
+		return runnable;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
index 7fc2ffa..9eef6ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.rpc.akka.resourcemanager;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
-import akka.actor.UntypedActor;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaActor;
 import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
 import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
 import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
 
-public class ResourceManagerAkkaActor extends UntypedActor {
+public class ResourceManagerAkkaActor extends RunnableAkkaActor {
 	private final ResourceManager resourceManager;
 
 	public ResourceManagerAkkaActor(ResourceManager resourceManager) {
@@ -57,7 +57,7 @@ public class ResourceManagerAkkaActor extends UntypedActor {
 				sender.tell(new Status.Failure(e), getSelf());
 			}
 		} else {
-			throw new RuntimeException("Encountered unknown message type: " + message.getClass());
+			super.onReceive(message);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
index c902bff..a02a070 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
@@ -21,18 +21,18 @@ package org.apache.flink.runtime.rpc.akka.resourcemanager;
 import akka.actor.ActorRef;
 import akka.pattern.AskableActorRef;
 import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaGateway;
 import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
 import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
 import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.rpc.akka.AkkaGateway;
 import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
 import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
 import scala.concurrent.Future;
 import scala.reflect.ClassTag$;
 
-public class ResourceManagerAkkaGateway implements ResourceManagerGateway, AkkaGateway {
+public class ResourceManagerAkkaGateway extends RunnableAkkaGateway implements ResourceManagerGateway {
 	private final AskableActorRef actorRef;
 	private final Timeout timeout;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
index 8974d2b..89dad34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.rpc.akka.taskexecutor;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
-import akka.actor.UntypedActor;
 import akka.dispatch.OnComplete;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaActor;
 import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
 import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
 import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
 
-public class TaskExecutorAkkaActor extends UntypedActor {
+public class TaskExecutorAkkaActor extends RunnableAkkaActor {
 	private final TaskExecutorGateway taskExecutor;
 
 	public TaskExecutorAkkaActor(TaskExecutorGateway taskExecutor) {
@@ -71,7 +71,7 @@ public class TaskExecutorAkkaActor extends UntypedActor {
 				getContext().dispatcher()
 			);
 		} else {
-			throw new RuntimeException("Encountered unknown message type: " + message.getClass());
+			super.onReceive(message);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
index ca064c1..0452804 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
@@ -24,14 +24,14 @@ import akka.util.Timeout;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.akka.AkkaGateway;
+import org.apache.flink.runtime.rpc.akka.RunnableAkkaGateway;
 import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
 import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
 import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
 import scala.concurrent.Future;
 import scala.reflect.ClassTag$;
 
-public class TaskExecutorAkkaGateway implements TaskExecutorGateway, AkkaGateway {
+public class TaskExecutorAkkaGateway extends RunnableAkkaGateway implements TaskExecutorGateway {
 	private final AskableActorRef actorRef;
 	private final Timeout timeout;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
index c2e2686..e40c148 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -37,16 +37,14 @@ import scala.concurrent.Future;
 
 import java.util.concurrent.ExecutorService;
 
-public class JobMaster implements RpcServer<JobMasterGateway> {
+public class JobMaster extends RpcServer<JobMasterGateway> {
 	private final Logger LOG = LoggerFactory.getLogger(JobMaster.class);
-	private final RpcService rpcService;
 	private final ExecutionContext executionContext;
-	private JobMasterGateway self;
 
 	private ResourceManagerGateway resourceManager = null;
 
 	public JobMaster(RpcService rpcService, ExecutorService executorService) {
-		this.rpcService = rpcService;
+		super(rpcService);
 		executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
 	}
 
@@ -54,17 +52,6 @@ public class JobMaster implements RpcServer<JobMasterGateway> {
 		return resourceManager;
 	}
 
-	@Override
-	public void start() {
-		// start rpc server
-		self = rpcService.startServer(this, JobMasterGateway.class);
-	}
-
-	@Override
-	public void shutDown() {
-		rpcService.stopServer(getSelf());
-	}
-
 	@RpcMethod
 	public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) {
 		System.out.println("TaskExecutionState: " + taskExecutionState);
@@ -73,12 +60,19 @@ public class JobMaster implements RpcServer<JobMasterGateway> {
 
 	@RpcMethod
 	public void triggerResourceManagerRegistration(final String address) {
-		Future<ResourceManagerGateway> resourceManagerFuture = rpcService.connect(address, ResourceManagerGateway.class);
+		Future<ResourceManagerGateway> resourceManagerFuture = getRpcService().connect(address, ResourceManagerGateway.class);
 
 		Future<RegistrationResponse> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, Future<RegistrationResponse>>() {
 			@Override
-			public Future<RegistrationResponse> apply(ResourceManagerGateway resourceManagerGateway) {
- 				return resourceManagerGateway.registerJobMaster(new JobMasterRegistration());
+			public Future<RegistrationResponse> apply(final ResourceManagerGateway resourceManagerGateway) {
+				runAsync(new Runnable() {
+					@Override
+					public void run() {
+						resourceManager = resourceManagerGateway;
+					}
+				});
+
+				return resourceManagerGateway.registerJobMaster(new JobMasterRegistration());
 			}
 		}, executionContext);
 
@@ -103,8 +97,4 @@ public class JobMaster implements RpcServer<JobMasterGateway> {
 	public boolean isConnected() {
 		return resourceManager != null;
 	}
-
-	public JobMasterGateway getSelf() {
-		return self;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index 00aad0d..bdcd8cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -26,25 +26,14 @@ import scala.concurrent.ExecutionContext$;
 
 import java.util.concurrent.ExecutorService;
 
-public class ResourceManager implements RpcServer<ResourceManagerGateway> {
-	private final RpcService rpcService;
+public class ResourceManager extends RpcServer<ResourceManagerGateway> {
 	private final ExecutionContext executionContext;
 
-	private ResourceManagerGateway self;
-
 	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
-		this.rpcService = rpcService;
+		super(rpcService);
 		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
 	}
 
-	public void start() {
-		self = rpcService.startServer(this, ResourceManagerGateway.class);
-	}
-
-	public void shutDown() {
-		rpcService.stopServer(getSelf());
-	}
-
 	@RpcMethod
 	public RegistrationResponse registerJobMaster(JobMasterRegistration jobMasterRegistration) {
 		System.out.println("JobMasterRegistration: " + jobMasterRegistration);
@@ -56,8 +45,4 @@ public class ResourceManager implements RpcServer<ResourceManagerGateway> {
 		System.out.println("SlotRequest: " + slotRequest);
 		return new SlotAssignment();
 	}
-
-	public ResourceManagerGateway getSelf() {
-		return self;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
index 5c5b8f4..ef0b180 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -31,26 +31,15 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
-public class TaskExecutor implements RpcServer<TaskExecutorGateway> {
-	private final RpcService rpcService;
+public class TaskExecutor extends RpcServer<TaskExecutorGateway> {
 	private final ExecutionContext executionContext;
 	private final Set<ExecutionAttemptID> tasks = new HashSet<>();
 
-	private TaskExecutorGateway self;
-
 	public TaskExecutor(RpcService rpcService, ExecutorService executorService) {
-		this.rpcService = rpcService;
+		super(rpcService);
 		this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(executorService);
 	}
 
-	public void start() {
-		self = rpcService.startServer(this, TaskExecutorGateway.class);
-	}
-
-	public void shutDown() {
-		rpcService.stopServer(getSelf());
-	}
-
 	@RpcMethod
 	public Acknowledge executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
 		tasks.add(taskDeploymentDescriptor.getExecutionId());
@@ -65,8 +54,4 @@ public class TaskExecutor implements RpcServer<TaskExecutorGateway> {
 			throw new Exception("Could not find task.");
 		}
 	}
-
-	public TaskExecutorGateway getSelf() {
-		return self;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index e05bbac..27c8171 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -48,27 +48,25 @@ public class RpcCompletenessTest extends TestLogger {
 
 		for (Class<? extends RpcServer> rpcServer :classes){
 			c = rpcServer;
-			Type[] interfaces = c.getGenericInterfaces();
+			Type superClass = c.getGenericSuperclass();
 
 			boolean foundRpcServerInterface = false;
 
-			for (Type in: interfaces) {
-				if (in instanceof ParameterizedType) {
-					ParameterizedType parameterizedType = (ParameterizedType) in;
+			if (superClass instanceof ParameterizedType) {
+				ParameterizedType parameterizedType = (ParameterizedType) superClass;
 
-					if (parameterizedType.getRawType() == RpcServer.class) {
-						foundRpcServerInterface = true;
-						Type[] typeArguments = parameterizedType.getActualTypeArguments();
+				if (parameterizedType.getRawType() == RpcServer.class) {
+					foundRpcServerInterface = true;
+					Type[] typeArguments = parameterizedType.getActualTypeArguments();
 
-						assertEquals(1, typeArguments.length);
-						assertTrue(typeArguments[0] instanceof Class<?>);
+					assertEquals(1, typeArguments.length);
+					assertTrue(typeArguments[0] instanceof Class<?>);
 
-						Type rpcGatewayType = typeArguments[0];
+					Type rpcGatewayType = typeArguments[0];
 
-						assertTrue(rpcGatewayType instanceof Class);
+					assertTrue(rpcGatewayType instanceof Class);
 
-						checkCompleteness(rpcServer, (Class<?>) rpcGatewayType);
-					}
+					checkCompleteness(rpcServer, (Class<?>) rpcGatewayType);
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
index 0a6630d..1d7c971 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.util;
 
-import com.sun.xml.internal.ws.util.CompletedFuture;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -194,4 +192,43 @@ public class DirectExecutorService implements ExecutorService {
 	public void execute(Runnable command) {
 		command.run();
 	}
+
+	public static class CompletedFuture<V> implements Future<V> {
+		private final V value;
+		private final Exception exception;
+
+		public CompletedFuture(V value, Exception exception) {
+			this.value = value;
+			this.exception = exception;
+		}
+
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			return false;
+		}
+
+		@Override
+		public boolean isCancelled() {
+			return false;
+		}
+
+		@Override
+		public boolean isDone() {
+			return true;
+		}
+
+		@Override
+		public V get() throws InterruptedException, ExecutionException {
+			if (exception != null) {
+				throw new ExecutionException(exception);
+			} else {
+				return value;
+			}
+		}
+
+		@Override
+		public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+			return get();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fee1bef8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 479bb60..ada79eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,13 +187,6 @@ under the License.
 			<type>jar</type>
 			<scope>test</scope>
 		</dependency>
-
-		<dependency>
-			<groupId>org.reflections</groupId>
-			<artifactId>reflections</artifactId>
-			<version>0.9.10</version>
-			<scope>test</scope>
-		</dependency>
 	</dependencies>
 
 	<!-- this section defines the module versions that are used if nothing else is specified. -->
@@ -407,6 +400,13 @@ under the License.
 				<artifactId>jackson-annotations</artifactId>
 				<version>${jackson.version}</version>
 			</dependency>
+
+			<dependency>
+				<groupId>org.reflections</groupId>
+				<artifactId>reflections</artifactId>
+				<version>0.9.10</version>
+				<scope>test</scope>
+			</dependency>
 		</dependencies>
 	</dependencyManagement>