You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/06 21:58:04 UTC
flink git commit: [FLINK-7116] [rpc] Add getPort to RpcService
Repository: flink
Updated Branches:
refs/heads/master 10156c9fc -> 12efc553c
[FLINK-7116] [rpc] Add getPort to RpcService
The RpcService should expose the port it is bound to. This makes it easier to connect to a
remote RpcService.
This closes #4275.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12efc553
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12efc553
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12efc553
Branch: refs/heads/master
Commit: 12efc553c6e1ee88f4234a41e2ac83ce65a2cc4c
Parents: 10156c9
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 6 23:43:16 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jul 6 23:57:21 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/runtime/rpc/RpcService.java | 8 ++++++++
.../apache/flink/runtime/rpc/akka/AkkaRpcService.java | 12 ++++++++++++
.../flink/runtime/rpc/TestingSerialRpcService.java | 5 +++++
.../flink/runtime/rpc/akka/AkkaRpcServiceTest.java | 5 +++++
4 files changed, 30 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/12efc553/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 93d345d..51b7ca2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -43,6 +43,14 @@ public interface RpcService {
String getAddress();
/**
+ * Return the port under which the rpc service is reachable. If the rpc service cannot be
+ * contacted remotely, then it will return -1.
+ *
+ * @return Port of the rpc service or -1 if local rpc service
+ */
+ int getPort();
+
+ /**
* Connect to a remote rpc server under the provided address. Returns a rpc gateway which can
* be used to communicate with the rpc server. If the connection failed, then the returned
* future is failed with a {@link RpcConnectionException}.
http://git-wip-us.apache.org/repos/asf/flink/blob/12efc553/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index d182b03..e17364f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -87,6 +87,7 @@ public class AkkaRpcService implements RpcService {
private final long maximumFramesize;
private final String address;
+ private final int port;
private final ScheduledExecutor internalScheduledExecutor;
@@ -111,6 +112,12 @@ public class AkkaRpcService implements RpcService {
address = "";
}
+ if (actorSystemAddress.port().isDefined()) {
+ port = (Integer) actorSystemAddress.port().get();
+ } else {
+ port = -1;
+ }
+
internalScheduledExecutor = new InternalScheduledExecutorImpl(actorSystem);
}
@@ -119,6 +126,11 @@ public class AkkaRpcService implements RpcService {
return address;
}
+ @Override
+ public int getPort() {
+ return port;
+ }
+
// this method does not mutate state and is thus thread-safe
@Override
public <C extends RpcGateway> Future<C> connect(final String address, final Class<C> clazz) {
http://git-wip-us.apache.org/repos/asf/flink/blob/12efc553/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 25156e8..ac3f40b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -173,6 +173,11 @@ public class TestingSerialRpcService implements RpcService {
}
@Override
+ public int getPort() {
+ return -1;
+ }
+
+ @Override
public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
RpcGateway gateway = registeredConnections.get(address);
http://git-wip-us.apache.org/repos/asf/flink/blob/12efc553/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 46359da..42f63ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -129,6 +129,11 @@ public class AkkaRpcServiceTest extends TestLogger {
assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress());
}
+ @Test
+ public void testGetPort() {
+ assertEquals(AkkaUtils.getAddress(actorSystem).port().get(), akkaRpcService.getPort());
+ }
+
/**
* Tests that we can wait for the termination of the rpc service
*