You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:16 UTC

[11/50] [abbrv] flink git commit: [FLINK-4443] [rpc] Add support for rpc gateway and rpc endpoint inheritance

[FLINK-4443] [rpc] Add support for rpc gateway and rpc endpoint inheritance

This commit extends the RpcCompletenessTest so that it also checks inherited
remote procedure calls. All methods defined in RpcGateway itself are considered native:
they need no RpcEndpoint counterpart because they are implemented by
the RpcGateway implementation.

This closes #2401.

update comments

remove native method annotation

add line break


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2991cdb8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2991cdb8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2991cdb8

Branch: refs/heads/flip-6
Commit: 2991cdb8738cee467324cd4eb71741cd73295d48
Parents: b99085e
Author: wenlong.lwl <we...@alibaba-inc.com>
Authored: Sun Aug 21 00:46:51 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:42 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/rpc/RpcMethod.java |  2 ++
 .../TestingHighAvailabilityServices.java        | 19 +++++++++++
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 33 ++++++++++++++++++--
 3 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2991cdb8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
index 875e557..e4b0e94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rpc;
 
 import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
@@ -29,6 +30,7 @@ import java.lang.annotation.Target;
  * RpcCompletenessTest makes sure that the set of rpc methods in a rpc server and the set of
  * gateway methods in the corresponding gateway implementation are identical.
  */
+@Inherited
 @Target(ElementType.METHOD)
 @Retention(RetentionPolicy.RUNTIME)
 public @interface RpcMethod {

http://git-wip-us.apache.org/repos/asf/flink/blob/2991cdb8/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3a9f943..4d654a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.highavailability;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 /**
@@ -28,6 +30,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
 
+	private volatile LeaderElectionService jobMasterLeaderElectionService;
+
 
 	// ------------------------------------------------------------------------
 	//  Setters for mock / testing implementations
@@ -36,6 +40,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	public void setResourceManagerLeaderRetriever(LeaderRetrievalService resourceManagerLeaderRetriever) {
 		this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
 	}
+
+	public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) {
+		this.jobMasterLeaderElectionService = leaderElectionService;
+	}
 	
 	// ------------------------------------------------------------------------
 	//  HA Services Methods
@@ -50,4 +58,15 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 			throw new IllegalStateException("ResourceManagerLeaderRetriever has not been set");
 		}
 	}
+
+	@Override
+	public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+		LeaderElectionService service = jobMasterLeaderElectionService;
+
+		if (service != null) {
+			return service;
+		} else {
+			throw new IllegalStateException("JobMasterLeaderElectionService has not been set");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2991cdb8/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index b8aad62..b431eb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -68,8 +68,8 @@ public class RpcCompletenessTest extends TestLogger {
 
 	@SuppressWarnings("rawtypes")
 	private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
-		Method[] gatewayMethods = rpcGateway.getDeclaredMethods();
-		Method[] serverMethods = rpcEndpoint.getDeclaredMethods();
+		Method[] gatewayMethods = getRpcMethodsFromGateway(rpcGateway).toArray(new Method[0]);
+		Method[] serverMethods = rpcEndpoint.getMethods();
 
 		Map<String, Set<Method>> rpcMethods = new HashMap<>();
 		Set<Method> unmatchedRpcMethods = new HashSet<>();
@@ -340,4 +340,33 @@ public class RpcCompletenessTest extends TestLogger {
 			throw new RuntimeException("Could not retrive basic type information for primitive type " + primitveType + '.');
 		}
 	}
+
+	/**
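+	 * Extracts all rpc methods of the gateway interface, including those inherited from its super interfaces.
+	 *
+	 * @param interfaceClass the given rpc gateway interface
+	 * @return all methods declared by the given interface and its super interfaces, excluding those of RpcGateway itself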
+	 */
+	private List<Method> getRpcMethodsFromGateway(Class<? extends RpcGateway> interfaceClass) {
+		if(!interfaceClass.isInterface()) {
+			fail(interfaceClass.getName() + "is not a interface");
+		}
+
+		ArrayList<Method> allMethods = new ArrayList<>();
+		// Methods defined in RpcGateway itself are native methods and need no RpcEndpoint counterpart
+		if(interfaceClass.equals(RpcGateway.class)) {
+			return allMethods;
+		}
+
+		// Get all methods declared in current interface
+		for(Method method : interfaceClass.getDeclaredMethods()) {
+			allMethods.add(method);
+		}
+
+		// Get all methods inherited from super interfaces
+		for(Class superClass : interfaceClass.getInterfaces()) {
+			allMethods.addAll(getRpcMethodsFromGateway(superClass));
+		}
+		return allMethods;
+	}
 }