You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:48:24 UTC

[04/50] [abbrv] flink git commit: [FLINK-4414] [cluster] Add getAddress method to RpcGateway

[FLINK-4414] [cluster] Add getAddress method to RpcGateway

The RpcGateway.getAddress method allows to retrieve the fully qualified address of the
associated RpcEndpoint.

This closes #2392.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ecd52640
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ecd52640
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ecd52640

Branch: refs/heads/flip-6
Commit: ecd5264023e959f35eb83fba4dfb392a8549db21
Parents: b12e66b
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 18 16:34:47 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:38 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  6 +-----
 .../apache/flink/runtime/rpc/RpcGateway.java    |  7 +++++++
 .../apache/flink/runtime/rpc/RpcService.java    | 11 ----------
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 14 +++++++++++--
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 21 ++++++--------------
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 16 +++++++++++++++
 6 files changed, 42 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ecd52640/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index a28bc14..7b3f8a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -69,9 +69,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	/** Self gateway which can be used to schedule asynchronous calls on yourself */
 	private final C self;
 
-	/** the fully qualified address of the this RPC endpoint */
-	private final String selfAddress;
-
 	/** The main thread execution context to be used to execute future callbacks in the main thread
 	 * of the executing rpc server. */
 	private final ExecutionContext mainThreadExecutionContext;
@@ -92,7 +89,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 		this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass());
 		this.self = rpcService.startServer(this);
 		
-		this.selfAddress = rpcService.getAddress(self);
 		this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
 	}
 
@@ -156,7 +152,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * @return Fully qualified address of the underlying RPC endpoint
 	 */
 	public String getAddress() {
-		return selfAddress;
+		return self.getAddress();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ecd52640/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
index e3a16b4..81075ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
@@ -22,4 +22,11 @@ package org.apache.flink.runtime.rpc;
  * Rpc gateway interface which has to be implemented by Rpc gateways.
  */
 public interface RpcGateway {
+
+	/**
+	 * Returns the fully qualified address under which the associated rpc endpoint is reachable.
+	 *
+	 * @return Fully qualified address under which the associated rpc endpoint is reachable
+	 */
+	String getAddress();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ecd52640/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index fabdb05..bc0f5cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -65,17 +65,6 @@ public interface RpcService {
 	void stopService();
 
 	/**
-	 * Get the fully qualified address of the underlying rpc server represented by the self gateway.
-	 * It must be possible to connect from a remote host to the rpc server via the returned fully
-	 * qualified address.
-	 *
-	 * @param selfGateway Self gateway associated with the underlying rpc server
-	 * @param <C> Type of the rpc gateway
-	 * @return Fully qualified address
-	 */
-	<C extends RpcGateway> String getAddress(C selfGateway);
-
-	/**
 	 * Gets the execution context, provided by this RPC service. This execution
 	 * context can be used for example for the {@code onComplete(...)} or {@code onSuccess(...)}
 	 * methods of Futures.

http://git-wip-us.apache.org/repos/asf/flink/blob/ecd52640/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 524bf74..bfa04f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -23,6 +23,7 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
@@ -55,6 +56,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor, StartStoppable {
 	private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
 
+	private final String address;
+
 	private final ActorRef rpcEndpoint;
 
 	// whether the actor ref is local and thus no message serialization is needed
@@ -65,7 +68,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 
 	private final long maximumFramesize;
 
-	AkkaInvocationHandler(ActorRef rpcEndpoint, Timeout timeout, long maximumFramesize) {
+	AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Timeout timeout, long maximumFramesize) {
+		this.address = Preconditions.checkNotNull(address);
 		this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
 		this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
 		this.timeout = Preconditions.checkNotNull(timeout);
@@ -79,7 +83,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 		Object result;
 
 		if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) ||
-			declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class)) {
+			declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
+			declaringClass.equals(RpcGateway.class)) {
 			result = method.invoke(this, args);
 		} else {
 			String methodName = method.getName();
@@ -290,4 +295,9 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 
 		return false;
 	}
+
+	@Override
+	public String getAddress() {
+		return address;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ecd52640/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index d987c2f..00a6932 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -102,7 +102,9 @@ public class AkkaRpcService implements RpcService {
 			public C apply(Object obj) {
 				ActorRef actorRef = ((ActorIdentity) obj).getRef();
 
-				InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout, maximumFramesize);
+				final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+				InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
 
 				// Rather than using the System ClassLoader directly, we derive the ClassLoader
 				// from this class . That works better in cases where Flink runs embedded and all Flink
@@ -135,7 +137,9 @@ public class AkkaRpcService implements RpcService {
 
 		LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());
 
-		InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout, maximumFramesize);
+		final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+		InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
 
 		// Rather than using the System ClassLoader directly, we derive the ClassLoader
 		// from this class . That works better in cases where Flink runs embedded and all Flink
@@ -197,19 +201,6 @@ public class AkkaRpcService implements RpcService {
 	}
 
 	@Override
-	public String getAddress(RpcGateway selfGateway) {
-		checkState(!stopped, "RpcService is stopped");
-
-		if (selfGateway instanceof AkkaGateway) {
-			ActorRef actorRef = ((AkkaGateway) selfGateway).getRpcEndpoint();
-			return AkkaUtils.getAkkaURL(actorSystem, actorRef);
-		} else {
-			String className = AkkaGateway.class.getName();
-			throw new IllegalArgumentException("Cannot get address for non " + className + '.');
-		}
-	}
-
-	@Override
 	public ExecutionContext getExecutionContext() {
 		return actorSystem.dispatcher();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecd52640/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 1653fac..82d13f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -34,6 +34,7 @@ import scala.concurrent.Future;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 public class AkkaRpcActorTest extends TestLogger {
@@ -57,6 +58,21 @@ public class AkkaRpcActorTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that the rpc endpoint and the associated rpc gateway have the same addresses.
+	 * @throws Exception
+	 */
+	@Test
+	public void testAddressResolution() throws Exception {
+		DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
+
+		Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
+
+		DummyRpcGateway rpcGateway = Await.result(futureRpcGateway, timeout.duration());
+
+		assertEquals(rpcEndpoint.getAddress(), rpcGateway.getAddress());
+	}
+
+	/**
 	 * Tests that the {@link AkkaRpcActor} stashes messages until the corresponding
 	 * {@link RpcEndpoint} has been started.
 	 */