You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:28:43 UTC

[21/50] [abbrv] flink git commit: [FLINK-4346] [rpc] Add new RPC abstraction

[FLINK-4346] [rpc] Add new RPC abstraction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f36fb7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f36fb7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f36fb7f

Branch: refs/heads/flip-6
Commit: 0f36fb7f10c1b40005b897d85fec04e858c4909b
Parents: 0735b5b
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 3 19:31:34 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 8 17:26:56 2016 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |   5 +
 .../flink/runtime/rpc/MainThreadExecutor.java   |  54 +++
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 182 +++++++++++
 .../apache/flink/runtime/rpc/RpcGateway.java    |  25 ++
 .../org/apache/flink/runtime/rpc/RpcMethod.java |  35 ++
 .../apache/flink/runtime/rpc/RpcService.java    |  74 +++++
 .../apache/flink/runtime/rpc/RpcTimeout.java    |  34 ++
 .../flink/runtime/rpc/akka/AkkaGateway.java     |  29 ++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 145 ++++++++
 .../flink/runtime/rpc/akka/BaseAkkaActor.java   |  50 +++
 .../flink/runtime/rpc/akka/BaseAkkaGateway.java |  41 +++
 .../rpc/akka/jobmaster/JobMasterAkkaActor.java  |  58 ++++
 .../akka/jobmaster/JobMasterAkkaGateway.java    |  57 ++++
 .../rpc/akka/messages/CallableMessage.java      |  33 ++
 .../runtime/rpc/akka/messages/CancelTask.java   |  36 ++
 .../runtime/rpc/akka/messages/ExecuteTask.java  |  36 ++
 .../messages/RegisterAtResourceManager.java     |  36 ++
 .../rpc/akka/messages/RegisterJobMaster.java    |  36 ++
 .../runtime/rpc/akka/messages/RequestSlot.java  |  37 +++
 .../rpc/akka/messages/RunnableMessage.java      |  31 ++
 .../akka/messages/UpdateTaskExecutionState.java |  37 +++
 .../ResourceManagerAkkaActor.java               |  65 ++++
 .../ResourceManagerAkkaGateway.java             |  67 ++++
 .../taskexecutor/TaskExecutorAkkaActor.java     |  77 +++++
 .../taskexecutor/TaskExecutorAkkaGateway.java   |  59 ++++
 .../flink/runtime/rpc/jobmaster/JobMaster.java  | 249 ++++++++++++++
 .../runtime/rpc/jobmaster/JobMasterGateway.java |  45 +++
 .../resourcemanager/JobMasterRegistration.java  |  35 ++
 .../resourcemanager/RegistrationResponse.java   |  43 +++
 .../rpc/resourcemanager/ResourceManager.java    |  94 ++++++
 .../resourcemanager/ResourceManagerGateway.java |  58 ++++
 .../rpc/resourcemanager/SlotAssignment.java     |  25 ++
 .../rpc/resourcemanager/SlotRequest.java        |  25 ++
 .../runtime/rpc/taskexecutor/TaskExecutor.java  |  82 +++++
 .../rpc/taskexecutor/TaskExecutorGateway.java   |  48 +++
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 327 +++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  81 +++++
 .../rpc/taskexecutor/TaskExecutorTest.java      |  92 ++++++
 .../runtime/util/DirectExecutorService.java     | 234 +++++++++++++
 flink-tests/pom.xml                             |   1 -
 pom.xml                                         |   7 +
 41 files changed, 2784 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 5fea8fb..09c6fd0 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -189,6 +189,11 @@ under the License.
 			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
 		</dependency>
 
