You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:49:01 UTC
[41/50] [abbrv] flink git commit: [FLINK-4703] RpcCompletenessTest:
Add support for type arguments and subclasses
[FLINK-4703] RpcCompletenessTest: Add support for type arguments and subclasses
This closes #2561
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f699c421
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f699c421
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f699c421
Branch: refs/heads/flip-6
Commit: f699c4215604dfde384839379b8f76af06e01163
Parents: 406dccb
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 28 12:39:30 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:45 2016 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/rpc/RpcEndpoint.java | 23 +++++-
.../flink/runtime/rpc/RpcCompletenessTest.java | 80 ++++++++++++++++++--
2 files changed, 94 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f699c421/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 4e5e49a..79961f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -85,9 +85,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
// IMPORTANT: Don't change order of selfGatewayType and self because rpcService.startServer
// requires that selfGatewayType has been initialized
- this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass());
+ this.selfGatewayType = determineSelfGatewayType();
this.self = rpcService.startServer(this);
-
+
this.mainThreadExecutor = new MainThreadExecutor((MainThreadExecutable) self);
}
@@ -255,4 +255,23 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
gateway.runAsync(runnable);
}
}
+
+ /**
+ * Determines the self gateway type by walking up from the runtime class via getSuperclass() until
+ * the first type argument reported by ReflectionUtil.getTemplateType1 is assignable to RpcGateway.
+ * @return the first type found that way, i.e. the self gateway type of this endpoint
+ */
+ private Class<C> determineSelfGatewayType() {
+
+ // NOTE(review): the loop has no explicit stop for a null superclass or a null result - confirm one level always matches
+ Class c = getClass();
+ Class<C> determinedSelfGatewayType;
+ do {
+ determinedSelfGatewayType = ReflectionUtil.getTemplateType1(c);
+ // advance to the super class so the next iteration inspects its first type argument
+ c = c.getSuperclass();
+ } while (!RpcGateway.class.isAssignableFrom(determinedSelfGatewayType));
+
+ return determinedSelfGatewayType;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f699c421/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index 53355e8..e7143ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -26,9 +26,14 @@ import org.apache.flink.util.ReflectionUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -41,8 +46,33 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Test which ensures that all subclasses of {@link RpcEndpoint} implement
+ * the methods declared by the gateway type given as their generic type argument.
+ *
+ * {@code
+ * RpcEndpoint<GatewayTypeParameter extends RpcGateway>
+ * }
+ *
+ * Note that the class hierarchy can also be nested. In this case the gateway type argument
+ * always has to be the first type argument, e.g. {@code
+ *
+ * // RpcClass needs to implement RpcGatewayClass' methods
+ * RpcClass extends RpcEndpoint<RpcGatewayClass>
+ *
+ * // RpcClass2 or its subclass needs to implement RpcGatewayClass' methods
+ * RpcClass<GatewayTypeParameter extends RpcGateway,...> extends RpcEndpoint<GatewayTypeParameter>
+ * RpcClass2 extends RpcClass<RpcGatewayClass,...>
+ *
+ * // needless to say, this can even be nested further
+ * ...
+ * }
+ *
+ */
public class RpcCompletenessTest extends TestLogger {
+ private static Logger LOG = LoggerFactory.getLogger(RpcCompletenessTest.class);
+
private static final Class<?> futureClass = Future.class;
private static final Class<?> timeoutClass = Time.class;
@@ -55,16 +85,52 @@ public class RpcCompletenessTest extends TestLogger {
Class<? extends RpcEndpoint> c;
- for (Class<? extends RpcEndpoint> rpcEndpoint :classes){
+ mainloop:
+ for (Class<? extends RpcEndpoint> rpcEndpoint : classes) {
c = rpcEndpoint;
- Class<?> rpcGatewayType = ReflectionUtil.getTemplateType1(c);
+ LOG.debug("-------------");
+ LOG.debug("c: {}", c);
- if (rpcGatewayType != null) {
- checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
- } else {
- fail("Could not retrieve the rpc gateway class for the given rpc endpoint class " + rpcEndpoint.getName());
+ // skip abstract classes
+ if (Modifier.isAbstract(c.getModifiers())) {
+ LOG.debug("Skipping abstract class");
+ continue;
}
+
+ // check for type parameter bound to RpcGateway
+ // skip if one is found because a subclass will provide the concrete argument
+ TypeVariable<? extends Class<? extends RpcEndpoint>>[] typeParameters = c.getTypeParameters();
+ LOG.debug("Checking {} parameters.", typeParameters.length);
+ for (int i = 0; i < typeParameters.length; i++) {
+ for (Type bound : typeParameters[i].getBounds()) {
+ LOG.debug("checking bound {} of type parameter {}", bound, typeParameters[i]);
+ if (bound.toString().equals("interface " + RpcGateway.class.getName())) {
+ if (i > 0) {
+ fail("Type parameter for RpcGateway should come first in " + c);
+ }
+ LOG.debug("Skipping class with type parameter bound to RpcGateway.");
+ // Type parameter is bound to RpcGateway which a subclass will provide
+ continue mainloop;
+ }
+ }
+ }
+
+ // check if this class or any super class contains the RpcGateway argument
+ Class<?> rpcGatewayType;
+ do {
+ LOG.debug("checking type argument of class: {}", c);
+ rpcGatewayType = ReflectionUtil.getTemplateType1(c);
+ LOG.debug("type argument is: {}", rpcGatewayType);
+
+ c = (Class<? extends RpcEndpoint>) c.getSuperclass();
+
+ } while (!RpcGateway.class.isAssignableFrom(rpcGatewayType));
+
+ LOG.debug("Checking RRC completeness of endpoint '{}' with gateway '{}'",
+ rpcEndpoint.getSimpleName(), rpcGatewayType.getSimpleName());
+
+ checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
}
}
@@ -352,7 +418,7 @@ public class RpcCompletenessTest extends TestLogger {
*/
private List<Method> getRpcMethodsFromGateway(Class<? extends RpcGateway> interfaceClass) {
if(!interfaceClass.isInterface()) {
- fail(interfaceClass.getName() + "is not a interface");
+ fail(interfaceClass.getName() + " is not a interface");
}
ArrayList<Method> allMethods = new ArrayList<>();