You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:49:01 UTC

[41/50] [abbrv] flink git commit: [FLINK-4703] RpcCompletenessTest: Add support for type arguments and subclasses

[FLINK-4703] RpcCompletenessTest: Add support for type arguments and subclasses

This closes #2561


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f699c421
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f699c421
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f699c421

Branch: refs/heads/flip-6
Commit: f699c4215604dfde384839379b8f76af06e01163
Parents: 406dccb
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 28 12:39:30 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:45 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 23 +++++-
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 80 ++++++++++++++++++--
 2 files changed, 94 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f699c421/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 4e5e49a..79961f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -85,9 +85,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 
 		// IMPORTANT: Don't change order of selfGatewayType and self because rpcService.startServer
 		// requires that selfGatewayType has been initialized
-		this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass());
+		this.selfGatewayType = determineSelfGatewayType();
 		this.self = rpcService.startServer(this);
-		
+
 		this.mainThreadExecutor = new MainThreadExecutor((MainThreadExecutable) self);
 	}
 
@@ -255,4 +255,23 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 			gateway.runAsync(runnable);
 		}
 	}
+
+	/**
+	 * Determines the self gateway type specified in one of the subclasses which extend this class.
+	 * May traverse multiple class hierarchies until a Gateway type is found as a first type argument.
+	 * @return Class<C> The determined self gateway type
+	 */
+	private Class<C> determineSelfGatewayType() {
+
+		// determine self gateway type
+		Class c = getClass();
+		Class<C> determinedSelfGatewayType;
+		do {
+			determinedSelfGatewayType = ReflectionUtil.getTemplateType1(c);
+			// check if super class contains self gateway type in next loop
+			c = c.getSuperclass();
+		} while (!RpcGateway.class.isAssignableFrom(determinedSelfGatewayType));
+
+		return determinedSelfGatewayType;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f699c421/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index 53355e8..e7143ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -26,9 +26,14 @@ import org.apache.flink.util.ReflectionUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,8 +46,33 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Test which ensures that all classes of subtype {@link RpcEndpoint} implement
+ * the methods specified in the generic gateway type argument.
+ *
+ * {@code
+ * 	    RpcEndpoint<GatewayTypeParameter extends RpcGateway>
+ * }
+ *
+ * Note, that the class hierarchy can also be nested. In this case the type argument
+ * always has to be the first argument, e.g. {@code
+ *
+ * 	    // RpcClass needs to implement RpcGatewayClass' methods
+ * 	    RpcClass extends RpcEndpoint<RpcGatewayClass>
+ *
+ * 	    // RpcClass2 or its subclass needs to implement RpcGatewayClass' methods
+ *      RpcClass<GatewayTypeParameter extends RpcGateway,...> extends RpcEndpoint<GatewayTypeParameter>
+ *      RpcClass2 extends RpcClass<RpcGatewayClass,...>
+ *
+ *      // needless to say, this can even be nested further
+ *      ...
+ * }
+ *
+ */
 public class RpcCompletenessTest extends TestLogger {
 
+	private static Logger LOG = LoggerFactory.getLogger(RpcCompletenessTest.class);
+
 	private static final Class<?> futureClass = Future.class;
 	private static final Class<?> timeoutClass = Time.class;
 
@@ -55,16 +85,52 @@ public class RpcCompletenessTest extends TestLogger {
 
 		Class<? extends RpcEndpoint> c;
 
-		for (Class<? extends RpcEndpoint> rpcEndpoint :classes){
+		mainloop:
+		for (Class<? extends RpcEndpoint> rpcEndpoint : classes) {
 			c = rpcEndpoint;
 
-			Class<?> rpcGatewayType = ReflectionUtil.getTemplateType1(c);
+			LOG.debug("-------------");
+			LOG.debug("c: {}", c);
 
-			if (rpcGatewayType != null) {
-				checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
-			} else {
-				fail("Could not retrieve the rpc gateway class for the given rpc endpoint class " + rpcEndpoint.getName());
+			// skip abstract classes
+			if (Modifier.isAbstract(c.getModifiers())) {
+				LOG.debug("Skipping abstract class");
+				continue;
 			}
+
+			// check for type parameter bound to RpcGateway
+			// skip if one is found because a subclass will provide the concrete argument
+			TypeVariable<? extends Class<? extends RpcEndpoint>>[] typeParameters = c.getTypeParameters();
+			LOG.debug("Checking {} parameters.", typeParameters.length);
+			for (int i = 0; i < typeParameters.length; i++) {
+				for (Type bound : typeParameters[i].getBounds()) {
+					LOG.debug("checking bound {} of type parameter {}", bound, typeParameters[i]);
+					if (bound.toString().equals("interface " + RpcGateway.class.getName())) {
+						if (i > 0) {
+							fail("Type parameter for RpcGateway should come first in " + c);
+						}
+						LOG.debug("Skipping class with type parameter bound to RpcGateway.");
+						// Type parameter is bound to RpcGateway which a subclass will provide
+						continue mainloop;
+					}
+				}
+			}
+
+			// check if this class or any super class contains the RpcGateway argument
+			Class<?> rpcGatewayType;
+			do {
+				LOG.debug("checking type argument of class: {}", c);
+				rpcGatewayType = ReflectionUtil.getTemplateType1(c);
+				LOG.debug("type argument is: {}", rpcGatewayType);
+
+				c = (Class<? extends RpcEndpoint>) c.getSuperclass();
+
+			} while (!RpcGateway.class.isAssignableFrom(rpcGatewayType));
+
+			LOG.debug("Checking RRC completeness of endpoint '{}' with gateway '{}'",
+				rpcEndpoint.getSimpleName(), rpcGatewayType.getSimpleName());
+
+			checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
 		}
 	}
 
@@ -352,7 +418,7 @@ public class RpcCompletenessTest extends TestLogger {
 	 */
 	private List<Method> getRpcMethodsFromGateway(Class<? extends RpcGateway> interfaceClass) {
 		if(!interfaceClass.isInterface()) {
-			fail(interfaceClass.getName() + "is not a interface");
+			fail(interfaceClass.getName() + " is not a interface");
 		}
 
 		ArrayList<Method> allMethods = new ArrayList<>();