+		<dependency>
+			<groupId>org.reflections</groupId>
+			<artifactId>reflections</artifactId>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
new file mode 100644
index 0000000..e06711e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.util.Timeout;
+import scala.concurrent.Future;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying
+ * rpc server.
+ *
+ * This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
+ * implementation which allows to dispatch local procedures to the main thread of the underlying
+ * rpc server.
+ */
+public interface MainThreadExecutor {
+	/**
+	 * Execute the runnable in the main thread of the underlying rpc server.
+	 *
+	 * @param runnable Runnable to be executed
+	 */
+	void runAsync(Runnable runnable);
+
+	/**
+	 * Execute the callable in the main thread of the underlying rpc server and return a future for
+	 * the callable result. If the future is not completed within the given timeout, the returned
+	 * future will throw a {@link TimeoutException}.
+	 *
+	 * @param callable Callable to be executed
+	 * @param timeout Timeout for the future to complete
+	 * @param <V> Return value of the callable
+	 * @return Future of the callable result
+	 */
+	<V> Future<V> callAsync(Callable<V> callable, Timeout timeout);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
new file mode 100644
index 0000000..3d8757f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.util.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Base class for rpc endpoints. Distributed components which offer remote procedure calls have to
+ * extend the rpc endpoint base class.
+ *
+ * The main idea is that a rpc endpoint is backed by a rpc server which has a single thread
+ * processing the rpc calls. Thus, by executing all state changing operations within the main
+ * thread, we don't have to reason about concurrent accesses. The rpc provides provides
+ * {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} and the
+ * {@link #getMainThreadExecutionContext()} to execute code in the rpc server's main thread.
+ *
+ * @param <C> Rpc gateway counterpart for the implementing rpc endpoint
+ */
+public abstract class RpcEndpoint<C extends RpcGateway> {
+
+	protected final Logger log = LoggerFactory.getLogger(getClass());
+
+	/** Rpc service to be used to start the rpc server and to obtain rpc gateways */
+	private final RpcService rpcService;
+
+	/** Self gateway which can be used to schedule asynchronous calls on yourself */
+	private C self;
+
+	/**
+	 * The main thread execution context to be used to execute future callbacks in the main thread
+	 * of the executing rpc server.
+	 *
+	 * IMPORTANT: The main thread context is only available after the rpc server has been started.
+	 */
+	private MainThreadExecutionContext mainThreadExecutionContext;
+
+	public RpcEndpoint(RpcService rpcService) {
+		this.rpcService = rpcService;
+	}
+
+	/**
+	 * Get self-gateway which should be used to run asynchronous rpc calls on this endpoint.
+	 *
+	 * IMPORTANT: Always issue local method calls via the self-gateway if the current thread
+	 * is not the main thread of the underlying rpc server, e.g. from within a future callback.
+	 *
+	 * @return Self gateway
+	 */
+	public C getSelf() {
+		return self;
+	}
+
+	/**
+	 * Execute the runnable in the main thread of the underlying rpc server.
+	 *
+	 * @param runnable Runnable to be executed in the main thread of the underlying rpc server
+	 */
+	public void runAsync(Runnable runnable) {
+		((MainThreadExecutor) self).runAsync(runnable);
+	}
+
+	/**
+	 * Execute the callable in the main thread of the underlying rpc server returning a future for
+	 * the result of the callable. If the callable is not completed within the given timeout, then
+	 * the future will be failed with a {@link java.util.concurrent.TimeoutException}.
+	 *
+	 * @param callable Callable to be executed in the main thread of the underlying rpc server
+	 * @param timeout Timeout for the callable to be completed
+	 * @param <V> Return type of the callable
+	 * @return Future for the result of the callable.
+	 */
+	public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+		return ((MainThreadExecutor) self).callAsync(callable, timeout);
+	}
+
+	/**
+	 * Gets the main thread execution context. The main thread execution context can be used to
+	 * execute tasks in the main thread of the underlying rpc server.
+	 *
+	 * @return Main thread execution context
+	 */
+	public ExecutionContext getMainThreadExecutionContext() {
+		return mainThreadExecutionContext;
+	}
+
+	/**
+	 * Gets the used rpc service.
+	 *
+	 * @return Rpc service
+	 */
+	public RpcService getRpcService() {
+		return rpcService;
+	}
+
+	/**
+	 * Starts the underlying rpc server via the rpc service and creates the main thread execution
+	 * context. This makes the rpc endpoint effectively reachable from the outside.
+	 *
+	 * Can be overriden to add rpc endpoint specific start up code. Should always call the parent
+	 * start method.
+	 */
+	public void start() {
+		self = rpcService.startServer(this);
+		mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
+	}
+
+
+	/**
+	 * Shuts down the underlying rpc server via the rpc service.
+	 *
+	 * Can be overriden to add rpc endpoint specific shut down code. Should always call the parent
+	 * shut down method.
+	 */
+	public void shutDown() {
+		rpcService.stopServer(self);
+	}
+
+	/**
+	 * Gets the address of the underlying rpc server. The address should be fully qualified so that
+	 * a remote system can connect to this rpc server via this address.
+	 *
+	 * @return Fully qualified address of the underlying rpc server
+	 */
+	public String getAddress() {
+		return rpcService.getAddress(self);
+	}
+
+	/**
+	 * Execution context which executes runnables in the main thread context. A reported failure
+	 * will cause the underlying rpc server to shut down.
+	 */
+	private class MainThreadExecutionContext implements ExecutionContext {
+		private final MainThreadExecutor gateway;
+
+		MainThreadExecutionContext(MainThreadExecutor gateway) {
+			this.gateway = gateway;
+		}
+
+		@Override
+		public void execute(Runnable runnable) {
+			gateway.runAsync(runnable);
+		}
+
+		@Override
+		public void reportFailure(final Throwable t) {
+			gateway.runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.error("Encountered failure in the main thread execution context.", t);
+					shutDown();
+				}
+			});
+		}
+
+		@Override
+		public ExecutionContext prepare() {
+			return this;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
new file mode 100644
index 0000000..e3a16b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+/**
+ * Rpc gateway interface which has to be implemented by Rpc gateways.
+ */
+public interface RpcGateway {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
new file mode 100644
index 0000000..875e557
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for rpc method in a {@link RpcEndpoint} implementation. Every rpc method must have a
+ * respective counterpart in the {@link RpcGateway} implementation for this rpc server. The
+ * RpcCompletenessTest makes sure that the set of rpc methods in a rpc server and the set of
+ * gateway methods in the corresponding gateway implementation are identical.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RpcMethod {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
new file mode 100644
index 0000000..90ff7b6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import scala.concurrent.Future;
+
+/**
+ * Interface for rpc services. An rpc service is used to start and connect to a {@link RpcEndpoint}.
+ * Connecting to a rpc server will return a {@link RpcGateway} which can be used to call remote
+ * procedures.
+ */
+public interface RpcService {
+
+	/**
+	 * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can
+	 * be used to communicate with the rpc server.
+	 *
+	 * @param address Address of the remote rpc server
+	 * @param clazz Class of the rpc gateway to return
+	 * @param <C> Type of the rpc gateway to return
+	 * @return Future containing the rpc gateway
+	 */
+	<C extends RpcGateway> Future<C> connect(String address, Class<C> clazz);
+
+	/**
+	 * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.
+	 *
+	 * @param rpcEndpoint Rpc protocl to dispath the rpcs to
+	 * @param <S> Type of the rpc endpoint
+	 * @param <C> Type of the self rpc gateway associated with the rpc server
+	 * @return Self gateway to dispatch remote procedure calls to oneself
+	 */
+	<S extends RpcEndpoint, C extends RpcGateway> C startServer(S rpcEndpoint);
+
+	/**
+	 * Stop the underlying rpc server of the provided self gateway.
+	 *
+	 * @param selfGateway Self gateway describing the underlying rpc server
+	 * @param <C> Type of the rpc gateway
+	 */
+	<C extends RpcGateway> void stopServer(C selfGateway);
+
+	/**
+	 * Stop the rpc service shutting down all started rpc servers.
+	 */
+	void stopService();
+
+	/**
+	 * Get the fully qualified address of the underlying rpc server represented by the self gateway.
+	 * It must be possible to connect from a remote host to the rpc server via the returned fully
+	 * qualified address.
+	 *
+	 * @param selfGateway Self gateway associated with the underlying rpc server
+	 * @param <C> Type of the rpc gateway
+	 * @return Fully qualified address
+	 */
+	<C extends RpcGateway> String getAddress(C selfGateway);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
new file mode 100644
index 0000000..3d36d47
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for {@link RpcGateway} methods to specify an additional timeout parameter for the
+ * returned future to be completed. The rest of the provided parameters is passed to the remote rpc
+ * server for the rpc.
+ */
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RpcTimeout {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
new file mode 100644
index 0000000..a96a600
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorRef;
+
+/**
+ * Interface for Akka based rpc gateways
+ */
+public interface AkkaGateway {
+
+	ActorRef getActorRef();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
new file mode 100644
index 0000000..d55bd13
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorIdentity;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Identify;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Mapper;
+import akka.pattern.AskableActorSelection;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaActor;
+import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaActor;
+import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaActor;
+import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
+import scala.concurrent.Future;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class AkkaRpcService implements RpcService {
+	private final ActorSystem actorSystem;
+	private final Timeout timeout;
+	private final Set<ActorRef> actors = new HashSet<>();
+
+	public AkkaRpcService(ActorSystem actorSystem, Timeout timeout) {
+		this.actorSystem = actorSystem;
+		this.timeout = timeout;
+	}
+
+	@Override
+	public <C extends RpcGateway> Future<C> connect(String address, final Class<C> clazz) {
+		ActorSelection actorSel = actorSystem.actorSelection(address);
+
+		AskableActorSelection asker = new AskableActorSelection(actorSel);
+
+		Future<Object> identify = asker.ask(new Identify(42), timeout);
+
+		return identify.map(new Mapper<Object, C>(){
+			public C apply(Object obj) {
+				ActorRef actorRef = ((ActorIdentity) obj).getRef();
+
+				if (clazz == TaskExecutorGateway.class) {
+					return (C) new TaskExecutorAkkaGateway(actorRef, timeout);
+				} else if (clazz == ResourceManagerGateway.class) {
+					return (C) new ResourceManagerAkkaGateway(actorRef, timeout);
+				} else if (clazz == JobMasterGateway.class) {
+					return (C) new JobMasterAkkaGateway(actorRef, timeout);
+				} else {
+					throw new RuntimeException("Could not find remote endpoint " + clazz);
+				}
+			}
+		}, actorSystem.dispatcher());
+	}
+
+	@Override
+	public <S extends RpcEndpoint, C extends RpcGateway> C startServer(S rpcEndpoint) {
+		ActorRef ref;
+		C self;
+		if (rpcEndpoint instanceof TaskExecutor) {
+			ref = actorSystem.actorOf(
+				Props.create(TaskExecutorAkkaActor.class, rpcEndpoint)
+			);
+
+			self = (C) new TaskExecutorAkkaGateway(ref, timeout);
+		} else if (rpcEndpoint instanceof ResourceManager) {
+			ref = actorSystem.actorOf(
+				Props.create(ResourceManagerAkkaActor.class, rpcEndpoint)
+			);
+
+			self = (C) new ResourceManagerAkkaGateway(ref, timeout);
+		} else if (rpcEndpoint instanceof JobMaster) {
+			ref = actorSystem.actorOf(
+				Props.create(JobMasterAkkaActor.class, rpcEndpoint)
+			);
+
+			self = (C) new JobMasterAkkaGateway(ref, timeout);
+		} else {
+			throw new RuntimeException("Could not start RPC server for class " + rpcEndpoint.getClass());
+		}
+
+		actors.add(ref);
+
+		return self;
+	}
+
+	@Override
+	public <C extends RpcGateway> void stopServer(C selfGateway) {
+		if (selfGateway instanceof AkkaGateway) {
+			AkkaGateway akkaClient = (AkkaGateway) selfGateway;
+
+			if (actors.contains(akkaClient.getActorRef())) {
+				akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			} else {
+				// don't stop this actor since it was not started by this RPC service
+			}
+		}
+	}
+
+	@Override
+	public void stopService() {
+		actorSystem.shutdown();
+		actorSystem.awaitTermination();
+	}
+
+	@Override
+	public <C extends RpcGateway> String getAddress(C selfGateway) {
+		if (selfGateway instanceof AkkaGateway) {
+			return AkkaUtils.getAkkaURL(actorSystem, ((AkkaGateway) selfGateway).getActorRef());
+		} else {
+			throw new RuntimeException("Cannot get address for non " + AkkaGateway.class.getName() + ".");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
new file mode 100644
index 0000000..3cb499c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
+import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseAkkaActor extends UntypedActor {
+	private static final Logger LOG = LoggerFactory.getLogger(BaseAkkaActor.class);
+
+	@Override
+	public void onReceive(Object message) throws Exception {
+		if (message instanceof RunnableMessage) {
+			try {
+				((RunnableMessage) message).getRunnable().run();
+			} catch (Exception e) {
+				LOG.error("Encountered error while executing runnable.", e);
+			}
+		} else if (message instanceof CallableMessage<?>) {
+			try {
+				Object result = ((CallableMessage<?>) message).getCallable().call();
+				sender().tell(new Status.Success(result), getSelf());
+			} catch (Exception e) {
+				sender().tell(new Status.Failure(e), getSelf());
+			}
+		} else {
+			throw new RuntimeException("Unknown message " + message);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
new file mode 100644
index 0000000..512790d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
+import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
+import scala.concurrent.Future;
+
+import java.util.concurrent.Callable;
+
+public abstract class BaseAkkaGateway implements MainThreadExecutor, AkkaGateway {
+	@Override
+	public void runAsync(Runnable runnable) {
+		getActorRef().tell(new RunnableMessage(runnable), ActorRef.noSender());
+	}
+
+	@Override
+	public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+		return (Future<V>) Patterns.ask(getActorRef(), new CallableMessage(callable), timeout);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
new file mode 100644
index 0000000..9e04ea9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.jobmaster;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
+
+public class JobMasterAkkaActor extends BaseAkkaActor {
+	private final JobMaster jobMaster;
+
+	public JobMasterAkkaActor(JobMaster jobMaster) {
+		this.jobMaster = jobMaster;
+	}
+
+	@Override
+	public void onReceive(Object message) throws Exception {
+		if (message instanceof UpdateTaskExecutionState) {
+
+			final ActorRef sender = getSender();
+
+			UpdateTaskExecutionState updateTaskExecutionState = (UpdateTaskExecutionState) message;
+
+			try {
+				Acknowledge result = jobMaster.updateTaskExecutionState(updateTaskExecutionState.getTaskExecutionState());
+				sender.tell(new Status.Success(result), getSelf());
+			} catch (Exception e) {
+				sender.tell(new Status.Failure(e), getSelf());
+			}
+		} else if (message instanceof RegisterAtResourceManager) {
+			RegisterAtResourceManager registerAtResourceManager = (RegisterAtResourceManager) message;
+
+			jobMaster.registerAtResourceManager(registerAtResourceManager.getAddress());
+		} else {
+			super.onReceive(message);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
new file mode 100644
index 0000000..e6bf061
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.jobmaster;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
+import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.concurrent.Future;
+import scala.reflect.ClassTag$;
+
+public class JobMasterAkkaGateway extends BaseAkkaGateway implements JobMasterGateway {
+	private final AskableActorRef actorRef;
+	private final Timeout timeout;
+
+	public JobMasterAkkaGateway(ActorRef actorRef, Timeout timeout) {
+		this.actorRef = new AskableActorRef(actorRef);
+		this.timeout = timeout;
+	}
+
+	@Override
+	public Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+		return actorRef.ask(new UpdateTaskExecutionState(taskExecutionState), timeout)
+			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+	}
+
+	@Override
+	public void registerAtResourceManager(String address) {
+		actorRef.actorRef().tell(new RegisterAtResourceManager(address), actorRef.actorRef());
+	}
+
+	@Override
+	public ActorRef getActorRef() {
+		return actorRef.actorRef();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
new file mode 100644
index 0000000..f0e555f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import java.util.concurrent.Callable;
+
+public class CallableMessage<V> {
+	private final Callable<V> callable;
+
+	public CallableMessage(Callable<V> callable) {
+		this.callable = callable;
+	}
+
+	public Callable<V> getCallable() {
+		return callable;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
new file mode 100644
index 0000000..0b9e9dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.io.Serializable;
+
+public class CancelTask implements Serializable {
+	private static final long serialVersionUID = -2998176874447950595L;
+	private final ExecutionAttemptID executionAttemptID;
+
+	public CancelTask(ExecutionAttemptID executionAttemptID) {
+		this.executionAttemptID = executionAttemptID;
+	}
+
+	public ExecutionAttemptID getExecutionAttemptID() {
+		return executionAttemptID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
new file mode 100644
index 0000000..a83d539
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+
+import java.io.Serializable;
+
+public class ExecuteTask implements Serializable {
+	private static final long serialVersionUID = -6769958430967048348L;
+	private final TaskDeploymentDescriptor taskDeploymentDescriptor;
+
+	public ExecuteTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
+		this.taskDeploymentDescriptor = taskDeploymentDescriptor;
+	}
+
+	public TaskDeploymentDescriptor getTaskDeploymentDescriptor() {
+		return taskDeploymentDescriptor;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
new file mode 100644
index 0000000..3ade082
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import java.io.Serializable;
+
+public class RegisterAtResourceManager implements Serializable {
+
+	private static final long serialVersionUID = -4175905742620903602L;
+
+	private final String address;
+
+	public RegisterAtResourceManager(String address) {
+		this.address = address;
+	}
+
+	public String getAddress() {
+		return address;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
new file mode 100644
index 0000000..b35ea38
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+
+import java.io.Serializable;
+
+public class RegisterJobMaster implements Serializable{
+	private static final long serialVersionUID = -4616879574192641507L;
+	private final JobMasterRegistration jobMasterRegistration;
+
+	public RegisterJobMaster(JobMasterRegistration jobMasterRegistration) {
+		this.jobMasterRegistration = jobMasterRegistration;
+	}
+
+	public JobMasterRegistration getJobMasterRegistration() {
+		return jobMasterRegistration;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
new file mode 100644
index 0000000..85ceeec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+
+import java.io.Serializable;
+
+public class RequestSlot implements Serializable {
+	private static final long serialVersionUID = 7207463889348525866L;
+
+	private final SlotRequest slotRequest;
+
+	public RequestSlot(SlotRequest slotRequest) {
+		this.slotRequest = slotRequest;
+	}
+
+	public SlotRequest getSlotRequest() {
+		return slotRequest;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
new file mode 100644
index 0000000..3556738
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+public class RunnableMessage {
+	private final Runnable runnable;
+
+	public RunnableMessage(Runnable runnable) {
+		this.runnable = runnable;
+	}
+
+	public Runnable getRunnable() {
+		return runnable;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
new file mode 100644
index 0000000..f89cd2f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import java.io.Serializable;
+
+public class UpdateTaskExecutionState implements Serializable{
+	private static final long serialVersionUID = -6662229114427331436L;
+
+	private final TaskExecutionState taskExecutionState;
+
+	public UpdateTaskExecutionState(TaskExecutionState taskExecutionState) {
+		this.taskExecutionState = taskExecutionState;
+	}
+
+	public TaskExecutionState getTaskExecutionState() {
+		return taskExecutionState;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
new file mode 100644
index 0000000..13101f9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.resourcemanager;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.pattern.Patterns;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
+import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
+import scala.concurrent.Future;
+
+public class ResourceManagerAkkaActor extends BaseAkkaActor {
+	private final ResourceManager resourceManager;
+
+	public ResourceManagerAkkaActor(ResourceManager resourceManager) {
+		this.resourceManager = resourceManager;
+	}
+
+	@Override
+	public void onReceive(Object message) throws Exception {
+		final ActorRef sender = getSender();
+
+		if (message instanceof RegisterJobMaster) {
+			RegisterJobMaster registerJobMaster = (RegisterJobMaster) message;
+
+			try {
+				Future<RegistrationResponse> response = resourceManager.registerJobMaster(registerJobMaster.getJobMasterRegistration());
+				Patterns.pipe(response, getContext().dispatcher()).to(sender());
+			} catch (Exception e) {
+				sender.tell(new Status.Failure(e), getSelf());
+			}
+		} else if (message instanceof RequestSlot) {
+			RequestSlot requestSlot = (RequestSlot) message;
+
+			try {
+				SlotAssignment response = resourceManager.requestSlot(requestSlot.getSlotRequest());
+				sender.tell(new Status.Success(response), getSelf());
+			} catch (Exception e) {
+				sender.tell(new Status.Failure(e), getSelf());
+			}
+		} else {
+			super.onReceive(message);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
new file mode 100644
index 0000000..1304707
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.resourcemanager;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
+import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+public class ResourceManagerAkkaGateway extends BaseAkkaGateway implements ResourceManagerGateway {
+	private final AskableActorRef actorRef;
+	private final Timeout timeout;
+
+	public ResourceManagerAkkaGateway(ActorRef actorRef, Timeout timeout) {
+		this.actorRef = new AskableActorRef(actorRef);
+		this.timeout = timeout;
+	}
+
+	@Override
+	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration, FiniteDuration timeout) {
+		return actorRef.ask(new RegisterJobMaster(jobMasterRegistration), new Timeout(timeout))
+			.mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
+	}
+
+	@Override
+	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
+		return actorRef.ask(new RegisterJobMaster(jobMasterRegistration), timeout)
+			.mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
+	}
+
+	@Override
+	public Future<SlotAssignment> requestSlot(SlotRequest slotRequest) {
+		return actorRef.ask(new RequestSlot(slotRequest), timeout)
+			.mapTo(ClassTag$.MODULE$.<SlotAssignment>apply(SlotAssignment.class));
+	}
+
+	@Override
+	public ActorRef getActorRef() {
+		return actorRef.actorRef();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
new file mode 100644
index 0000000..ed522cc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.taskexecutor;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.dispatch.OnComplete;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
+import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
+import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+
+public class TaskExecutorAkkaActor extends BaseAkkaActor {
+	private final TaskExecutorGateway taskExecutor;
+
+	public TaskExecutorAkkaActor(TaskExecutorGateway taskExecutor) {
+		this.taskExecutor = taskExecutor;
+	}
+
+	@Override
+	public void onReceive(Object message) throws Exception {
+		final ActorRef sender = getSender();
+
+		if (message instanceof ExecuteTask) {
+			ExecuteTask executeTask = (ExecuteTask) message;
+
+			taskExecutor.executeTask(executeTask.getTaskDeploymentDescriptor()).onComplete(
+				new OnComplete<Acknowledge>() {
+					@Override
+					public void onComplete(Throwable failure, Acknowledge success) throws Throwable {
+						if (failure != null) {
+							sender.tell(new Status.Failure(failure), getSelf());
+						} else {
+							sender.tell(new Status.Success(Acknowledge.get()), getSelf());
+						}
+					}
+				},
+				getContext().dispatcher()
+			);
+		} else if (message instanceof CancelTask) {
+			CancelTask cancelTask = (CancelTask) message;
+
+			taskExecutor.cancelTask(cancelTask.getExecutionAttemptID()).onComplete(
+				new OnComplete<Acknowledge>() {
+					@Override
+					public void onComplete(Throwable failure, Acknowledge success) throws Throwable {
+						if (failure != null) {
+							sender.tell(new Status.Failure(failure), getSelf());
+						} else {
+							sender.tell(new Status.Success(Acknowledge.get()), getSelf());
+						}
+					}
+				},
+				getContext().dispatcher()
+			);
+		} else {
+			super.onReceive(message);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
new file mode 100644
index 0000000..7f0a522
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.taskexecutor;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
+import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+import scala.concurrent.Future;
+import scala.reflect.ClassTag$;
+
+public class TaskExecutorAkkaGateway extends BaseAkkaGateway implements TaskExecutorGateway {
+	private final AskableActorRef actorRef;
+	private final Timeout timeout;
+
+	public TaskExecutorAkkaGateway(ActorRef actorRef, Timeout timeout) {
+		this.actorRef = new AskableActorRef(actorRef);
+		this.timeout = timeout;
+	}
+
+	@Override
+	public Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
+		return actorRef.ask(new ExecuteTask(taskDeploymentDescriptor), timeout)
+			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+	}
+
+	@Override
+	public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId) {
+		return actorRef.ask(new CancelTask(executionAttemptId), timeout)
+			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+	}
+
+	@Override
+	public ActorRef getActorRef() {
+		return actorRef.actorRef();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
new file mode 100644
index 0000000..b81b19c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.jobmaster;
+
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.Tuple2;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * JobMaster implementation. The job master is responsible for the execution of a single
+ * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
+ *
+ * It offers the following methods as part of its rpc interface to interact with the JobMaster
+ * remotely:
+ * <ul>
+ *     <li>{@link #registerAtResourceManager(String)} triggers the registration at the resource manager</li>
+ *     <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
+ * given task</li>
+ * </ul>
+ */
+public class JobMaster extends RpcEndpoint<JobMasterGateway> {
+	/** Execution context for future callbacks */
+	private final ExecutionContext executionContext;
+
+	/** Execution context for scheduled runnables */
+	private final ScheduledExecutorService scheduledExecutorService;
+
+	private final FiniteDuration initialRegistrationTimeout = new FiniteDuration(500, TimeUnit.MILLISECONDS);
+	private final FiniteDuration maxRegistrationTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+	private final FiniteDuration registrationDuration = new FiniteDuration(365, TimeUnit.DAYS);
+	private final long failedRegistrationDelay = 10000;
+
+	/** Gateway to connected resource manager, null iff not connected */
+	private ResourceManagerGateway resourceManager = null;
+
+	/** UUID to filter out old registration runs */
+	private UUID currentRegistrationRun;
+
+	public JobMaster(RpcService rpcService, ExecutorService executorService) {
+		super(rpcService);
+		executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
+		scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+	}
+
+	public ResourceManagerGateway getResourceManager() {
+		return resourceManager;
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// RPC methods
+	//----------------------------------------------------------------------------------------------
+
+	/**
+	 * Updates the task execution state for a given task.
+	 *
+	 * @param taskExecutionState New task execution state for a given task
+	 * @return Acknowledge the task execution state update
+	 */
+	@RpcMethod
+	public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+		System.out.println("TaskExecutionState: " + taskExecutionState);
+		return Acknowledge.get();
+	}
+
+	/**
+	 * Triggers the registration of the job master at the resource manager.
+	 *
+	 * @param address Address of the resource manager
+	 */
+	@RpcMethod
+	public void registerAtResourceManager(final String address) {
+		currentRegistrationRun = UUID.randomUUID();
+
+		Future<ResourceManagerGateway> resourceManagerFuture = getRpcService().connect(address, ResourceManagerGateway.class);
+
+		handleResourceManagerRegistration(
+			new JobMasterRegistration(getAddress()),
+			1,
+			resourceManagerFuture,
+			currentRegistrationRun,
+			initialRegistrationTimeout,
+			maxRegistrationTimeout,
+			registrationDuration.fromNow());
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// Helper methods
+	//----------------------------------------------------------------------------------------------
+
+	/**
+	 * Helper method to handle the resource manager registration process. If a registration attempt
+	 * times out, then a new attempt with the doubled time out is initiated. The whole registration
+	 * process has a deadline. Once this deadline is overdue without successful registration, the
+	 * job master shuts down.
+	 *
+	 * @param jobMasterRegistration Job master registration info which is sent to the resource
+	 *                              manager
+	 * @param attemptNumber Registration attempt number
+	 * @param resourceManagerFuture Future of the resource manager gateway
+	 * @param registrationRun UUID describing the current registration run
+	 * @param timeout Timeout of the last registration attempt
+	 * @param maxTimeout Maximum timeout between registration attempts
+	 * @param deadline Deadline for the registration
+	 */
+	void handleResourceManagerRegistration(
+		final JobMasterRegistration jobMasterRegistration,
+		final int attemptNumber,
+		final Future<ResourceManagerGateway> resourceManagerFuture,
+		final UUID registrationRun,
+		final FiniteDuration timeout,
+		final FiniteDuration maxTimeout,
+		final Deadline deadline) {
+
+		// filter out concurrent registration runs
+		if (registrationRun.equals(currentRegistrationRun)) {
+
+			log.info("Start registration attempt #{}.", attemptNumber);
+
+			if (deadline.isOverdue()) {
+				// we've exceeded our registration deadline. This means that we have to shutdown the JobMaster
+				log.error("Exceeded registration deadline without successfully registering at the ResourceManager.");
+				shutDown();
+			} else {
+				Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, Future<Tuple2<RegistrationResponse, ResourceManagerGateway>>>() {
+					@Override
+					public Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> apply(ResourceManagerGateway resourceManagerGateway) {
+						return resourceManagerGateway.registerJobMaster(jobMasterRegistration, timeout).zip(Futures.successful(resourceManagerGateway));
+					}
+				}, executionContext);
+
+				registrationResponseFuture.onComplete(new OnComplete<Tuple2<RegistrationResponse, ResourceManagerGateway>>() {
+					@Override
+					public void onComplete(Throwable failure, Tuple2<RegistrationResponse, ResourceManagerGateway> tuple) throws Throwable {
+						if (failure != null) {
+							if (failure instanceof TimeoutException) {
+								// we haven't received an answer in the given timeout interval,
+								// so increase it and try again.
+								final FiniteDuration newTimeout = timeout.$times(2L).min(maxTimeout);
+
+								handleResourceManagerRegistration(
+									jobMasterRegistration,
+									attemptNumber + 1,
+									resourceManagerFuture,
+									registrationRun,
+									newTimeout,
+									maxTimeout,
+									deadline);
+							} else {
+								log.error("Received unknown error while registering at the ResourceManager.", failure);
+								shutDown();
+							}
+						} else {
+							final RegistrationResponse response = tuple._1();
+							final ResourceManagerGateway gateway = tuple._2();
+
+							if (response.isSuccess()) {
+								finishResourceManagerRegistration(gateway, response.getInstanceID());
+							} else {
+								log.info("The registration was refused. Try again.");
+
+								scheduledExecutorService.schedule(new Runnable() {
+									@Override
+									public void run() {
+										// we have to execute scheduled runnable in the main thread
+										// because we need consistency wrt currentRegistrationRun
+										runAsync(new Runnable() {
+											@Override
+											public void run() {
+												// our registration attempt was refused. Start over.
+												handleResourceManagerRegistration(
+													jobMasterRegistration,
+													1,
+													resourceManagerFuture,
+													registrationRun,
+													initialRegistrationTimeout,
+													maxTimeout,
+													deadline);
+											}
+										});
+									}
+								}, failedRegistrationDelay, TimeUnit.MILLISECONDS);
+							}
+						}
+					}
+				}, getMainThreadExecutionContext()); // use the main thread execution context to execute the call back in the main thread
+			}
+		} else {
+			log.info("Discard out-dated registration run.");
+		}
+	}
+
+	/**
+	 * Finish the resource manager registration by setting the new resource manager gateway.
+	 *
+	 * @param resourceManager New resource manager gateway
+	 * @param instanceID Instance id assigned by the resource manager
+	 */
+	void finishResourceManagerRegistration(ResourceManagerGateway resourceManager, InstanceID instanceID) {
+		log.info("Successfully registered at the ResourceManager under instance id {}.", instanceID);
+		this.resourceManager = resourceManager;
+	}
+
+	/**
+	 * Return if the job master is connected to a resource manager.
+	 *
+	 * @return true if the job master is connected to the resource manager
+	 */
+	public boolean isConnected() {
+		return resourceManager != null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
new file mode 100644
index 0000000..17a4c3a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.jobmaster;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.concurrent.Future;
+
+/**
+ * {@link JobMaster} rpc gateway interface
+ */
+public interface JobMasterGateway extends RpcGateway {
+
+	/**
+	 * Updates the task execution state for a given task.
+	 *
+	 * @param taskExecutionState New task execution state for a given task
+	 * @return Future acknowledge of the task execution state update
+	 */
+	Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
+
+	/**
+	 * Triggers the registration of the job master at the resource manager.
+	 *
+	 * @param address Address of the resource manager
+	 */
+	void registerAtResourceManager(final String address);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
new file mode 100644
index 0000000..7a2deae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+public class JobMasterRegistration implements Serializable {
+	private static final long serialVersionUID = 8411214999193765202L;
+
+	private final String address;
+
+	public JobMasterRegistration(String address) {
+		this.address = address;
+	}
+
+	public String getAddress() {
+		return address;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
new file mode 100644
index 0000000..8ac9e49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.io.Serializable;
+
+public class RegistrationResponse implements Serializable {
+	private static final long serialVersionUID = -2379003255993119993L;
+
+	private final boolean isSuccess;
+	private final InstanceID instanceID;
+
+	public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
+		this.isSuccess = isSuccess;
+		this.instanceID = instanceID;
+	}
+
+	public boolean isSuccess() {
+		return isSuccess;
+	}
+
+	public InstanceID getInstanceID() {
+		return instanceID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
new file mode 100644
index 0000000..c7e8def
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import akka.dispatch.Mapper;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * ResourceManager implementation. The resource manager is responsible for resource de-/allocation
+ * and bookkeeping.
+ *
+ * It offers the following methods as part of its rpc interface to interact with the him remotely:
+ * <ul>
+ *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li>
+ *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
+ * </ul>
+ */
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
+	private final ExecutionContext executionContext;
+	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+
+	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
+		super(rpcService);
+		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
+		this.jobMasterGateways = new HashMap<>();
+	}
+
+	/**
+	 * Register a {@link JobMaster} at the resource manager.
+	 *
+	 * @param jobMasterRegistration Job master registration information
+	 * @return Future registration response
+	 */
+	@RpcMethod
+	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
+		Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
+
+		return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
+			@Override
+			public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
+				InstanceID instanceID;
+
+				if (jobMasterGateways.containsKey(jobMasterGateway)) {
+					instanceID = jobMasterGateways.get(jobMasterGateway);
+				} else {
+					instanceID = new InstanceID();
+					jobMasterGateways.put(jobMasterGateway, instanceID);
+				}
+
+				return new RegistrationResponse(true, instanceID);
+			}
+		}, getMainThreadExecutionContext());
+	}
+
+	/**
+	 * Requests a slot from the resource manager.
+	 *
+	 * @param slotRequest Slot request
+	 * @return Slot assignment
+	 */
+	@RpcMethod
+	public SlotAssignment requestSlot(SlotRequest slotRequest) {
+		System.out.println("SlotRequest: " + slotRequest);
+		return new SlotAssignment();
+	}
+}