You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:50 UTC

[17/50] [abbrv] flink git commit: [FLINK-4346] [rpc] Add new RPC abstraction

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
new file mode 100644
index 0000000..464a261
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * {@link ResourceManager} rpc gateway interface.
+ */
+public interface ResourceManagerGateway extends RpcGateway {
+
+	/**
+	 * Register a {@link JobMaster} at the resource manager.
+	 *
+	 * @param jobMasterRegistration Job master registration information
+	 * @param timeout Timeout for the future to complete
+	 * @return Future registration response
+	 */
+	Future<RegistrationResponse> registerJobMaster(
+		JobMasterRegistration jobMasterRegistration,
+		@RpcTimeout FiniteDuration timeout);
+
+	/**
+	 * Register a {@link JobMaster} at the resource manager.
+	 *
+	 * @param jobMasterRegistration Job master registration information
+	 * @return Future registration response
+	 */
+	Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
+
+	/**
+	 * Requests a slot from the resource manager.
+	 *
+	 * @param slotRequest Slot request
+	 * @return Future slot assignment
+	 */
+	Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
new file mode 100644
index 0000000..86cd8b7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+public class SlotAssignment implements Serializable{
+	private static final long serialVersionUID = -6990813455942742322L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
new file mode 100644
index 0000000..d8fe268
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+public class SlotRequest implements Serializable{
+	private static final long serialVersionUID = -6586877187990445986L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
new file mode 100644
index 0000000..cdfc3bd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import akka.dispatch.ExecutionContexts$;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * TaskExecutor implementation. The task executor is responsible for the execution of multiple
+ * {@link org.apache.flink.runtime.taskmanager.Task}.
+ *
+ * It offers the following methods as part of its rpc interface to interact with him remotely:
+ * <ul>
+ *     <li>{@link #executeTask(TaskDeploymentDescriptor)} executes a given task on the TaskExecutor</li>
+ *     <li>{@link #cancelTask(ExecutionAttemptID)} cancels a given task identified by the {@link ExecutionAttemptID}</li>
+ * </ul>
+ */
+public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
+	private final ExecutionContext executionContext;
+	private final Set<ExecutionAttemptID> tasks = new HashSet<>();
+
+	public TaskExecutor(RpcService rpcService, ExecutorService executorService) {
+		super(rpcService);
+		this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(executorService);
+	}
+
+	/**
+	 * Execute the given task on the task executor. The task is described by the provided
+	 * {@link TaskDeploymentDescriptor}.
+	 *
+	 * @param taskDeploymentDescriptor Descriptor for the task to be executed
+	 * @return Acknowledge the start of the task execution
+	 */
+	@RpcMethod
+	public Acknowledge executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
+		tasks.add(taskDeploymentDescriptor.getExecutionId());
+		return Acknowledge.get();
+	}
+
+	/**
+	 * Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then
+	 * the method throws an {@link Exception}.
+	 *
+	 * @param executionAttemptId Execution attempt ID identifying the task to be canceled.
+	 * @return Acknowledge the task canceling
+	 * @throws Exception if the task with the given execution attempt id could not be found
+	 */
+	@RpcMethod
+	public Acknowledge cancelTask(ExecutionAttemptID executionAttemptId) throws Exception {
+		if (tasks.contains(executionAttemptId)) {
+			return Acknowledge.get();
+		} else {
+			throw new Exception("Could not find task.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
new file mode 100644
index 0000000..450423e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import scala.concurrent.Future;
+
+/**
+ * {@link TaskExecutor} rpc gateway interface
+ */
+public interface TaskExecutorGateway extends RpcGateway {
+	/**
+	 * Execute the given task on the task executor. The task is described by the provided
+	 * {@link TaskDeploymentDescriptor}.
+	 *
+	 * @param taskDeploymentDescriptor Descriptor for the task to be executed
+	 * @return Future acknowledge of the start of the task execution
+	 */
+	Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor);
+
+	/**
+	 * Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then
+	 * the method throws an {@link Exception}.
+	 *
+	 * @param executionAttemptId Execution attempt ID identifying the task to be canceled.
+	 * @return Future acknowledge of the task canceling
+	 */
+	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
new file mode 100644
index 0000000..0ded25e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.reflections.Reflections;
+import scala.concurrent.Future;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class RpcCompletenessTest extends TestLogger {
+	private static final Class<?> futureClass = Future.class;
+
+	@Test
+	public void testRpcCompleteness() {
+		Reflections reflections = new Reflections("org.apache.flink");
+
+		Set<Class<? extends RpcEndpoint>> classes = reflections.getSubTypesOf(RpcEndpoint.class);
+
+		Class<? extends RpcEndpoint> c;
+
+		for (Class<? extends RpcEndpoint> rpcEndpoint :classes){
+			c = rpcEndpoint;
+			Type superClass = c.getGenericSuperclass();
+
+			Class<?> rpcGatewayType = extractTypeParameter(superClass, 0);
+
+			if (rpcGatewayType != null) {
+				checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
+			} else {
+				fail("Could not retrieve the rpc gateway class for the given rpc endpoint class " + rpcEndpoint.getName());
+			}
+		}
+	}
+
+	private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
+		Method[] gatewayMethods = rpcGateway.getDeclaredMethods();
+		Method[] serverMethods = rpcEndpoint.getDeclaredMethods();
+
+		Map<String, Set<Method>> rpcMethods = new HashMap<>();
+		Set<Method> unmatchedRpcMethods = new HashSet<>();
+
+		for (Method serverMethod : serverMethods) {
+			if (serverMethod.isAnnotationPresent(RpcMethod.class)) {
+				if (rpcMethods.containsKey(serverMethod.getName())) {
+					Set<Method> methods = rpcMethods.get(serverMethod.getName());
+					methods.add(serverMethod);
+
+					rpcMethods.put(serverMethod.getName(), methods);
+				} else {
+					Set<Method> methods = new HashSet<>();
+					methods.add(serverMethod);
+
+					rpcMethods.put(serverMethod.getName(), methods);
+				}
+
+				unmatchedRpcMethods.add(serverMethod);
+			}
+		}
+
+		for (Method gatewayMethod : gatewayMethods) {
+			assertTrue(
+				"The rpc endpoint " + rpcEndpoint.getName() + " does not contain a RpcMethod " +
+					"annotated method with the same name and signature " +
+					generateEndpointMethodSignature(gatewayMethod) + ".",
+				rpcMethods.containsKey(gatewayMethod.getName()));
+
+			checkGatewayMethod(gatewayMethod);
+
+			if (!matchGatewayMethodWithEndpoint(gatewayMethod, rpcMethods.get(gatewayMethod.getName()), unmatchedRpcMethods)) {
+				fail("Could not find a RpcMethod annotated method in rpc endpoint " +
+					rpcEndpoint.getName() + " matching the rpc gateway method " +
+					generateEndpointMethodSignature(gatewayMethod) + " defined in the rpc gateway " +
+					rpcGateway.getName() + ".");
+			}
+		}
+
+		if (!unmatchedRpcMethods.isEmpty()) {
+			StringBuilder builder = new StringBuilder();
+
+			for (Method unmatchedRpcMethod : unmatchedRpcMethods) {
+				builder.append(unmatchedRpcMethod).append("\n");
+			}
+
+			fail("The rpc endpoint " + rpcEndpoint.getName() + " contains rpc methods which " +
+				"are not matched to gateway methods of " + rpcGateway.getName() + ":\n" +
+				builder.toString());
+		}
+	}
+
+	/**
+	 * Checks whether the gateway method fulfills the gateway method requirements.
+	 * <ul>
+	 *     <li>It checks whether the return type is void or a {@link Future} wrapping the actual result. </li>
+	 *     <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li>
+	 * </ul>
+	 *
+	 * @param gatewayMethod Gateway method to check
+	 */
+	private void checkGatewayMethod(Method gatewayMethod) {
+		if (!gatewayMethod.getReturnType().equals(Void.TYPE)) {
+			assertTrue(
+				"The return type of method " + gatewayMethod.getName() + " in the rpc gateway " +
+					gatewayMethod.getDeclaringClass().getName() + " is non void and not a " +
+					"future. Non-void return types have to be returned as a future.",
+				gatewayMethod.getReturnType().equals(futureClass));
+		}
+
+		Annotation[][] parameterAnnotations = gatewayMethod.getParameterAnnotations();
+		int rpcTimeoutParameters = 0;
+
+		for (Annotation[] parameterAnnotation : parameterAnnotations) {
+			for (Annotation annotation : parameterAnnotation) {
+				if (annotation.equals(RpcTimeout.class)) {
+					rpcTimeoutParameters++;
+				}
+			}
+		}
+
+		assertTrue("The gateway method " + gatewayMethod + " must have at most one RpcTimeout " +
+			"annotated parameter.", rpcTimeoutParameters <= 1);
+	}
+
+	/**
+	 * Checks whether we find a matching overloaded version for the gateway method among the methods
+	 * with the same name in the rpc endpoint.
+	 *
+	 * @param gatewayMethod Gateway method
+	 * @param endpointMethods Set of rpc methods on the rpc endpoint with the same name as the gateway
+	 *                   method
+	 * @param unmatchedRpcMethods Set of unmatched rpc methods on the endpoint side (so far)
+	 */
+	private boolean matchGatewayMethodWithEndpoint(Method gatewayMethod, Set<Method> endpointMethods, Set<Method> unmatchedRpcMethods) {
+		for (Method endpointMethod : endpointMethods) {
+			if (checkMethod(gatewayMethod, endpointMethod)) {
+				unmatchedRpcMethods.remove(endpointMethod);
+				return true;
+			}
+		}
+
+		return false;
+	}
+
+	private boolean checkMethod(Method gatewayMethod, Method endpointMethod) {
+		Class<?>[] gatewayParameterTypes = gatewayMethod.getParameterTypes();
+		Annotation[][] gatewayParameterAnnotations = gatewayMethod.getParameterAnnotations();
+
+		Class<?>[] endpointParameterTypes = endpointMethod.getParameterTypes();
+
+		List<Class<?>> filteredGatewayParameterTypes = new ArrayList<>();
+
+		assertEquals(gatewayParameterTypes.length, gatewayParameterAnnotations.length);
+
+		// filter out the RpcTimeout parameters
+		for (int i = 0; i < gatewayParameterTypes.length; i++) {
+			if (!isRpcTimeout(gatewayParameterAnnotations[i])) {
+				filteredGatewayParameterTypes.add(gatewayParameterTypes[i]);
+			}
+		}
+
+		if (filteredGatewayParameterTypes.size() != endpointParameterTypes.length) {
+			return false;
+		} else {
+			// check the parameter types
+			for (int i = 0; i < filteredGatewayParameterTypes.size(); i++) {
+				if (!checkType(filteredGatewayParameterTypes.get(i), endpointParameterTypes[i])) {
+					return false;
+				}
+			}
+
+			// check the return types
+			if (endpointMethod.getReturnType() == void.class) {
+				if (gatewayMethod.getReturnType() != void.class) {
+					return false;
+				}
+			} else {
+				// has return value. The gateway method should be wrapped in a future
+				Class<?> futureClass = gatewayMethod.getReturnType();
+
+				// sanity check that the return type of a gateway method must be void or a future
+				if (!futureClass.equals(RpcCompletenessTest.futureClass)) {
+					return false;
+				} else {
+					Class<?> valueClass = extractTypeParameter(futureClass, 0);
+
+					if (endpointMethod.getReturnType().equals(futureClass)) {
+						Class<?> rpcEndpointValueClass = extractTypeParameter(endpointMethod.getReturnType(), 0);
+
+						// check if we have the same future value types
+						if (valueClass != null && rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) {
+							return false;
+						}
+					} else {
+						if (valueClass != null && !checkType(valueClass, endpointMethod.getReturnType())) {
+							return false;
+						}
+					}
+				}
+			}
+
+			return gatewayMethod.getName().equals(endpointMethod.getName());
+		}
+	}
+
+	private boolean checkType(Class<?> firstType, Class<?> secondType) {
+		return firstType.equals(secondType);
+	}
+
+	/**
+	 * Generates from a gateway rpc method signature the corresponding rpc endpoint signature.
+	 *
+	 * For example the {@link RpcTimeout} annotation adds an additional parameter to the gateway
+	 * signature which is not relevant on the server side.
+	 *
+	 * @param method Method to generate the signature string for
+	 * @return String of the respective server side rpc method signature
+	 */
+	private String generateEndpointMethodSignature(Method method) {
+		StringBuilder builder = new StringBuilder();
+
+		if (method.getReturnType().equals(Void.TYPE)) {
+			builder.append("void").append(" ");
+		} else if (method.getReturnType().equals(futureClass)) {
+			Class<?> valueClass = extractTypeParameter(method.getGenericReturnType(), 0);
+
+			builder
+				.append(futureClass.getSimpleName())
+				.append("<")
+				.append(valueClass != null ? valueClass.getSimpleName() : "")
+				.append(">");
+
+			if (valueClass != null) {
+				builder.append("/").append(valueClass.getSimpleName());
+			}
+
+			builder.append(" ");
+		} else {
+			return "Invalid rpc method signature.";
+		}
+
+		builder.append(method.getName()).append("(");
+
+		Class<?>[] parameterTypes = method.getParameterTypes();
+		Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+
+		assertEquals(parameterTypes.length, parameterAnnotations.length);
+
+		for (int i = 0; i < parameterTypes.length; i++) {
+			// filter out the RpcTimeout parameters
+			if (!isRpcTimeout(parameterAnnotations[i])) {
+				builder.append(parameterTypes[i].getName());
+
+				if (i < parameterTypes.length -1) {
+					builder.append(", ");
+				}
+			}
+		}
+
+		builder.append(")");
+
+		return builder.toString();
+	}
+
+	private Class<?> extractTypeParameter(Type genericType, int position) {
+		if (genericType instanceof ParameterizedType) {
+			ParameterizedType parameterizedType = (ParameterizedType) genericType;
+
+			Type[] typeArguments = parameterizedType.getActualTypeArguments();
+
+			if (position < 0 || position >= typeArguments.length) {
+				throw new IndexOutOfBoundsException("The generic type " +
+					parameterizedType.getRawType() + " only has " + typeArguments.length +
+					" type arguments.");
+			} else {
+				Type typeArgument = typeArguments[position];
+
+				if (typeArgument instanceof Class<?>) {
+					return (Class<?>) typeArgument;
+				} else {
+					return null;
+				}
+			}
+		} else {
+			return null;
+		}
+	}
+
+	private boolean isRpcTimeout(Annotation[] annotations) {
+		for (Annotation annotation : annotations) {
+			if (annotation.annotationType().equals(RpcTimeout.class)) {
+				return true;
+			}
+		}
+
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
new file mode 100644
index 0000000..c5bac94
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AkkaRpcServiceTest extends TestLogger {
+
+	/**
+	 * Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the
+	 * {@link AkkaRpcService}.
+	 */
+	@Test
+	public void testJobMasterResourceManagerRegistration() throws Exception {
+		Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS);
+		ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+		AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, akkaTimeout);
+		AkkaRpcService akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaTimeout);
+		ExecutorService executorService = new ForkJoinPool();
+
+		ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService);
+		JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService);
+
+		resourceManager.start();
+
+		ResourceManagerGateway rm = resourceManager.getSelf();
+
+		assertTrue(rm instanceof AkkaGateway);
+
+		AkkaGateway akkaClient = (AkkaGateway) rm;
+
+		jobMaster.start();
+		jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getActorRef()));
+
+		// wait for successful registration
+		FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS);
+		Deadline deadline = timeout.fromNow();
+
+		while (deadline.hasTimeLeft() && !jobMaster.isConnected()) {
+			Thread.sleep(100);
+		}
+
+		assertFalse(deadline.isOverdue());
+
+		jobMaster.shutDown();
+		resourceManager.shutDown();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
new file mode 100644
index 0000000..c143527
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+public class TaskExecutorTest extends TestLogger {
+
+	/**
+	 * Tests that we can deploy and cancel a task on the TaskExecutor without exceptions
+	 */
+	@Test
+	public void testTaskExecution() throws Exception {
+		RpcService testingRpcService = mock(RpcService.class);
+		DirectExecutorService directExecutorService = null;
+		TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
+
+		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+			new JobID(),
+			"Test job",
+			new JobVertexID(),
+			new ExecutionAttemptID(),
+			new SerializedValue<ExecutionConfig>(null),
+			"Test task",
+			0,
+			1,
+			0,
+			new Configuration(),
+			new Configuration(),
+			"Invokable",
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList(),
+			0
+		);
+
+		Acknowledge ack = taskExecutor.executeTask(tdd);
+
+		ack = taskExecutor.cancelTask(tdd.getExecutionId());
+	}
+
+	/**
+	 * Tests that cancelling a non-existing task will return an exception
+	 */
+	@Test(expected=Exception.class)
+	public void testWrongTaskCancellation() throws Exception {
+		RpcService testingRpcService = mock(RpcService.class);
+		DirectExecutorService directExecutorService = null;
+		TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
+
+		taskExecutor.cancelTask(new ExecutionAttemptID());
+
+		fail("The cancellation should have thrown an exception.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
new file mode 100644
index 0000000..1d7c971
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class DirectExecutorService implements ExecutorService {
+	private boolean _shutdown = false;
+
+	@Override
+	public void shutdown() {
+		_shutdown = true;
+	}
+
+	@Override
+	public List<Runnable> shutdownNow() {
+		_shutdown = true;
+		return Collections.emptyList();
+	}
+
+	@Override
+	public boolean isShutdown() {
+		return _shutdown;
+	}
+
+	@Override
+	public boolean isTerminated() {
+		return _shutdown;
+	}
+
+	@Override
+	public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+		return _shutdown;
+	}
+
+	@Override
+	public <T> Future<T> submit(Callable<T> task) {
+		try {
+			T result = task.call();
+
+			return new CompletedFuture<>(result, null);
+		} catch (Exception e) {
+			return new CompletedFuture<>(null, e);
+		}
+	}
+
+	@Override
+	public <T> Future<T> submit(Runnable task, T result) {
+		task.run();
+
+		return new CompletedFuture<>(result, null);
+	}
+
+	@Override
+	public Future<?> submit(Runnable task) {
+		task.run();
+		return new CompletedFuture<>(null, null);
+	}
+
+	@Override
+	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+		ArrayList<Future<T>> result = new ArrayList<>();
+
+		for (Callable<T> task : tasks) {
+			try {
+				result.add(new CompletedFuture<T>(task.call(), null));
+			} catch (Exception e) {
+				result.add(new CompletedFuture<T>(null, e));
+			}
+		}
+		return result;
+	}
+
+	@Override
+	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+		long end = System.currentTimeMillis() + unit.toMillis(timeout);
+		Iterator<? extends Callable<T>> iterator = tasks.iterator();
+		ArrayList<Future<T>> result = new ArrayList<>();
+
+		while (end > System.currentTimeMillis() && iterator.hasNext()) {
+			Callable<T> callable = iterator.next();
+
+			try {
+				result.add(new CompletedFuture<T>(callable.call(), null));
+			} catch (Exception e) {
+				result.add(new CompletedFuture<T>(null, e));
+			}
+		}
+
+		while(iterator.hasNext()) {
+			iterator.next();
+			result.add(new Future<T>() {
+				@Override
+				public boolean cancel(boolean mayInterruptIfRunning) {
+					return false;
+				}
+
+				@Override
+				public boolean isCancelled() {
+					return true;
+				}
+
+				@Override
+				public boolean isDone() {
+					return false;
+				}
+
+				@Override
+				public T get() throws InterruptedException, ExecutionException {
+					throw new CancellationException("Task has been cancelled.");
+				}
+
+				@Override
+				public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+					throw new CancellationException("Task has been cancelled.");
+				}
+			});
+		}
+
+		return result;
+	}
+
+	@Override
+	public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+		Exception exception = null;
+
+		for (Callable<T> task : tasks) {
+			try {
+				return task.call();
+			} catch (Exception e) {
+				// try next task
+				exception = e;
+			}
+		}
+
+		throw new ExecutionException("No tasks finished successfully.", exception);
+	}
+
+	@Override
+	public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+		long end = System.currentTimeMillis() + unit.toMillis(timeout);
+		Exception exception = null;
+
+		Iterator<? extends Callable<T>> iterator = tasks.iterator();
+
+		while (end > System.currentTimeMillis() && iterator.hasNext()) {
+			Callable<T> callable = iterator.next();
+
+			try {
+				return callable.call();
+			} catch (Exception e) {
+				// ignore exception and try next
+				exception = e;
+			}
+		}
+
+		if (iterator.hasNext()) {
+			throw new TimeoutException("Could not finish execution of tasks within time.");
+		} else {
+			throw new ExecutionException("No tasks finished successfully.", exception);
+		}
+	}
+
+	@Override
+	public void execute(Runnable command) {
+		command.run();
+	}
+
+	public static class CompletedFuture<V> implements Future<V> {
+		private final V value;
+		private final Exception exception;
+
+		public CompletedFuture(V value, Exception exception) {
+			this.value = value;
+			this.exception = exception;
+		}
+
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			return false;
+		}
+
+		@Override
+		public boolean isCancelled() {
+			return false;
+		}
+
+		@Override
+		public boolean isDone() {
+			return true;
+		}
+
+		@Override
+		public V get() throws InterruptedException, ExecutionException {
+			if (exception != null) {
+				throw new ExecutionException(exception);
+			} else {
+				return value;
+			}
+		}
+
+		@Override
+		public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+			return get();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index b09db1f..3202a9f 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -202,7 +202,6 @@ under the License.
 		<dependency>
 			<groupId>org.reflections</groupId>
 			<artifactId>reflections</artifactId>
-			<version>0.9.10</version>
 		</dependency>
 
 	</dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5b3148a..5d7cf91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -413,6 +413,13 @@ under the License.
 				<artifactId>jackson-annotations</artifactId>
 				<version>${jackson.version}</version>
 			</dependency>
+
+			<dependency>
+				<groupId>org.reflections</groupId>
+				<artifactId>reflections</artifactId>
+				<version>0.9.10</version>
+				<scope>test</scope>
+			</dependency>
 		</dependencies>
 	</dependencyManagement>