You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:28:43 UTC
[21/50] [abbrv] flink git commit: [FLINK-4346] [rpc] Add new RPC
abstraction
[FLINK-4346] [rpc] Add new RPC abstraction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f36fb7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f36fb7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f36fb7f
Branch: refs/heads/flip-6
Commit: 0f36fb7f10c1b40005b897d85fec04e858c4909b
Parents: 0735b5b
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 3 19:31:34 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 8 17:26:56 2016 +0200
----------------------------------------------------------------------
flink-runtime/pom.xml | 5 +
.../flink/runtime/rpc/MainThreadExecutor.java | 54 +++
.../apache/flink/runtime/rpc/RpcEndpoint.java | 182 +++++++++++
.../apache/flink/runtime/rpc/RpcGateway.java | 25 ++
.../org/apache/flink/runtime/rpc/RpcMethod.java | 35 ++
.../apache/flink/runtime/rpc/RpcService.java | 74 +++++
.../apache/flink/runtime/rpc/RpcTimeout.java | 34 ++
.../flink/runtime/rpc/akka/AkkaGateway.java | 29 ++
.../flink/runtime/rpc/akka/AkkaRpcService.java | 145 ++++++++
.../flink/runtime/rpc/akka/BaseAkkaActor.java | 50 +++
.../flink/runtime/rpc/akka/BaseAkkaGateway.java | 41 +++
.../rpc/akka/jobmaster/JobMasterAkkaActor.java | 58 ++++
.../akka/jobmaster/JobMasterAkkaGateway.java | 57 ++++
.../rpc/akka/messages/CallableMessage.java | 33 ++
.../runtime/rpc/akka/messages/CancelTask.java | 36 ++
.../runtime/rpc/akka/messages/ExecuteTask.java | 36 ++
.../messages/RegisterAtResourceManager.java | 36 ++
.../rpc/akka/messages/RegisterJobMaster.java | 36 ++
.../runtime/rpc/akka/messages/RequestSlot.java | 37 +++
.../rpc/akka/messages/RunnableMessage.java | 31 ++
.../akka/messages/UpdateTaskExecutionState.java | 37 +++
.../ResourceManagerAkkaActor.java | 65 ++++
.../ResourceManagerAkkaGateway.java | 67 ++++
.../taskexecutor/TaskExecutorAkkaActor.java | 77 +++++
.../taskexecutor/TaskExecutorAkkaGateway.java | 59 ++++
.../flink/runtime/rpc/jobmaster/JobMaster.java | 249 ++++++++++++++
.../runtime/rpc/jobmaster/JobMasterGateway.java | 45 +++
.../resourcemanager/JobMasterRegistration.java | 35 ++
.../resourcemanager/RegistrationResponse.java | 43 +++
.../rpc/resourcemanager/ResourceManager.java | 94 ++++++
.../resourcemanager/ResourceManagerGateway.java | 58 ++++
.../rpc/resourcemanager/SlotAssignment.java | 25 ++
.../rpc/resourcemanager/SlotRequest.java | 25 ++
.../runtime/rpc/taskexecutor/TaskExecutor.java | 82 +++++
.../rpc/taskexecutor/TaskExecutorGateway.java | 48 +++
.../flink/runtime/rpc/RpcCompletenessTest.java | 327 +++++++++++++++++++
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 81 +++++
.../rpc/taskexecutor/TaskExecutorTest.java | 92 ++++++
.../runtime/util/DirectExecutorService.java | 234 +++++++++++++
flink-tests/pom.xml | 1 -
pom.xml | 7 +
41 files changed, 2784 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 5fea8fb..09c6fd0 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -189,6 +189,11 @@ under the License.
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
new file mode 100644
index 0000000..e06711e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.util.Timeout;
+import scala.concurrent.Future;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying
+ * rpc server.
+ *
+ * This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
+ * implementation which allows to dispatch local procedures to the main thread of the underlying
+ * rpc server.
+ */
+public interface MainThreadExecutor {
+ /**
+ * Execute the runnable in the main thread of the underlying rpc server.
+ *
+ * @param runnable Runnable to be executed
+ */
+ void runAsync(Runnable runnable);
+
+ /**
+ * Execute the callable in the main thread of the underlying rpc server and return a future for
+ * the callable result. If the future is not completed within the given timeout, the returned
+ * future will be failed with a {@link TimeoutException}.
+ *
+ * @param callable Callable to be executed
+ * @param timeout Timeout for the future to complete
+ * @param <V> Return value of the callable
+ * @return Future of the callable result
+ */
+ <V> Future<V> callAsync(Callable<V> callable, Timeout timeout);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
new file mode 100644
index 0000000..3d8757f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.util.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Base class for rpc endpoints. Distributed components which offer remote procedure calls have to
+ * extend the rpc endpoint base class.
+ *
+ * The main idea is that a rpc endpoint is backed by a rpc server which has a single thread
+ * processing the rpc calls. Thus, by executing all state changing operations within the main
+ * thread, we don't have to reason about concurrent accesses. The rpc endpoint provides
+ * {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} and the
+ * {@link #getMainThreadExecutionContext()} to execute code in the rpc server's main thread.
+ *
+ * @param <C> Rpc gateway counterpart for the implementing rpc endpoint
+ */
+public abstract class RpcEndpoint<C extends RpcGateway> {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ /** Rpc service to be used to start the rpc server and to obtain rpc gateways */
+ private final RpcService rpcService;
+
+ /** Self gateway which can be used to schedule asynchronous calls on yourself */
+ private C self;
+
+ /**
+ * The main thread execution context to be used to execute future callbacks in the main thread
+ * of the executing rpc server.
+ *
+ * IMPORTANT: The main thread context is only available after the rpc server has been started.
+ */
+ private MainThreadExecutionContext mainThreadExecutionContext;
+
+ public RpcEndpoint(RpcService rpcService) {
+ this.rpcService = rpcService;
+ }
+
+ /**
+ * Get self-gateway which should be used to run asynchronous rpc calls on this endpoint.
+ *
+ * IMPORTANT: Always issue local method calls via the self-gateway if the current thread
+ * is not the main thread of the underlying rpc server, e.g. from within a future callback.
+ *
+ * @return Self gateway
+ */
+ public C getSelf() {
+ return self;
+ }
+
+ /**
+ * Execute the runnable in the main thread of the underlying rpc server.
+ *
+ * @param runnable Runnable to be executed in the main thread of the underlying rpc server
+ */
+ public void runAsync(Runnable runnable) {
+ ((MainThreadExecutor) self).runAsync(runnable);
+ }
+
+ /**
+ * Execute the callable in the main thread of the underlying rpc server returning a future for
+ * the result of the callable. If the callable is not completed within the given timeout, then
+ * the future will be failed with a {@link java.util.concurrent.TimeoutException}.
+ *
+ * @param callable Callable to be executed in the main thread of the underlying rpc server
+ * @param timeout Timeout for the callable to be completed
+ * @param <V> Return type of the callable
+ * @return Future for the result of the callable.
+ */
+ public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+ return ((MainThreadExecutor) self).callAsync(callable, timeout);
+ }
+
+ /**
+ * Gets the main thread execution context. The main thread execution context can be used to
+ * execute tasks in the main thread of the underlying rpc server.
+ *
+ * @return Main thread execution context
+ */
+ public ExecutionContext getMainThreadExecutionContext() {
+ return mainThreadExecutionContext;
+ }
+
+ /**
+ * Gets the used rpc service.
+ *
+ * @return Rpc service
+ */
+ public RpcService getRpcService() {
+ return rpcService;
+ }
+
+ /**
+ * Starts the underlying rpc server via the rpc service and creates the main thread execution
+ * context. This makes the rpc endpoint effectively reachable from the outside.
+ *
+ * Can be overridden to add rpc endpoint specific start up code. Should always call the parent
+ * start method.
+ */
+ public void start() {
+ self = rpcService.startServer(this);
+ mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
+ }
+
+
+ /**
+ * Shuts down the underlying rpc server via the rpc service.
+ *
+ * Can be overridden to add rpc endpoint specific shut down code. Should always call the parent
+ * shut down method.
+ */
+ public void shutDown() {
+ rpcService.stopServer(self);
+ }
+
+ /**
+ * Gets the address of the underlying rpc server. The address should be fully qualified so that
+ * a remote system can connect to this rpc server via this address.
+ *
+ * @return Fully qualified address of the underlying rpc server
+ */
+ public String getAddress() {
+ return rpcService.getAddress(self);
+ }
+
+ /**
+ * Execution context which executes runnables in the main thread context. A reported failure
+ * will cause the underlying rpc server to shut down.
+ */
+ private class MainThreadExecutionContext implements ExecutionContext {
+ private final MainThreadExecutor gateway;
+
+ MainThreadExecutionContext(MainThreadExecutor gateway) {
+ this.gateway = gateway;
+ }
+
+ @Override
+ public void execute(Runnable runnable) {
+ gateway.runAsync(runnable);
+ }
+
+ @Override
+ public void reportFailure(final Throwable t) {
+ gateway.runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.error("Encountered failure in the main thread execution context.", t);
+ shutDown();
+ }
+ });
+ }
+
+ @Override
+ public ExecutionContext prepare() {
+ return this;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
new file mode 100644
index 0000000..e3a16b4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+/**
+ * Rpc gateway interface which has to be implemented by Rpc gateways.
+ */
+public interface RpcGateway {
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
new file mode 100644
index 0000000..875e557
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for rpc method in a {@link RpcEndpoint} implementation. Every rpc method must have a
+ * respective counterpart in the {@link RpcGateway} implementation for this rpc server. The
+ * RpcCompletenessTest makes sure that the set of rpc methods in a rpc server and the set of
+ * gateway methods in the corresponding gateway implementation are identical.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RpcMethod {
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
new file mode 100644
index 0000000..90ff7b6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import scala.concurrent.Future;
+
+/**
+ * Interface for rpc services. An rpc service is used to start and connect to a {@link RpcEndpoint}.
+ * Connecting to a rpc server will return a {@link RpcGateway} which can be used to call remote
+ * procedures.
+ */
+public interface RpcService {
+
+ /**
+ * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can
+ * be used to communicate with the rpc server.
+ *
+ * @param address Address of the remote rpc server
+ * @param clazz Class of the rpc gateway to return
+ * @param <C> Type of the rpc gateway to return
+ * @return Future containing the rpc gateway
+ */
+ <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz);
+
+ /**
+ * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.
+ *
+ * @param rpcEndpoint Rpc endpoint to dispatch the rpcs to
+ * @param <S> Type of the rpc endpoint
+ * @param <C> Type of the self rpc gateway associated with the rpc server
+ * @return Self gateway to dispatch remote procedure calls to oneself
+ */
+ <S extends RpcEndpoint, C extends RpcGateway> C startServer(S rpcEndpoint);
+
+ /**
+ * Stop the underlying rpc server of the provided self gateway.
+ *
+ * @param selfGateway Self gateway describing the underlying rpc server
+ * @param <C> Type of the rpc gateway
+ */
+ <C extends RpcGateway> void stopServer(C selfGateway);
+
+ /**
+ * Stop the rpc service shutting down all started rpc servers.
+ */
+ void stopService();
+
+ /**
+ * Get the fully qualified address of the underlying rpc server represented by the self gateway.
+ * It must be possible to connect from a remote host to the rpc server via the returned fully
+ * qualified address.
+ *
+ * @param selfGateway Self gateway associated with the underlying rpc server
+ * @param <C> Type of the rpc gateway
+ * @return Fully qualified address
+ */
+ <C extends RpcGateway> String getAddress(C selfGateway);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
new file mode 100644
index 0000000..3d36d47
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for {@link RpcGateway} methods to specify an additional timeout parameter for the
+ * returned future to be completed. The rest of the provided parameters is passed to the remote rpc
+ * server for the rpc.
+ */
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RpcTimeout {
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
new file mode 100644
index 0000000..a96a600
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorRef;
+
+/**
+ * Interface for Akka based rpc gateways
+ */
+public interface AkkaGateway {
+
+ ActorRef getActorRef();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
new file mode 100644
index 0000000..d55bd13
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorIdentity;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Identify;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Mapper;
+import akka.pattern.AskableActorSelection;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaActor;
+import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaActor;
+import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaActor;
+import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
+import scala.concurrent.Future;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class AkkaRpcService implements RpcService {
+ private final ActorSystem actorSystem;
+ private final Timeout timeout;
+ private final Set<ActorRef> actors = new HashSet<>();
+
+ public AkkaRpcService(ActorSystem actorSystem, Timeout timeout) {
+ this.actorSystem = actorSystem;
+ this.timeout = timeout;
+ }
+
+ @Override
+ public <C extends RpcGateway> Future<C> connect(String address, final Class<C> clazz) {
+ ActorSelection actorSel = actorSystem.actorSelection(address);
+
+ AskableActorSelection asker = new AskableActorSelection(actorSel);
+
+ Future<Object> identify = asker.ask(new Identify(42), timeout);
+
+ return identify.map(new Mapper<Object, C>(){
+ public C apply(Object obj) {
+ ActorRef actorRef = ((ActorIdentity) obj).getRef();
+
+ if (clazz == TaskExecutorGateway.class) {
+ return (C) new TaskExecutorAkkaGateway(actorRef, timeout);
+ } else if (clazz == ResourceManagerGateway.class) {
+ return (C) new ResourceManagerAkkaGateway(actorRef, timeout);
+ } else if (clazz == JobMasterGateway.class) {
+ return (C) new JobMasterAkkaGateway(actorRef, timeout);
+ } else {
+ throw new RuntimeException("Could not find remote endpoint " + clazz);
+ }
+ }
+ }, actorSystem.dispatcher());
+ }
+
+ @Override
+ public <S extends RpcEndpoint, C extends RpcGateway> C startServer(S rpcEndpoint) {
+ ActorRef ref;
+ C self;
+ if (rpcEndpoint instanceof TaskExecutor) {
+ ref = actorSystem.actorOf(
+ Props.create(TaskExecutorAkkaActor.class, rpcEndpoint)
+ );
+
+ self = (C) new TaskExecutorAkkaGateway(ref, timeout);
+ } else if (rpcEndpoint instanceof ResourceManager) {
+ ref = actorSystem.actorOf(
+ Props.create(ResourceManagerAkkaActor.class, rpcEndpoint)
+ );
+
+ self = (C) new ResourceManagerAkkaGateway(ref, timeout);
+ } else if (rpcEndpoint instanceof JobMaster) {
+ ref = actorSystem.actorOf(
+ Props.create(JobMasterAkkaActor.class, rpcEndpoint)
+ );
+
+ self = (C) new JobMasterAkkaGateway(ref, timeout);
+ } else {
+ throw new RuntimeException("Could not start RPC server for class " + rpcEndpoint.getClass());
+ }
+
+ actors.add(ref);
+
+ return self;
+ }
+
+ @Override
+ public <C extends RpcGateway> void stopServer(C selfGateway) {
+ if (selfGateway instanceof AkkaGateway) {
+ AkkaGateway akkaClient = (AkkaGateway) selfGateway;
+
+ if (actors.contains(akkaClient.getActorRef())) {
+ akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ } else {
+ // don't stop this actor since it was not started by this RPC service
+ }
+ }
+ }
+
+ @Override
+ public void stopService() {
+ actorSystem.shutdown();
+ actorSystem.awaitTermination();
+ }
+
+ @Override
+ public <C extends RpcGateway> String getAddress(C selfGateway) {
+ if (selfGateway instanceof AkkaGateway) {
+ return AkkaUtils.getAkkaURL(actorSystem, ((AkkaGateway) selfGateway).getActorRef());
+ } else {
+ throw new RuntimeException("Cannot get address for non " + AkkaGateway.class.getName() + ".");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
new file mode 100644
index 0000000..3cb499c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
+import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseAkkaActor extends UntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseAkkaActor.class);
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof RunnableMessage) {
+ try {
+ ((RunnableMessage) message).getRunnable().run();
+ } catch (Exception e) {
+ LOG.error("Encountered error while executing runnable.", e);
+ }
+ } else if (message instanceof CallableMessage<?>) {
+ try {
+ Object result = ((CallableMessage<?>) message).getCallable().call();
+ sender().tell(new Status.Success(result), getSelf());
+ } catch (Exception e) {
+ sender().tell(new Status.Failure(e), getSelf());
+ }
+ } else {
+ throw new RuntimeException("Unknown message " + message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
new file mode 100644
index 0000000..512790d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
+import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
+import scala.concurrent.Future;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Common base of the Akka gateways. It implements {@link MainThreadExecutor} by wrapping the
+ * given code into a message and sending it to the actor returned by {@code getActorRef()}.
+ */
+public abstract class BaseAkkaGateway implements MainThreadExecutor, AkkaGateway {
+    /**
+     * Sends the runnable to the gateway's actor wrapped in a {@link RunnableMessage}.
+     * The message is sent without a sender, so no reply can be received.
+     *
+     * @param runnable code to hand over to the actor
+     */
+    @Override
+    public void runAsync(Runnable runnable) {
+        getActorRef().tell(new RunnableMessage(runnable), ActorRef.noSender());
+    }
+
+    /**
+     * Asks the gateway's actor with the callable wrapped in a {@link CallableMessage}.
+     *
+     * @param callable code to hand over to the actor
+     * @param timeout timeout of the ask operation
+     * @param <V> result type of the callable
+     * @return future of the ask operation, viewed as a future of {@code V}
+     */
+    @Override
+    public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+        // The ask result is not typed as Future<V>, hence the cast. It cannot be checked at
+        // runtime, so the suppression is confined to this single assignment.
+        @SuppressWarnings("unchecked")
+        Future<V> result = (Future<V>) Patterns.ask(getActorRef(), new CallableMessage<V>(callable), timeout);
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
new file mode 100644
index 0000000..9e04ea9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.jobmaster;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
+
+/**
+ * Actor which translates job master messages into calls on the wrapped {@link JobMaster}.
+ * Messages it does not know are passed on to {@link BaseAkkaActor}.
+ */
+public class JobMasterAkkaActor extends BaseAkkaActor {
+    private final JobMaster jobMaster;
+
+    public JobMasterAkkaActor(JobMaster jobMaster) {
+        this.jobMaster = jobMaster;
+    }
+
+    @Override
+    public void onReceive(Object message) throws Exception {
+        if (message instanceof UpdateTaskExecutionState) {
+            handleUpdateTaskExecutionState((UpdateTaskExecutionState) message);
+        } else if (message instanceof RegisterAtResourceManager) {
+            // No reply is sent for this message; an exception leaves onReceive unhandled.
+            String resourceManagerAddress = ((RegisterAtResourceManager) message).getAddress();
+            jobMaster.registerAtResourceManager(resourceManagerAddress);
+        } else {
+            super.onReceive(message);
+        }
+    }
+
+    /** Applies the task state update and reports the result or the exception to the sender. */
+    private void handleUpdateTaskExecutionState(UpdateTaskExecutionState request) {
+        final ActorRef replyTo = getSender();
+
+        try {
+            Acknowledge acknowledge = jobMaster.updateTaskExecutionState(request.getTaskExecutionState());
+            replyTo.tell(new Status.Success(acknowledge), getSelf());
+        } catch (Exception e) {
+            replyTo.tell(new Status.Failure(e), getSelf());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
new file mode 100644
index 0000000..e6bf061
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.jobmaster;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
+import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.concurrent.Future;
+import scala.reflect.ClassTag$;
+
+/**
+ * {@link JobMasterGateway} implementation which turns gateway calls into messages for the
+ * wrapped job master actor.
+ */
+public class JobMasterAkkaGateway extends BaseAkkaGateway implements JobMasterGateway {
+    private final AskableActorRef actorRef;
+    private final Timeout timeout;
+
+    /**
+     * @param actorRef actor to which all messages are sent
+     * @param timeout timeout used for ask operations
+     */
+    public JobMasterAkkaGateway(ActorRef actorRef, Timeout timeout) {
+        this.actorRef = new AskableActorRef(actorRef);
+        this.timeout = timeout;
+    }
+
+    /**
+     * Asks the actor with an {@link UpdateTaskExecutionState} message.
+     *
+     * @param taskExecutionState state to report
+     * @return future of the ask operation, mapped to {@link Acknowledge}
+     */
+    @Override
+    public Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        return actorRef.ask(new UpdateTaskExecutionState(taskExecutionState), timeout)
+            .mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+    }
+
+    /**
+     * Sends a {@link RegisterAtResourceManager} message to the actor without waiting for a reply.
+     *
+     * @param address address carried by the message
+     */
+    @Override
+    public void registerAtResourceManager(String address) {
+        // Fire-and-forget: this method returns void and never reads a reply, so the message
+        // is sent without a sender instead of naming the target actor as its own sender.
+        getActorRef().tell(new RegisterAtResourceManager(address), ActorRef.noSender());
+    }
+
+    @Override
+    public ActorRef getActorRef() {
+        return actorRef.actorRef();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
new file mode 100644
index 0000000..f0e555f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Message which carries a {@link Callable}. The class is not {@code Serializable}.
+ *
+ * @param <V> result type of the carried callable
+ */
+public class CallableMessage<V> {
+
+    private final Callable<V> callable;
+
+    /**
+     * @param callable callable to carry; stored as given
+     */
+    public CallableMessage(Callable<V> callable) {
+        this.callable = callable;
+    }
+
+    /**
+     * @return the callable handed to the constructor
+     */
+    public Callable<V> getCallable() {
+        return this.callable;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
new file mode 100644
index 0000000..0b9e9dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.io.Serializable;
+
+/**
+ * Serializable message which carries the {@link ExecutionAttemptID} of the task to cancel.
+ */
+public class CancelTask implements Serializable {
+
+    private static final long serialVersionUID = -2998176874447950595L;
+
+    private final ExecutionAttemptID executionAttemptID;
+
+    /**
+     * @param executionAttemptID execution attempt to carry; stored as given
+     */
+    public CancelTask(ExecutionAttemptID executionAttemptID) {
+        this.executionAttemptID = executionAttemptID;
+    }
+
+    /**
+     * @return the execution attempt id handed to the constructor
+     */
+    public ExecutionAttemptID getExecutionAttemptID() {
+        return this.executionAttemptID;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
new file mode 100644
index 0000000..a83d539
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+
+import java.io.Serializable;
+
+/**
+ * Serializable message which carries the {@link TaskDeploymentDescriptor} of the task to execute.
+ */
+public class ExecuteTask implements Serializable {
+
+    private static final long serialVersionUID = -6769958430967048348L;
+
+    private final TaskDeploymentDescriptor taskDeploymentDescriptor;
+
+    /**
+     * @param taskDeploymentDescriptor descriptor to carry; stored as given
+     */
+    public ExecuteTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
+        this.taskDeploymentDescriptor = taskDeploymentDescriptor;
+    }
+
+    /**
+     * @return the descriptor handed to the constructor
+     */
+    public TaskDeploymentDescriptor getTaskDeploymentDescriptor() {
+        return this.taskDeploymentDescriptor;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
new file mode 100644
index 0000000..3ade082
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import java.io.Serializable;
+
+/**
+ * Serializable message which carries the address passed to
+ * {@code registerAtResourceManager}.
+ */
+public class RegisterAtResourceManager implements Serializable {
+    private static final long serialVersionUID = -4175905742620903602L;
+
+    private final String address;
+
+    /**
+     * @param address address to carry; stored as given
+     */
+    public RegisterAtResourceManager(String address) {
+        this.address = address;
+    }
+
+    /**
+     * @return the address handed to the constructor
+     */
+    public String getAddress() {
+        return this.address;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
new file mode 100644
index 0000000..b35ea38
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+
+import java.io.Serializable;
+
+/**
+ * Serializable message which carries a {@link JobMasterRegistration}.
+ */
+public class RegisterJobMaster implements Serializable {
+
+    private static final long serialVersionUID = -4616879574192641507L;
+
+    private final JobMasterRegistration jobMasterRegistration;
+
+    /**
+     * @param jobMasterRegistration registration to carry; stored as given
+     */
+    public RegisterJobMaster(JobMasterRegistration jobMasterRegistration) {
+        this.jobMasterRegistration = jobMasterRegistration;
+    }
+
+    /**
+     * @return the registration handed to the constructor
+     */
+    public JobMasterRegistration getJobMasterRegistration() {
+        return this.jobMasterRegistration;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
new file mode 100644
index 0000000..85ceeec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+
+import java.io.Serializable;
+
+/**
+ * Serializable message which carries a {@link SlotRequest}.
+ */
+public class RequestSlot implements Serializable {
+    private static final long serialVersionUID = 7207463889348525866L;
+
+    private final SlotRequest slotRequest;
+
+    /**
+     * @param slotRequest slot request to carry; stored as given
+     */
+    public RequestSlot(SlotRequest slotRequest) {
+        this.slotRequest = slotRequest;
+    }
+
+    /**
+     * @return the slot request handed to the constructor
+     */
+    public SlotRequest getSlotRequest() {
+        return this.slotRequest;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
new file mode 100644
index 0000000..3556738
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+/**
+ * Message which carries a {@link Runnable}. The class is not {@code Serializable}.
+ */
+public class RunnableMessage {
+
+    private final Runnable runnable;
+
+    /**
+     * @param runnable runnable to carry; stored as given
+     */
+    public RunnableMessage(Runnable runnable) {
+        this.runnable = runnable;
+    }
+
+    /**
+     * @return the runnable handed to the constructor
+     */
+    public Runnable getRunnable() {
+        return this.runnable;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
new file mode 100644
index 0000000..f89cd2f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import java.io.Serializable;
+
+/**
+ * Serializable message which carries a {@link TaskExecutionState}.
+ */
+public class UpdateTaskExecutionState implements Serializable {
+    private static final long serialVersionUID = -6662229114427331436L;
+
+    private final TaskExecutionState taskExecutionState;
+
+    /**
+     * @param taskExecutionState task execution state to carry; stored as given
+     */
+    public UpdateTaskExecutionState(TaskExecutionState taskExecutionState) {
+        this.taskExecutionState = taskExecutionState;
+    }
+
+    /**
+     * @return the task execution state handed to the constructor
+     */
+    public TaskExecutionState getTaskExecutionState() {
+        return this.taskExecutionState;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
new file mode 100644
index 0000000..13101f9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.resourcemanager;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.pattern.Patterns;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
+import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
+import scala.concurrent.Future;
+
+/**
+ * Actor which translates resource manager messages into calls on the wrapped
+ * {@link ResourceManager}. Messages it does not know are passed on to {@link BaseAkkaActor}.
+ */
+public class ResourceManagerAkkaActor extends BaseAkkaActor {
+    private final ResourceManager resourceManager;
+
+    public ResourceManagerAkkaActor(ResourceManager resourceManager) {
+        this.resourceManager = resourceManager;
+    }
+
+    @Override
+    public void onReceive(Object message) throws Exception {
+        final ActorRef replyTo = getSender();
+
+        if (message instanceof RegisterJobMaster) {
+            handleRegisterJobMaster((RegisterJobMaster) message, replyTo);
+        } else if (message instanceof RequestSlot) {
+            handleRequestSlot((RequestSlot) message, replyTo);
+        } else {
+            super.onReceive(message);
+        }
+    }
+
+    /**
+     * Starts the registration and pipes the resulting future to the requester. An exception
+     * thrown while starting it is reported as a failure status.
+     */
+    private void handleRegisterJobMaster(RegisterJobMaster request, ActorRef replyTo) {
+        try {
+            Future<RegistrationResponse> pending = resourceManager.registerJobMaster(request.getJobMasterRegistration());
+            Patterns.pipe(pending, getContext().dispatcher()).to(replyTo);
+        } catch (Exception e) {
+            replyTo.tell(new Status.Failure(e), getSelf());
+        }
+    }
+
+    /** Requests the slot and reports the assignment or the exception to the requester. */
+    private void handleRequestSlot(RequestSlot request, ActorRef replyTo) {
+        try {
+            SlotAssignment assignment = resourceManager.requestSlot(request.getSlotRequest());
+            replyTo.tell(new Status.Success(assignment), getSelf());
+        } catch (Exception e) {
+            replyTo.tell(new Status.Failure(e), getSelf());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
new file mode 100644
index 0000000..1304707
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.resourcemanager;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
+import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * {@link ResourceManagerGateway} implementation which turns gateway calls into ask operations
+ * on the wrapped resource manager actor.
+ */
+public class ResourceManagerAkkaGateway extends BaseAkkaGateway implements ResourceManagerGateway {
+    private final AskableActorRef actorRef;
+    private final Timeout timeout;
+
+    /**
+     * @param actorRef actor to which all messages are sent
+     * @param timeout timeout used when a call does not bring its own
+     */
+    public ResourceManagerAkkaGateway(ActorRef actorRef, Timeout timeout) {
+        this.actorRef = new AskableActorRef(actorRef);
+        this.timeout = timeout;
+    }
+
+    /** Registers the job master, using the given duration as ask timeout. */
+    @Override
+    public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration, FiniteDuration timeout) {
+        return askForRegistration(jobMasterRegistration, new Timeout(timeout));
+    }
+
+    /** Registers the job master, using the gateway's own ask timeout. */
+    @Override
+    public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
+        return askForRegistration(jobMasterRegistration, timeout);
+    }
+
+    /** Asks the actor with a {@link RequestSlot} message, using the gateway's own ask timeout. */
+    @Override
+    public Future<SlotAssignment> requestSlot(SlotRequest slotRequest) {
+        return actorRef.ask(new RequestSlot(slotRequest), timeout)
+            .mapTo(ClassTag$.MODULE$.<SlotAssignment>apply(SlotAssignment.class));
+    }
+
+    @Override
+    public ActorRef getActorRef() {
+        return actorRef.actorRef();
+    }
+
+    /** Asks the actor with a {@link RegisterJobMaster} message and maps the reply type. */
+    private Future<RegistrationResponse> askForRegistration(JobMasterRegistration jobMasterRegistration, Timeout askTimeout) {
+        return actorRef.ask(new RegisterJobMaster(jobMasterRegistration), askTimeout)
+            .mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
new file mode 100644
index 0000000..ed522cc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.taskexecutor;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.dispatch.OnComplete;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
+import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
+import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+
+/**
+ * Actor which translates task executor messages into calls on the wrapped
+ * {@link TaskExecutorGateway}. Messages it does not know are passed on to {@link BaseAkkaActor}.
+ */
+public class TaskExecutorAkkaActor extends BaseAkkaActor {
+    private final TaskExecutorGateway taskExecutor;
+
+    public TaskExecutorAkkaActor(TaskExecutorGateway taskExecutor) {
+        this.taskExecutor = taskExecutor;
+    }
+
+    @Override
+    public void onReceive(Object message) throws Exception {
+        final ActorRef replyTo = getSender();
+
+        if (message instanceof ExecuteTask) {
+            ExecuteTask executeTask = (ExecuteTask) message;
+
+            taskExecutor.executeTask(executeTask.getTaskDeploymentDescriptor())
+                .onComplete(acknowledgeTo(replyTo), getContext().dispatcher());
+        } else if (message instanceof CancelTask) {
+            CancelTask cancelTask = (CancelTask) message;
+
+            taskExecutor.cancelTask(cancelTask.getExecutionAttemptID())
+                .onComplete(acknowledgeTo(replyTo), getContext().dispatcher());
+        } else {
+            super.onReceive(message);
+        }
+    }
+
+    /**
+     * Creates the completion callback shared by both operations: a failed future is reported
+     * to the requester as a failure status, a completed one as a success status carrying
+     * {@code Acknowledge.get()}.
+     */
+    private OnComplete<Acknowledge> acknowledgeTo(final ActorRef replyTo) {
+        return new OnComplete<Acknowledge>() {
+            @Override
+            public void onComplete(Throwable failure, Acknowledge success) throws Throwable {
+                if (failure == null) {
+                    replyTo.tell(new Status.Success(Acknowledge.get()), getSelf());
+                } else {
+                    replyTo.tell(new Status.Failure(failure), getSelf());
+                }
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
new file mode 100644
index 0000000..7f0a522
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.taskexecutor;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
+import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+import scala.concurrent.Future;
+import scala.reflect.ClassTag$;
+
+/**
+ * {@link TaskExecutorGateway} implementation which turns every gateway call into an Akka ask
+ * against the wrapped {@link ActorRef}, using a fixed {@link Timeout}.
+ */
+public class TaskExecutorAkkaGateway extends BaseAkkaGateway implements TaskExecutorGateway {
+	private final AskableActorRef actorRef;
+	private final Timeout timeout;
+
+	public TaskExecutorAkkaGateway(ActorRef actorRef, Timeout timeout) {
+		this.actorRef = new AskableActorRef(actorRef);
+		this.timeout = timeout;
+	}
+
+	@Override
+	public Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
+		return askForAcknowledge(new ExecuteTask(taskDeploymentDescriptor));
+	}
+
+	@Override
+	public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId) {
+		return askForAcknowledge(new CancelTask(executionAttemptId));
+	}
+
+	@Override
+	public ActorRef getActorRef() {
+		return actorRef.actorRef();
+	}
+
+	/**
+	 * Sends the given message to the wrapped actor and maps the answer to an {@link Acknowledge}.
+	 *
+	 * @param message Message to send with the ask pattern
+	 * @return Future of the answer, mapped to {@link Acknowledge}
+	 */
+	private Future<Acknowledge> askForAcknowledge(Object message) {
+		return actorRef.ask(message, timeout)
+			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
new file mode 100644
index 0000000..b81b19c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.jobmaster;
+
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.Tuple2;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * JobMaster implementation. The job master is responsible for the execution of a single
+ * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
+ *
+ * It offers the following methods as part of its rpc interface to interact with the JobMaster
+ * remotely:
+ * <ul>
+ * <li>{@link #registerAtResourceManager(String)} triggers the registration at the resource manager</li>
+ * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
+ * given task</li>
+ * </ul>
+ */
+public class JobMaster extends RpcEndpoint<JobMasterGateway> {
+ /** Execution context for future callbacks */
+ private final ExecutionContext executionContext;
+
+ /** Execution context for scheduled runnables */
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ private final FiniteDuration initialRegistrationTimeout = new FiniteDuration(500, TimeUnit.MILLISECONDS);
+ private final FiniteDuration maxRegistrationTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+ private final FiniteDuration registrationDuration = new FiniteDuration(365, TimeUnit.DAYS);
+ private final long failedRegistrationDelay = 10000;
+
+ /** Gateway to connected resource manager, null iff not connected */
+ private ResourceManagerGateway resourceManager = null;
+
+ /** UUID to filter out old registration runs */
+ private UUID currentRegistrationRun;
+
+ public JobMaster(RpcService rpcService, ExecutorService executorService) {
+ super(rpcService);
+ executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
+ scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+ }
+
+ public ResourceManagerGateway getResourceManager() {
+ return resourceManager;
+ }
+
+ //----------------------------------------------------------------------------------------------
+ // RPC methods
+ //----------------------------------------------------------------------------------------------
+
+ /**
+ * Updates the task execution state for a given task.
+ *
+ * @param taskExecutionState New task execution state for a given task
+ * @return Acknowledge the task execution state update
+ */
+ @RpcMethod
+ public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+ System.out.println("TaskExecutionState: " + taskExecutionState);
+ return Acknowledge.get();
+ }
+
+ /**
+ * Triggers the registration of the job master at the resource manager.
+ *
+ * @param address Address of the resource manager
+ */
+ @RpcMethod
+ public void registerAtResourceManager(final String address) {
+ currentRegistrationRun = UUID.randomUUID();
+
+ Future<ResourceManagerGateway> resourceManagerFuture = getRpcService().connect(address, ResourceManagerGateway.class);
+
+ handleResourceManagerRegistration(
+ new JobMasterRegistration(getAddress()),
+ 1,
+ resourceManagerFuture,
+ currentRegistrationRun,
+ initialRegistrationTimeout,
+ maxRegistrationTimeout,
+ registrationDuration.fromNow());
+ }
+
+ //----------------------------------------------------------------------------------------------
+ // Helper methods
+ //----------------------------------------------------------------------------------------------
+
+ /**
+ * Helper method to handle the resource manager registration process. If a registration attempt
+ * times out, then a new attempt with the doubled time out is initiated. The whole registration
+ * process has a deadline. Once this deadline is overdue without successful registration, the
+ * job master shuts down.
+ *
+ * @param jobMasterRegistration Job master registration info which is sent to the resource
+ * manager
+ * @param attemptNumber Registration attempt number
+ * @param resourceManagerFuture Future of the resource manager gateway
+ * @param registrationRun UUID describing the current registration run
+ * @param timeout Timeout of the last registration attempt
+ * @param maxTimeout Maximum timeout between registration attempts
+ * @param deadline Deadline for the registration
+ */
+ void handleResourceManagerRegistration(
+ final JobMasterRegistration jobMasterRegistration,
+ final int attemptNumber,
+ final Future<ResourceManagerGateway> resourceManagerFuture,
+ final UUID registrationRun,
+ final FiniteDuration timeout,
+ final FiniteDuration maxTimeout,
+ final Deadline deadline) {
+
+ // filter out concurrent registration runs
+ if (registrationRun.equals(currentRegistrationRun)) {
+
+ log.info("Start registration attempt #{}.", attemptNumber);
+
+ if (deadline.isOverdue()) {
+ // we've exceeded our registration deadline. This means that we have to shutdown the JobMaster
+ log.error("Exceeded registration deadline without successfully registering at the ResourceManager.");
+ shutDown();
+ } else {
+ Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, Future<Tuple2<RegistrationResponse, ResourceManagerGateway>>>() {
+ @Override
+ public Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> apply(ResourceManagerGateway resourceManagerGateway) {
+ return resourceManagerGateway.registerJobMaster(jobMasterRegistration, timeout).zip(Futures.successful(resourceManagerGateway));
+ }
+ }, executionContext);
+
+ registrationResponseFuture.onComplete(new OnComplete<Tuple2<RegistrationResponse, ResourceManagerGateway>>() {
+ @Override
+ public void onComplete(Throwable failure, Tuple2<RegistrationResponse, ResourceManagerGateway> tuple) throws Throwable {
+ if (failure != null) {
+ if (failure instanceof TimeoutException) {
+ // we haven't received an answer in the given timeout interval,
+ // so increase it and try again.
+ final FiniteDuration newTimeout = timeout.$times(2L).min(maxTimeout);
+
+ handleResourceManagerRegistration(
+ jobMasterRegistration,
+ attemptNumber + 1,
+ resourceManagerFuture,
+ registrationRun,
+ newTimeout,
+ maxTimeout,
+ deadline);
+ } else {
+ log.error("Received unknown error while registering at the ResourceManager.", failure);
+ shutDown();
+ }
+ } else {
+ final RegistrationResponse response = tuple._1();
+ final ResourceManagerGateway gateway = tuple._2();
+
+ if (response.isSuccess()) {
+ finishResourceManagerRegistration(gateway, response.getInstanceID());
+ } else {
+ log.info("The registration was refused. Try again.");
+
+ scheduledExecutorService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ // we have to execute scheduled runnable in the main thread
+ // because we need consistency wrt currentRegistrationRun
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ // our registration attempt was refused. Start over.
+ handleResourceManagerRegistration(
+ jobMasterRegistration,
+ 1,
+ resourceManagerFuture,
+ registrationRun,
+ initialRegistrationTimeout,
+ maxTimeout,
+ deadline);
+ }
+ });
+ }
+ }, failedRegistrationDelay, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }, getMainThreadExecutionContext()); // use the main thread execution context to execute the call back in the main thread
+ }
+ } else {
+ log.info("Discard out-dated registration run.");
+ }
+ }
+
+ /**
+ * Finish the resource manager registration by setting the new resource manager gateway.
+ *
+ * @param resourceManager New resource manager gateway
+ * @param instanceID Instance id assigned by the resource manager
+ */
+ void finishResourceManagerRegistration(ResourceManagerGateway resourceManager, InstanceID instanceID) {
+ log.info("Successfully registered at the ResourceManager under instance id {}.", instanceID);
+ this.resourceManager = resourceManager;
+ }
+
+ /**
+ * Return if the job master is connected to a resource manager.
+ *
+ * @return true if the job master is connected to the resource manager
+ */
+ public boolean isConnected() {
+ return resourceManager != null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
new file mode 100644
index 0000000..17a4c3a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.jobmaster;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.concurrent.Future;
+
+/**
+ * {@link JobMaster} rpc gateway interface.
+ *
+ * <p>The methods declared here mirror the methods which {@link JobMaster} annotates with
+ * {@code RpcMethod}. Where the job master method returns a value directly, the gateway method
+ * returns it wrapped in a {@link Future}.
+ */
+public interface JobMasterGateway extends RpcGateway {
+
+	/**
+	 * Updates the task execution state for a given task.
+	 *
+	 * @param taskExecutionState New task execution state for a given task
+	 * @return Future acknowledge of the task execution state update
+	 */
+	Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
+
+	/**
+	 * Triggers the registration of the job master at the resource manager. The method returns
+	 * nothing, so the outcome of the registration is not reported back through this call.
+	 *
+	 * @param address Address of the resource manager
+	 */
+	void registerAtResourceManager(final String address);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
new file mode 100644
index 0000000..7a2deae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+/**
+ * Serializable registration message which carries the address of the registering job master.
+ */
+public class JobMasterRegistration implements Serializable {
+	private static final long serialVersionUID = 8411214999193765202L;
+
+	/** Address under which the registering job master can be reached */
+	private final String address;
+
+	/**
+	 * Creates a registration message.
+	 *
+	 * @param address Address of the registering job master
+	 */
+	public JobMasterRegistration(String address) {
+		this.address = address;
+	}
+
+	/**
+	 * Returns the address of the registering job master.
+	 *
+	 * @return Address handed in at construction time
+	 */
+	public String getAddress() {
+		return this.address;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
new file mode 100644
index 0000000..8ac9e49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.io.Serializable;
+
+/**
+ * Serializable answer to a registration request. It states whether the registration succeeded
+ * and carries the {@link InstanceID} assigned to the registered party.
+ */
+public class RegistrationResponse implements Serializable {
+	private static final long serialVersionUID = -2379003255993119993L;
+
+	/** Flag indicating whether the registration was accepted */
+	private final boolean isSuccess;
+
+	/** Instance id handed out with this response */
+	private final InstanceID instanceID;
+
+	/**
+	 * Creates a registration response.
+	 *
+	 * @param isSuccess true if the registration was accepted
+	 * @param instanceID Instance id assigned to the registered party
+	 */
+	public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
+		this.isSuccess = isSuccess;
+		this.instanceID = instanceID;
+	}
+
+	/**
+	 * Returns whether the registration was accepted.
+	 *
+	 * @return true if the registration was accepted
+	 */
+	public boolean isSuccess() {
+		return this.isSuccess;
+	}
+
+	/**
+	 * Returns the instance id carried by this response.
+	 *
+	 * @return Instance id handed in at construction time
+	 */
+	public InstanceID getInstanceID() {
+		return this.instanceID;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f36fb7f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
new file mode 100644
index 0000000..c7e8def
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import akka.dispatch.Mapper;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * ResourceManager implementation. The resource manager is responsible for resource de-/allocation
+ * and bookkeeping.
+ *
+ * <p>It offers the following methods as part of its rpc interface to interact with it remotely:
+ * <ul>
+ *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li>
+ *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
+ * </ul>
+ */
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
+	/** Execution context created from the executor service handed to the constructor */
+	private final ExecutionContext executionContext;
+
+	/** Instance ids handed out to the registered job masters, keyed by their gateway */
+	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+
+	/**
+	 * Creates a resource manager.
+	 *
+	 * @param rpcService Rpc service handed to the {@link RpcEndpoint} super class
+	 * @param executorService Executor backing the execution context of this resource manager
+	 */
+	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
+		super(rpcService);
+		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
+		this.jobMasterGateways = new HashMap<>();
+	}
+
+	/**
+	 * Register a {@link JobMaster} at the resource manager.
+	 *
+	 * <p>A gateway to the job master is obtained from the address in the registration. A gateway
+	 * which is already known keeps its instance id, an unknown one is assigned a new id.
+	 *
+	 * @param jobMasterRegistration Job master registration information
+	 * @return Future registration response
+	 */
+	@RpcMethod
+	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
+		Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
+
+		// the bookkeeping map is only touched from the main thread execution context
+		return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
+			@Override
+			public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
+				// a single lookup is enough: the map never stores null values
+				InstanceID instanceID = jobMasterGateways.get(jobMasterGateway);
+
+				if (instanceID == null) {
+					instanceID = new InstanceID();
+					jobMasterGateways.put(jobMasterGateway, instanceID);
+				}
+
+				return new RegistrationResponse(true, instanceID);
+			}
+		}, getMainThreadExecutionContext());
+	}
+
+	/**
+	 * Requests a slot from the resource manager.
+	 *
+	 * @param slotRequest Slot request
+	 * @return Slot assignment
+	 */
+	@RpcMethod
+	public SlotAssignment requestSlot(SlotRequest slotRequest) {
+		// report through the endpoint logger instead of writing to stdout
+		log.info("SlotRequest: {}", slotRequest);
+		return new SlotAssignment();
+	}
+}