You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/08/10 09:49:19 UTC

flink git commit: Re-enable RpcCompletenessTest again

Repository: flink
Updated Branches:
  refs/heads/flip-6 f1b45d320 -> 3ee29cecb


Re-enable RpcCompletenessTest again

The RpcCompletenessTest now checks more thoroughly if the RpcProtocol and the
corresponding RpcGateway define the same interface.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ee29cec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ee29cec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ee29cec

Branch: refs/heads/flip-6
Commit: 3ee29cecb1073c14f3289b579b521bbcf943bfb4
Parents: f1b45d3
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Aug 10 11:35:32 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Aug 10 11:35:32 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcTimeout.java    |  34 +++
 .../apache/flink/runtime/rpc/WithTimeout.java   |  36 ---
 .../resourcemanager/ResourceManagerGateway.java |   7 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 281 ++++++++++++++-----
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |   6 +-
 .../rpc/taskexecutor/TaskExecutorTest.java      |   1 -
 6 files changed, 250 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ee29cec/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
new file mode 100644
index 0000000..3d36d47
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for a parameter of a {@link RpcGateway} method which marks it as the timeout for
+ * the returned future to be completed. The remaining parameters are passed to the remote rpc
+ * server for the rpc.
+ */
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RpcTimeout {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee29cec/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/WithTimeout.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/WithTimeout.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/WithTimeout.java
deleted file mode 100644
index 6cb9cd3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/WithTimeout.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation for {@link RpcGateway} methods to specify an additional timeout parameter for the
- * returned future to be completed. The annotation takes a parameter name which designates the
- * timeout parameter. The rest of the provided parameters is passed to the remote rpc server for
- * the rpc.
- */
-@Target(ElementType.METHOD)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface WithTimeout {
-	String value();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee29cec/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
index 6a1c4e7..464a261 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.rpc.resourcemanager;
 
 import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.WithTimeout;
+import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -36,8 +36,9 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param timeout Timeout for the future to complete
 	 * @return Future registration response
 	 */
-	@WithTimeout("timeout")
-	Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration, FiniteDuration timeout);
+	Future<RegistrationResponse> registerJobMaster(
+		JobMasterRegistration jobMasterRegistration,
+		@RpcTimeout FiniteDuration timeout);
 
 	/**
 	 * Register a {@link JobMaster} at the resource manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee29cec/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index 37563a3..948ee85 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -19,16 +19,17 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.util.TestLogger;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.reflections.Reflections;
 import scala.concurrent.Future;
 
+import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,116 +39,167 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class RpcCompletenessTest extends TestLogger {
+	private static final Class<?> futureClass = Future.class;
 
-	@Ignore
 	@Test
 	public void testRpcCompleteness() {
 		Reflections reflections = new Reflections("org.apache.flink");
 
 		Set<Class<? extends RpcProtocol>> classes = reflections.getSubTypesOf(RpcProtocol.class);
 
-		Class<? extends RpcProtocol> c = null;
+		Class<? extends RpcProtocol> c;
 
-		for (Class<? extends RpcProtocol> rpcServer :classes){
-			c = rpcServer;
+		for (Class<? extends RpcProtocol> rpcProtocol :classes){
+			c = rpcProtocol;
 			Type superClass = c.getGenericSuperclass();
 
-			boolean foundRpcServerInterface = false;
+			Class<?> rpcGatewayType = extractTypeParameter(superClass, 0);
 
-			if (superClass instanceof ParameterizedType) {
-				ParameterizedType parameterizedType = (ParameterizedType) superClass;
-
-				if (parameterizedType.getRawType() == RpcProtocol.class) {
-					foundRpcServerInterface = true;
-					Type[] typeArguments = parameterizedType.getActualTypeArguments();
-
-					assertEquals(1, typeArguments.length);
-					assertTrue(typeArguments[0] instanceof Class<?>);
-
-					Type rpcGatewayType = typeArguments[0];
-
-					assertTrue(rpcGatewayType instanceof Class);
-
-					checkCompleteness(rpcServer, (Class<?>) rpcGatewayType);
-				}
+			if (rpcGatewayType != null) {
+				checkCompleteness(rpcProtocol, (Class<? extends RpcGateway>) rpcGatewayType);
+			} else {
+				fail("Could not retrieve the rpc gateway class for the given rpc protocol class " + rpcProtocol.getName());
 			}
-
-			assertTrue("The class " + rpcServer + " does not implement the " + RpcProtocol.class + " interface.", foundRpcServerInterface);
 		}
 	}
 
-	private void checkCompleteness(Class<?> rpcServer, Class<?> rpcGateway) {
+	private void checkCompleteness(Class<? extends RpcProtocol> rpcProtocol, Class<? extends RpcGateway> rpcGateway) {
 		Method[] gatewayMethods = rpcGateway.getDeclaredMethods();
-		Method[] serverMethods = rpcServer.getDeclaredMethods();
+		Method[] serverMethods = rpcProtocol.getDeclaredMethods();
 
-		Map<String, List<Method>> rpcMethods = new HashMap<>();
-		int numberServerRpcMethods = 0;
+		Map<String, Set<Method>> rpcMethods = new HashMap<>();
+		Set<Method> unmatchedRpcMethods = new HashSet<>();
 
 		for (Method serverMethod : serverMethods) {
 			if (serverMethod.isAnnotationPresent(RpcMethod.class)) {
 				if (rpcMethods.containsKey(serverMethod.getName())) {
-					List<Method> methods = rpcMethods.get(serverMethod.getName());
+					Set<Method> methods = rpcMethods.get(serverMethod.getName());
 					methods.add(serverMethod);
 
 					rpcMethods.put(serverMethod.getName(), methods);
 				} else {
-					List<Method> methods = new ArrayList<>();
+					Set<Method> methods = new HashSet<>();
 					methods.add(serverMethod);
 
 					rpcMethods.put(serverMethod.getName(), methods);
 				}
 
-				numberServerRpcMethods++;
+				unmatchedRpcMethods.add(serverMethod);
 			}
 		}
 
-		assertEquals(
-			"Server class " + rpcServer + " does not have the same number of rpc methods than " +
-				"the gateway class " + rpcGateway ,
-			gatewayMethods.length,
-			numberServerRpcMethods);
-
 		for (Method gatewayMethod : gatewayMethods) {
-			assertTrue(rpcMethods.containsKey(gatewayMethod.getName()));
+			assertTrue(
+				"The rpc protocol " + rpcProtocol.getName() + " does not contain a RpcMethod " +
+					"annotated method with the same name and signature " +
+					generateProtocolMethodSignature(gatewayMethod) + ".",
+				rpcMethods.containsKey(gatewayMethod.getName()));
+
+			checkGatewayMethod(gatewayMethod);
+
+			if (!matchGatewayMethodWithProtocol(gatewayMethod, rpcMethods.get(gatewayMethod.getName()), unmatchedRpcMethods)) {
+				fail("Could not find a RpcMethod annotated method in rpc protocol " +
+					rpcProtocol.getName() + " matching the rpc gateway method " +
+					generateProtocolMethodSignature(gatewayMethod) + " defined in the rpc gateway " +
+					rpcGateway.getName() + ".");
+			}
+		}
+
+		if (!unmatchedRpcMethods.isEmpty()) {
+			StringBuilder builder = new StringBuilder();
 
-			checkGatewayMethod(gatewayMethod, rpcMethods.get(gatewayMethod.getName()));
+			for (Method unmatchedRpcMethod : unmatchedRpcMethods) {
+				builder.append(unmatchedRpcMethod).append("\n");
+			}
+
+			fail("The rpc protocol " + rpcProtocol.getName() + " contains rpc methods which " +
+				"are not matched to gateway methods of " + rpcGateway.getName() + ":\n" +
+				builder.toString());
 		}
 	}
 
 	/**
+	 * Checks whether the gateway method fulfills the gateway method requirements.
+	 * <ul>
+	 *     <li>It checks whether the return type is void or a {@link Future} wrapping the actual result. </li>
+	 *     <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li>
+	 * </ul>
+	 *
+	 * @param gatewayMethod Gateway method to check
+	 */
+	private void checkGatewayMethod(Method gatewayMethod) {
+		if (!gatewayMethod.getReturnType().equals(Void.TYPE)) {
+			assertTrue(
+				"The return type of method " + gatewayMethod.getName() + " in the rpc gateway " +
+					gatewayMethod.getDeclaringClass().getName() + " is non void and not a " +
+					"future. Non-void return types have to be returned as a future.",
+				gatewayMethod.getReturnType().equals(futureClass));
+		}
+
+		Annotation[][] parameterAnnotations = gatewayMethod.getParameterAnnotations();
+		int rpcTimeoutParameters = 0;
+
+		// An Annotation instance is never equal to a Class object, so the annotation type has
+		// to be compared; isRpcTimeout does exactly that for the annotations of one parameter.
+		for (Annotation[] parameterAnnotation : parameterAnnotations) {
+			if (isRpcTimeout(parameterAnnotation)) {
+				rpcTimeoutParameters++;
+			}
+		}
+
+		assertTrue("The gateway method " + gatewayMethod + " must have at most one RpcTimeout " +
+			"annotated parameter.", rpcTimeoutParameters <= 1);
+	}
+
+	/**
 	 * Checks whether we find a matching overloaded version for the gateway method among the methods
-	 * with the same name in the rpc server.
+	 * with the same name in the rpc protocol.
 	 *
 	 * @param gatewayMethod Gateway method
-	 * @param rpcMethods List of rpc methods on the rpc server with the same name as the gateway
+	 * @param protocolMethods Set of rpc methods on the rpc server with the same name as the gateway
 	 *                   method
+	 * @param unmatchedRpcMethods Set of unmatched rpc methods on the protocol side (so far)
 	 */
-	private void checkGatewayMethod(Method gatewayMethod, List<Method> rpcMethods) {
-		for (Method rpcMethod : rpcMethods) {
-			if (checkMethod(gatewayMethod, rpcMethod)) {
-				return;
+	private boolean matchGatewayMethodWithProtocol(Method gatewayMethod, Set<Method> protocolMethods, Set<Method> unmatchedRpcMethods) {
+		for (Method protocolMethod : protocolMethods) {
+			if (checkMethod(gatewayMethod, protocolMethod)) {
+				unmatchedRpcMethods.remove(protocolMethod);
+				return true;
 			}
 		}
 
-		fail("Could not find rpc method which is compatible to " + gatewayMethod);
+		return false;
 	}
 
-	private boolean checkMethod(Method gatewayMethod, Method rpcMethod) {
-		Class<?>[] firstParameterTypes = gatewayMethod.getParameterTypes();
-		Class<?>[] secondParameterTypes = rpcMethod.getParameterTypes();
+	private boolean checkMethod(Method gatewayMethod, Method protocolMethod) {
+		Class<?>[] gatewayParameterTypes = gatewayMethod.getParameterTypes();
+		Annotation[][] gatewayParameterAnnotations = gatewayMethod.getParameterAnnotations();
+
+		Class<?>[] protocolParameterTypes = protocolMethod.getParameterTypes();
+
+		List<Class<?>> filteredGatewayParameterTypes = new ArrayList<>();
 
-		if (firstParameterTypes.length != secondParameterTypes.length) {
+		assertEquals(gatewayParameterTypes.length, gatewayParameterAnnotations.length);
+
+		// filter out the RpcTimeout parameters
+		for (int i = 0; i < gatewayParameterTypes.length; i++) {
+			if (!isRpcTimeout(gatewayParameterAnnotations[i])) {
+				filteredGatewayParameterTypes.add(gatewayParameterTypes[i]);
+			}
+		}
+
+		if (filteredGatewayParameterTypes.size() != protocolParameterTypes.length) {
 			return false;
 		} else {
 			// check the parameter types
-			for (int i = 0; i < firstParameterTypes.length; i++) {
-				if (!checkType(firstParameterTypes[i], secondParameterTypes[i])) {
+			for (int i = 0; i < filteredGatewayParameterTypes.size(); i++) {
+				if (!checkType(filteredGatewayParameterTypes.get(i), protocolParameterTypes[i])) {
 					return false;
 				}
 			}
 
 			// check the return types
-			if (rpcMethod.getReturnType() == void.class) {
+			if (protocolMethod.getReturnType() == void.class) {
 				if (gatewayMethod.getReturnType() != void.class) {
 					return false;
 				}
@@ -155,40 +207,121 @@ public class RpcCompletenessTest extends TestLogger {
 				// has return value. The gateway method should be wrapped in a future
 				Class<?> futureClass = gatewayMethod.getReturnType();
 
-				if (futureClass != Future.class) {
+				// sanity check that the return type of a gateway method must be void or a future
+				if (!futureClass.equals(RpcCompletenessTest.futureClass)) {
 					return false;
-				}
-
-				Type futureType = gatewayMethod.getGenericReturnType();
-
-				if (futureType instanceof ParameterizedType) {
-					ParameterizedType parameterizedType = (ParameterizedType) futureType;
-
-					Type[] typeArguments = parameterizedType.getActualTypeArguments();
+				} else {
+					Class<?> valueClass = extractTypeParameter(futureClass, 0);
 
-					// check that we only have one type argument
-					if (typeArguments.length == 1) {
-						Type typeArgument = typeArguments[0];
+					if (protocolMethod.getReturnType().equals(futureClass)) {
+						Class<?> rpcProtocolValueClass = extractTypeParameter(protocolMethod.getReturnType(), 0);
 
-						// check that the type argument is a Class
-						if (typeArgument instanceof Class<?>) {
-							if (!checkType((Class<?>) typeArgument, rpcMethod.getReturnType())) {
-								return false;
-							}
+						// check if we have the same future value types
+						if (valueClass != null && rpcProtocolValueClass != null && !checkType(valueClass, rpcProtocolValueClass)) {
+							return false;
 						}
 					} else {
-						return false;
+						if (valueClass != null && !checkType(valueClass, protocolMethod.getReturnType())) {
+							return false;
+						}
 					}
 				}
+			}
+
+			return gatewayMethod.getName().equals(protocolMethod.getName());
+		}
+	}
 
+	private boolean checkType(Class<?> firstType, Class<?> secondType) {
+		return firstType.equals(secondType); // exact class match only; subtypes do not count
+	}
 
+	/**
+	 * Generates from a gateway rpc method signature the corresponding rpc protocol signature.
+	 *
+	 * For example the {@link RpcTimeout} annotation adds an additional parameter to the gateway
+	 * signature which is not relevant on the server side.
+	 *
+	 * @param method Method to generate the signature string for
+	 * @return String of the respective server side rpc method signature
+	 */
+	private String generateProtocolMethodSignature(Method method) {
+		StringBuilder builder = new StringBuilder();
+
+		if (method.getReturnType().equals(Void.TYPE)) {
+			builder.append("void").append(" ");
+		} else if (method.getReturnType().equals(futureClass)) {
+			Class<?> valueClass = extractTypeParameter(method.getGenericReturnType(), 0);
+
+			builder
+				.append(futureClass.getSimpleName())
+				.append("<")
+				.append(valueClass != null ? valueClass.getSimpleName() : "")
+				.append(">");
+
+			// the protocol side may return the plain value type instead of a future of it
+			if (valueClass != null) {
+				builder.append("/").append(valueClass.getSimpleName());
+			}
+
+			builder.append(" ");
+		} else {
+			return "Invalid rpc method signature.";
+		}
+
+		builder.append(method.getName()).append("(");
+
+		Class<?>[] parameterTypes = method.getParameterTypes();
+		Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+
+		assertEquals(parameterTypes.length, parameterAnnotations.length);
+
+		// The separator precedes every printed parameter except the first one, so a filtered
+		// RpcTimeout parameter at the end of the list cannot leave a dangling ", " behind.
+		String separator = "";
+		for (int i = 0; i < parameterTypes.length; i++) {
+			// filter out the RpcTimeout parameters
+			if (!isRpcTimeout(parameterAnnotations[i])) {
+				builder.append(separator).append(parameterTypes[i].getName());
+				separator = ", ";
+			}
+		}
+
+		builder.append(")");
+
+		return builder.toString();
+	}
 
-	private boolean checkType(Class<?> firstType, Class<?> secondType) {
-		return firstType == secondType;
+	/**
+	 * Returns the type argument at the given position if it is a plain class, otherwise null.
+	 */
+	private Class<?> extractTypeParameter(Type genericType, int position) {
+		// types which are not parameterized carry no type arguments to extract
+		if (!(genericType instanceof ParameterizedType)) {
+			return null;
+		}
+
+		ParameterizedType parameterized = (ParameterizedType) genericType;
+		Type[] arguments = parameterized.getActualTypeArguments();
+
+		if (position < 0 || position >= arguments.length) {
+			throw new IndexOutOfBoundsException("The generic type " +
+				parameterized.getRawType() + " only has " + arguments.length +
+				" type arguments.");
+		}
+
+		// only arguments which are themselves a Class are returned; any other kind of Type
+		// (e.g. a nested parameterized type) yields null
+		Type argument = arguments[position];
+		return (argument instanceof Class<?>) ? (Class<?>) argument : null;
+	}
+
+	private boolean isRpcTimeout(Annotation[] annotations) {
+		// true as soon as one of the given annotations is of type RpcTimeout
+		boolean rpcTimeoutFound = false;
+
+		for (int i = 0; i < annotations.length && !rpcTimeoutFound; i++) {
+			rpcTimeoutFound = RpcTimeout.class.equals(annotations[i].annotationType());
+		}
+		return rpcTimeoutFound;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee29cec/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index c0b01f4..c5bac94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -38,8 +38,12 @@ import static org.junit.Assert.assertTrue;
 
 public class AkkaRpcServiceTest extends TestLogger {
 
+	/**
+	 * Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the
+	 * {@link AkkaRpcService}.
+	 */
 	@Test
-	public void testAkkaRpcService() throws Exception {
+	public void testJobMasterResourceManagerRegistration() throws Exception {
 		Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS);
 		ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
 		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();

http://git-wip-us.apache.org/repos/asf/flink/blob/3ee29cec/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
index 8e5c154..c143527 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -78,7 +78,6 @@ public class TaskExecutorTest extends TestLogger {
 
 	/**
 	 * Tests that cancelling a non-existing task will return an exception
-	 * @throws Exception
 	 */
 	@Test(expected=Exception.class)
 	public void testWrongTaskCancellation() throws Exception {