You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/06 21:58:04 UTC

flink git commit: [FLINK-7116] [rpc] Add getPort to RpcService

Repository: flink
Updated Branches:
  refs/heads/master 10156c9fc -> 12efc553c


[FLINK-7116] [rpc] Add getPort to RpcService

The RpcService should expose its port it is bound to. That way it is easier to connect to a
remote RpcService.

This closes #4275.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12efc553
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12efc553
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12efc553

Branch: refs/heads/master
Commit: 12efc553c6e1ee88f4234a41e2ac83ce65a2cc4c
Parents: 10156c9
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 6 23:43:16 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 6 23:57:21 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/rpc/RpcService.java   |  8 ++++++++
 .../apache/flink/runtime/rpc/akka/AkkaRpcService.java   | 12 ++++++++++++
 .../flink/runtime/rpc/TestingSerialRpcService.java      |  5 +++++
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java      |  5 +++++
 4 files changed, 30 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12efc553/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 93d345d..51b7ca2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -43,6 +43,14 @@ public interface RpcService {
 	String getAddress();
 
 	/**
+	 * Return the port under which the rpc service is reachable. If the rpc service cannot be
+	 * contacted remotely, then it will return -1.
+	 *
+	 * @return Port of the rpc service or -1 if local rpc service
+	 */
+	int getPort();
+
+	/**
 	 * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can
 	 * be used to communicate with the rpc server. If the connection failed, then the returned
 	 * future is failed with a {@link RpcConnectionException}.

http://git-wip-us.apache.org/repos/asf/flink/blob/12efc553/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index d182b03..e17364f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -87,6 +87,7 @@ public class AkkaRpcService implements RpcService {
 	private final long maximumFramesize;
 
 	private final String address;
+	private final int port;
 
 	private final ScheduledExecutor internalScheduledExecutor;
 
@@ -111,6 +112,12 @@ public class AkkaRpcService implements RpcService {
 			address = "";
 		}
 
+		if (actorSystemAddress.port().isDefined()) {
+			port = (Integer) actorSystemAddress.port().get();
+		} else {
+			port = -1;
+		}
+
 		internalScheduledExecutor = new InternalScheduledExecutorImpl(actorSystem);
 	}
 
@@ -119,6 +126,11 @@ public class AkkaRpcService implements RpcService {
 		return address;
 	}
 
+	@Override
+	public int getPort() {
+		return port;
+	}
+
 	// this method does not mutate state and is thus thread-safe
 	@Override
 	public <C extends RpcGateway> Future<C> connect(final String address, final Class<C> clazz) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12efc553/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 25156e8..ac3f40b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -173,6 +173,11 @@ public class TestingSerialRpcService implements RpcService {
 	}
 
 	@Override
+	public int getPort() {
+		return -1;
+	}
+
+	@Override
 	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
 		RpcGateway gateway = registeredConnections.get(address);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12efc553/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 46359da..42f63ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -129,6 +129,11 @@ public class AkkaRpcServiceTest extends TestLogger {
 		assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress());
 	}
 
+	@Test
+	public void testGetPort() {
+		assertEquals(AkkaUtils.getAddress(actorSystem).port().get(), akkaRpcService.getPort());
+	}
+
 	/**
 	 * Tests that we can wait for the termination of the rpc service
 	 *