You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:50 UTC
[17/50] [abbrv] flink git commit: [FLINK-4346] [rpc] Add new RPC
abstraction
http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
new file mode 100644
index 0000000..464a261
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * {@link ResourceManager} rpc gateway interface.
+ */
+public interface ResourceManagerGateway extends RpcGateway {
+
+ /**
+ * Register a {@link JobMaster} at the resource manager.
+ *
+ * @param jobMasterRegistration Job master registration information
+ * @param timeout Timeout for the future to complete
+ * @return Future registration response
+ */
+ Future<RegistrationResponse> registerJobMaster(
+ JobMasterRegistration jobMasterRegistration,
+ @RpcTimeout FiniteDuration timeout);
+
+ /**
+ * Register a {@link JobMaster} at the resource manager.
+ *
+ * @param jobMasterRegistration Job master registration information
+ * @return Future registration response
+ */
+ Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
+
+ /**
+ * Requests a slot from the resource manager.
+ *
+ * @param slotRequest Slot request
+ * @return Future slot assignment
+ */
+ Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
new file mode 100644
index 0000000..86cd8b7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+public class SlotAssignment implements Serializable{
+ private static final long serialVersionUID = -6990813455942742322L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
new file mode 100644
index 0000000..d8fe268
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+public class SlotRequest implements Serializable{
+ private static final long serialVersionUID = -6586877187990445986L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
new file mode 100644
index 0000000..cdfc3bd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import akka.dispatch.ExecutionContexts$;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * TaskExecutor implementation. The task executor is responsible for the execution of multiple
+ * {@link org.apache.flink.runtime.taskmanager.Task}.
+ *
+ * It offers the following methods as part of its rpc interface to interact with him remotely:
+ * <ul>
+ * <li>{@link #executeTask(TaskDeploymentDescriptor)} executes a given task on the TaskExecutor</li>
+ * <li>{@link #cancelTask(ExecutionAttemptID)} cancels a given task identified by the {@link ExecutionAttemptID}</li>
+ * </ul>
+ */
+public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
+ private final ExecutionContext executionContext;
+ private final Set<ExecutionAttemptID> tasks = new HashSet<>();
+
+ public TaskExecutor(RpcService rpcService, ExecutorService executorService) {
+ super(rpcService);
+ this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(executorService);
+ }
+
+ /**
+ * Execute the given task on the task executor. The task is described by the provided
+ * {@link TaskDeploymentDescriptor}.
+ *
+ * @param taskDeploymentDescriptor Descriptor for the task to be executed
+ * @return Acknowledge the start of the task execution
+ */
+ @RpcMethod
+ public Acknowledge executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
+ tasks.add(taskDeploymentDescriptor.getExecutionId());
+ return Acknowledge.get();
+ }
+
+ /**
+ * Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then
+ * the method throws an {@link Exception}.
+ *
+ * @param executionAttemptId Execution attempt ID identifying the task to be canceled.
+ * @return Acknowledge the task canceling
+ * @throws Exception if the task with the given execution attempt id could not be found
+ */
+ @RpcMethod
+ public Acknowledge cancelTask(ExecutionAttemptID executionAttemptId) throws Exception {
+ if (tasks.contains(executionAttemptId)) {
+ return Acknowledge.get();
+ } else {
+ throw new Exception("Could not find task.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
new file mode 100644
index 0000000..450423e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import scala.concurrent.Future;
+
+/**
+ * {@link TaskExecutor} rpc gateway interface
+ */
+public interface TaskExecutorGateway extends RpcGateway {
+ /**
+ * Execute the given task on the task executor. The task is described by the provided
+ * {@link TaskDeploymentDescriptor}.
+ *
+ * @param taskDeploymentDescriptor Descriptor for the task to be executed
+ * @return Future acknowledge of the start of the task execution
+ */
+ Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor);
+
+ /**
+ * Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then
+ * the method throws an {@link Exception}.
+ *
+ * @param executionAttemptId Execution attempt ID identifying the task to be canceled.
+ * @return Future acknowledge of the task canceling
+ */
+ Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
new file mode 100644
index 0000000..0ded25e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.reflections.Reflections;
+import scala.concurrent.Future;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class RpcCompletenessTest extends TestLogger {
+ private static final Class<?> futureClass = Future.class;
+
+ @Test
+ public void testRpcCompleteness() {
+ Reflections reflections = new Reflections("org.apache.flink");
+
+ Set<Class<? extends RpcEndpoint>> classes = reflections.getSubTypesOf(RpcEndpoint.class);
+
+ Class<? extends RpcEndpoint> c;
+
+ for (Class<? extends RpcEndpoint> rpcEndpoint :classes){
+ c = rpcEndpoint;
+ Type superClass = c.getGenericSuperclass();
+
+ Class<?> rpcGatewayType = extractTypeParameter(superClass, 0);
+
+ if (rpcGatewayType != null) {
+ checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
+ } else {
+ fail("Could not retrieve the rpc gateway class for the given rpc endpoint class " + rpcEndpoint.getName());
+ }
+ }
+ }
+
+ private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
+ Method[] gatewayMethods = rpcGateway.getDeclaredMethods();
+ Method[] serverMethods = rpcEndpoint.getDeclaredMethods();
+
+ Map<String, Set<Method>> rpcMethods = new HashMap<>();
+ Set<Method> unmatchedRpcMethods = new HashSet<>();
+
+ for (Method serverMethod : serverMethods) {
+ if (serverMethod.isAnnotationPresent(RpcMethod.class)) {
+ if (rpcMethods.containsKey(serverMethod.getName())) {
+ Set<Method> methods = rpcMethods.get(serverMethod.getName());
+ methods.add(serverMethod);
+
+ rpcMethods.put(serverMethod.getName(), methods);
+ } else {
+ Set<Method> methods = new HashSet<>();
+ methods.add(serverMethod);
+
+ rpcMethods.put(serverMethod.getName(), methods);
+ }
+
+ unmatchedRpcMethods.add(serverMethod);
+ }
+ }
+
+ for (Method gatewayMethod : gatewayMethods) {
+ assertTrue(
+ "The rpc endpoint " + rpcEndpoint.getName() + " does not contain a RpcMethod " +
+ "annotated method with the same name and signature " +
+ generateEndpointMethodSignature(gatewayMethod) + ".",
+ rpcMethods.containsKey(gatewayMethod.getName()));
+
+ checkGatewayMethod(gatewayMethod);
+
+ if (!matchGatewayMethodWithEndpoint(gatewayMethod, rpcMethods.get(gatewayMethod.getName()), unmatchedRpcMethods)) {
+ fail("Could not find a RpcMethod annotated method in rpc endpoint " +
+ rpcEndpoint.getName() + " matching the rpc gateway method " +
+ generateEndpointMethodSignature(gatewayMethod) + " defined in the rpc gateway " +
+ rpcGateway.getName() + ".");
+ }
+ }
+
+ if (!unmatchedRpcMethods.isEmpty()) {
+ StringBuilder builder = new StringBuilder();
+
+ for (Method unmatchedRpcMethod : unmatchedRpcMethods) {
+ builder.append(unmatchedRpcMethod).append("\n");
+ }
+
+ fail("The rpc endpoint " + rpcEndpoint.getName() + " contains rpc methods which " +
+ "are not matched to gateway methods of " + rpcGateway.getName() + ":\n" +
+ builder.toString());
+ }
+ }
+
+ /**
+ * Checks whether the gateway method fulfills the gateway method requirements.
+ * <ul>
+ * <li>It checks whether the return type is void or a {@link Future} wrapping the actual result. </li>
+ * <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li>
+ * </ul>
+ *
+ * @param gatewayMethod Gateway method to check
+ */
+ private void checkGatewayMethod(Method gatewayMethod) {
+ if (!gatewayMethod.getReturnType().equals(Void.TYPE)) {
+ assertTrue(
+ "The return type of method " + gatewayMethod.getName() + " in the rpc gateway " +
+ gatewayMethod.getDeclaringClass().getName() + " is non void and not a " +
+ "future. Non-void return types have to be returned as a future.",
+ gatewayMethod.getReturnType().equals(futureClass));
+ }
+
+ Annotation[][] parameterAnnotations = gatewayMethod.getParameterAnnotations();
+ int rpcTimeoutParameters = 0;
+
+ for (Annotation[] parameterAnnotation : parameterAnnotations) {
+ for (Annotation annotation : parameterAnnotation) {
+ if (annotation.equals(RpcTimeout.class)) {
+ rpcTimeoutParameters++;
+ }
+ }
+ }
+
+ assertTrue("The gateway method " + gatewayMethod + " must have at most one RpcTimeout " +
+ "annotated parameter.", rpcTimeoutParameters <= 1);
+ }
+
+ /**
+ * Checks whether we find a matching overloaded version for the gateway method among the methods
+ * with the same name in the rpc endpoint.
+ *
+ * @param gatewayMethod Gateway method
+ * @param endpointMethods Set of rpc methods on the rpc endpoint with the same name as the gateway
+ * method
+ * @param unmatchedRpcMethods Set of unmatched rpc methods on the endpoint side (so far)
+ */
+ private boolean matchGatewayMethodWithEndpoint(Method gatewayMethod, Set<Method> endpointMethods, Set<Method> unmatchedRpcMethods) {
+ for (Method endpointMethod : endpointMethods) {
+ if (checkMethod(gatewayMethod, endpointMethod)) {
+ unmatchedRpcMethods.remove(endpointMethod);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean checkMethod(Method gatewayMethod, Method endpointMethod) {
+ Class<?>[] gatewayParameterTypes = gatewayMethod.getParameterTypes();
+ Annotation[][] gatewayParameterAnnotations = gatewayMethod.getParameterAnnotations();
+
+ Class<?>[] endpointParameterTypes = endpointMethod.getParameterTypes();
+
+ List<Class<?>> filteredGatewayParameterTypes = new ArrayList<>();
+
+ assertEquals(gatewayParameterTypes.length, gatewayParameterAnnotations.length);
+
+ // filter out the RpcTimeout parameters
+ for (int i = 0; i < gatewayParameterTypes.length; i++) {
+ if (!isRpcTimeout(gatewayParameterAnnotations[i])) {
+ filteredGatewayParameterTypes.add(gatewayParameterTypes[i]);
+ }
+ }
+
+ if (filteredGatewayParameterTypes.size() != endpointParameterTypes.length) {
+ return false;
+ } else {
+ // check the parameter types
+ for (int i = 0; i < filteredGatewayParameterTypes.size(); i++) {
+ if (!checkType(filteredGatewayParameterTypes.get(i), endpointParameterTypes[i])) {
+ return false;
+ }
+ }
+
+ // check the return types
+ if (endpointMethod.getReturnType() == void.class) {
+ if (gatewayMethod.getReturnType() != void.class) {
+ return false;
+ }
+ } else {
+ // has return value. The gateway method should be wrapped in a future
+ Class<?> futureClass = gatewayMethod.getReturnType();
+
+ // sanity check that the return type of a gateway method must be void or a future
+ if (!futureClass.equals(RpcCompletenessTest.futureClass)) {
+ return false;
+ } else {
+ Class<?> valueClass = extractTypeParameter(futureClass, 0);
+
+ if (endpointMethod.getReturnType().equals(futureClass)) {
+ Class<?> rpcEndpointValueClass = extractTypeParameter(endpointMethod.getReturnType(), 0);
+
+ // check if we have the same future value types
+ if (valueClass != null && rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) {
+ return false;
+ }
+ } else {
+ if (valueClass != null && !checkType(valueClass, endpointMethod.getReturnType())) {
+ return false;
+ }
+ }
+ }
+ }
+
+ return gatewayMethod.getName().equals(endpointMethod.getName());
+ }
+ }
+
+ private boolean checkType(Class<?> firstType, Class<?> secondType) {
+ return firstType.equals(secondType);
+ }
+
+ /**
+ * Generates from a gateway rpc method signature the corresponding rpc endpoint signature.
+ *
+ * For example the {@link RpcTimeout} annotation adds an additional parameter to the gateway
+ * signature which is not relevant on the server side.
+ *
+ * @param method Method to generate the signature string for
+ * @return String of the respective server side rpc method signature
+ */
+ private String generateEndpointMethodSignature(Method method) {
+ StringBuilder builder = new StringBuilder();
+
+ if (method.getReturnType().equals(Void.TYPE)) {
+ builder.append("void").append(" ");
+ } else if (method.getReturnType().equals(futureClass)) {
+ Class<?> valueClass = extractTypeParameter(method.getGenericReturnType(), 0);
+
+ builder
+ .append(futureClass.getSimpleName())
+ .append("<")
+ .append(valueClass != null ? valueClass.getSimpleName() : "")
+ .append(">");
+
+ if (valueClass != null) {
+ builder.append("/").append(valueClass.getSimpleName());
+ }
+
+ builder.append(" ");
+ } else {
+ return "Invalid rpc method signature.";
+ }
+
+ builder.append(method.getName()).append("(");
+
+ Class<?>[] parameterTypes = method.getParameterTypes();
+ Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+
+ assertEquals(parameterTypes.length, parameterAnnotations.length);
+
+ for (int i = 0; i < parameterTypes.length; i++) {
+ // filter out the RpcTimeout parameters
+ if (!isRpcTimeout(parameterAnnotations[i])) {
+ builder.append(parameterTypes[i].getName());
+
+ if (i < parameterTypes.length -1) {
+ builder.append(", ");
+ }
+ }
+ }
+
+ builder.append(")");
+
+ return builder.toString();
+ }
+
+ private Class<?> extractTypeParameter(Type genericType, int position) {
+ if (genericType instanceof ParameterizedType) {
+ ParameterizedType parameterizedType = (ParameterizedType) genericType;
+
+ Type[] typeArguments = parameterizedType.getActualTypeArguments();
+
+ if (position < 0 || position >= typeArguments.length) {
+ throw new IndexOutOfBoundsException("The generic type " +
+ parameterizedType.getRawType() + " only has " + typeArguments.length +
+ " type arguments.");
+ } else {
+ Type typeArgument = typeArguments[position];
+
+ if (typeArgument instanceof Class<?>) {
+ return (Class<?>) typeArgument;
+ } else {
+ return null;
+ }
+ }
+ } else {
+ return null;
+ }
+ }
+
+ private boolean isRpcTimeout(Annotation[] annotations) {
+ for (Annotation annotation : annotations) {
+ if (annotation.annotationType().equals(RpcTimeout.class)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
new file mode 100644
index 0000000..c5bac94
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AkkaRpcServiceTest extends TestLogger {
+
+ /**
+ * Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the
+ * {@link AkkaRpcService}.
+ */
+ @Test
+ public void testJobMasterResourceManagerRegistration() throws Exception {
+ Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS);
+ ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+ ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+ AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, akkaTimeout);
+ AkkaRpcService akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaTimeout);
+ ExecutorService executorService = new ForkJoinPool();
+
+ ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService);
+ JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService);
+
+ resourceManager.start();
+
+ ResourceManagerGateway rm = resourceManager.getSelf();
+
+ assertTrue(rm instanceof AkkaGateway);
+
+ AkkaGateway akkaClient = (AkkaGateway) rm;
+
+ jobMaster.start();
+ jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getActorRef()));
+
+ // wait for successful registration
+ FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS);
+ Deadline deadline = timeout.fromNow();
+
+ while (deadline.hasTimeLeft() && !jobMaster.isConnected()) {
+ Thread.sleep(100);
+ }
+
+ assertFalse(deadline.isOverdue());
+
+ jobMaster.shutDown();
+ resourceManager.shutDown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
new file mode 100644
index 0000000..c143527
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+public class TaskExecutorTest extends TestLogger {
+
+ /**
+ * Tests that we can deploy and cancel a task on the TaskExecutor without exceptions
+ */
+ @Test
+ public void testTaskExecution() throws Exception {
+ RpcService testingRpcService = mock(RpcService.class);
+ DirectExecutorService directExecutorService = null;
+ TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
+
+ TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+ new JobID(),
+ "Test job",
+ new JobVertexID(),
+ new ExecutionAttemptID(),
+ new SerializedValue<ExecutionConfig>(null),
+ "Test task",
+ 0,
+ 1,
+ 0,
+ new Configuration(),
+ new Configuration(),
+ "Invokable",
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ 0
+ );
+
+ Acknowledge ack = taskExecutor.executeTask(tdd);
+
+ ack = taskExecutor.cancelTask(tdd.getExecutionId());
+ }
+
+ /**
+ * Tests that cancelling a non-existing task will return an exception
+ */
+ @Test(expected=Exception.class)
+ public void testWrongTaskCancellation() throws Exception {
+ RpcService testingRpcService = mock(RpcService.class);
+ DirectExecutorService directExecutorService = null;
+ TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService);
+
+ taskExecutor.cancelTask(new ExecutionAttemptID());
+
+ fail("The cancellation should have thrown an exception.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
new file mode 100644
index 0000000..1d7c971
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DirectExecutorService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class DirectExecutorService implements ExecutorService {
+ private boolean _shutdown = false;
+
+ @Override
+ public void shutdown() {
+ _shutdown = true;
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ _shutdown = true;
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return _shutdown;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return _shutdown;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return _shutdown;
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ try {
+ T result = task.call();
+
+ return new CompletedFuture<>(result, null);
+ } catch (Exception e) {
+ return new CompletedFuture<>(null, e);
+ }
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ task.run();
+
+ return new CompletedFuture<>(result, null);
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ task.run();
+ return new CompletedFuture<>(null, null);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ ArrayList<Future<T>> result = new ArrayList<>();
+
+ for (Callable<T> task : tasks) {
+ try {
+ result.add(new CompletedFuture<T>(task.call(), null));
+ } catch (Exception e) {
+ result.add(new CompletedFuture<T>(null, e));
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ long end = System.currentTimeMillis() + unit.toMillis(timeout);
+ Iterator<? extends Callable<T>> iterator = tasks.iterator();
+ ArrayList<Future<T>> result = new ArrayList<>();
+
+ while (end > System.currentTimeMillis() && iterator.hasNext()) {
+ Callable<T> callable = iterator.next();
+
+ try {
+ result.add(new CompletedFuture<T>(callable.call(), null));
+ } catch (Exception e) {
+ result.add(new CompletedFuture<T>(null, e));
+ }
+ }
+
+ while(iterator.hasNext()) {
+ iterator.next();
+ result.add(new Future<T>() {
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return true;
+ }
+
+ @Override
+ public boolean isDone() {
+ return false;
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ throw new CancellationException("Task has been cancelled.");
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ throw new CancellationException("Task has been cancelled.");
+ }
+ });
+ }
+
+ return result;
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ Exception exception = null;
+
+ for (Callable<T> task : tasks) {
+ try {
+ return task.call();
+ } catch (Exception e) {
+ // try next task
+ exception = e;
+ }
+ }
+
+ throw new ExecutionException("No tasks finished successfully.", exception);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ long end = System.currentTimeMillis() + unit.toMillis(timeout);
+ Exception exception = null;
+
+ Iterator<? extends Callable<T>> iterator = tasks.iterator();
+
+ while (end > System.currentTimeMillis() && iterator.hasNext()) {
+ Callable<T> callable = iterator.next();
+
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ // ignore exception and try next
+ exception = e;
+ }
+ }
+
+ if (iterator.hasNext()) {
+ throw new TimeoutException("Could not finish execution of tasks within time.");
+ } else {
+ throw new ExecutionException("No tasks finished successfully.", exception);
+ }
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+
+ public static class CompletedFuture<V> implements Future<V> {
+ private final V value;
+ private final Exception exception;
+
+ public CompletedFuture(V value, Exception exception) {
+ this.value = value;
+ this.exception = exception;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ if (exception != null) {
+ throw new ExecutionException(exception);
+ } else {
+ return value;
+ }
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return get();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index b09db1f..3202a9f 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -202,7 +202,6 @@ under the License.
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
- <version>0.9.10</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/flink/blob/04bcb715/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5b3148a..5d7cf91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -413,6 +413,13 @@ under the License.
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ <version>0.9.10</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>