You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:21:52 UTC
[12/50] [abbrv] flink git commit: [FLINK-4687] [rpc] Add getAddress
to RpcService
[FLINK-4687] [rpc] Add getAddress to RpcService
This closes #2551.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09d6384e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09d6384e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09d6384e
Branch: refs/heads/flip-6
Commit: 09d6384e2b1c5b3b24afc3ec9b25673d2d2b7381
Parents: a9e6a46
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Sep 26 18:01:47 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 20 19:46:23 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/rpc/RpcService.java | 8 ++++++++
.../flink/runtime/rpc/akka/AkkaRpcService.java | 16 ++++++++++++++++
.../apache/flink/runtime/rpc/TestingRpcService.java | 5 +++--
.../flink/runtime/rpc/TestingSerialRpcService.java | 6 ++++++
.../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 8 ++++----
.../flink/runtime/rpc/akka/AkkaRpcServiceTest.java | 5 +++++
6 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/09d6384e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 437e08b..96844ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -33,6 +33,14 @@ import java.util.concurrent.TimeUnit;
public interface RpcService {
/**
+ * Return the address under which the rpc service can be reached. If the rpc service cannot be
+ * contacted remotely, then it will return an empty string.
+ *
+ * @return Address of the rpc service or empty string if local rpc service
+ */
+ String getAddress();
+
+ /**
* Connect to a remote rpc server under the provided address. Returns a rpc gateway which can
* be used to communicate with the rpc server. If the connection failed, then the returned
* future is failed with a {@link RpcConnectionException}.
http://git-wip-us.apache.org/repos/asf/flink/blob/09d6384e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index cee19c4..6825557 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -22,6 +22,7 @@ import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.actor.Address;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
@@ -75,6 +76,8 @@ public class AkkaRpcService implements RpcService {
private final Set<ActorRef> actors = new HashSet<>(4);
private final long maximumFramesize;
+ private final String address;
+
private volatile boolean stopped;
public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
@@ -87,6 +90,19 @@ public class AkkaRpcService implements RpcService {
// only local communication
maximumFramesize = Long.MAX_VALUE;
}
+
+ Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
+
+ if (actorSystemAddress.host().isDefined()) {
+ address = actorSystemAddress.host().get();
+ } else {
+ address = "";
+ }
+ }
+
+ @Override
+ public String getAddress() {
+ return address;
}
// this method does not mutate state and is thus thread-safe
http://git-wip-us.apache.org/repos/asf/flink/blob/09d6384e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index f164056..47c9e24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -57,14 +58,14 @@ public class TestingRpcService extends AkkaRpcService {
/**
* Creates a new {@code TestingRpcService}.
*/
- public TestingRpcService() {
+ public TestingRpcService() throws UnknownHostException {
this(new Configuration());
}
/**
* Creates a new {@code TestingRpcService}, using the given configuration.
*/
- public TestingRpcService(Configuration configuration) {
+ public TestingRpcService(Configuration configuration) throws UnknownHostException {
super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
this.registeredConnections = new ConcurrentHashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/09d6384e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index c58ea20..5b8e6e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -30,6 +30,7 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.net.InetAddress;
import java.util.BitSet;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -121,6 +122,11 @@ public class TestingSerialRpcService implements RpcService {
}
@Override
+ public String getAddress() {
+ return "";
+ }
+
+ @Override
public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
RpcGateway gateway = registeredConnections.get(address);
http://git-wip-us.apache.org/repos/asf/flink/blob/09d6384e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 1e8c9a6..5d76024 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -133,7 +133,7 @@ public class AkkaRpcActorTest extends TestLogger {
Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
- WrongRpcGateway gateway = Await.result(futureGateway, timeout.duration());
+ WrongRpcGateway gateway = futureGateway.get(timeout.getSize(), timeout.getUnit());
// since it is a tell operation we won't receive a RpcConnectionException, it's only logged
gateway.tell("foobar");
@@ -141,10 +141,10 @@ public class AkkaRpcActorTest extends TestLogger {
Future<Boolean> result = gateway.barfoo();
try {
- Await.result(result, timeout.duration());
+ result.get(timeout.getSize(), timeout.getUnit());
fail("We expected a RpcConnectionException.");
- } catch (RpcConnectionException rpcConnectionException) {
- // we expect this exception here
+ } catch (ExecutionException executionException) {
+ assertTrue(executionException.getCause() instanceof RpcConnectionException);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09d6384e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 5550cb5..3388011 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -115,4 +115,9 @@ public class AkkaRpcServiceTest extends TestLogger {
assertEquals(expected, actual);
assertTrue(latch.isTriggered());
}
+
+ @Test
+ public void testGetAddress() {
+ assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress());
+ }
}