You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:46:10 UTC

[28/50] [abbrv] flink git commit: [FLINK-4687] [rpc] Add getAddress to RpcService

[FLINK-4687] [rpc] Add getAddress to RpcService

This closes #2551.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07512e06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07512e06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07512e06

Branch: refs/heads/flip-6
Commit: 07512e06acfc4bb3c48f1286ce52478c64ffb259
Parents: 59d9e67
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Sep 26 18:01:47 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:41 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/rpc/RpcService.java    |  8 ++++++++
 .../flink/runtime/rpc/akka/AkkaRpcService.java      | 16 ++++++++++++++++
 .../apache/flink/runtime/rpc/TestingRpcService.java |  5 +++--
 .../flink/runtime/rpc/TestingSerialRpcService.java  |  6 ++++++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java    |  8 ++++----
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java  |  5 +++++
 6 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 437e08b..96844ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -33,6 +33,14 @@ import java.util.concurrent.TimeUnit;
 public interface RpcService {
 
 	/**
+	 * Return the address under which the rpc service can be reached. If the rpc service cannot be
+	 * contacted remotely, then it will return an empty string.
+	 *
+	 * @return Address of the rpc service or empty string if local rpc service
+	 */
+	String getAddress();
+
+	/**
 	 * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can
 	 * be used to communicate with the rpc server. If the connection failed, then the returned
 	 * future is failed with a {@link RpcConnectionException}.

http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index cee19c4..6825557 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -22,6 +22,7 @@ import akka.actor.ActorIdentity;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import akka.actor.Address;
 import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -75,6 +76,8 @@ public class AkkaRpcService implements RpcService {
 	private final Set<ActorRef> actors = new HashSet<>(4);
 	private final long maximumFramesize;
 
+	private final String address;
+
 	private volatile boolean stopped;
 
 	public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
@@ -87,6 +90,19 @@ public class AkkaRpcService implements RpcService {
 			// only local communication
 			maximumFramesize = Long.MAX_VALUE;
 		}
+
+		Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
+
+		if (actorSystemAddress.host().isDefined()) {
+			address = actorSystemAddress.host().get();
+		} else {
+			address = "";
+		}
+	}
+
+	@Override
+	public String getAddress() {
+		return address;
 	}
 
 	// this method does not mutate state and is thus thread-safe

http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index f164056..47c9e24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 
+import java.net.UnknownHostException;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -57,14 +58,14 @@ public class TestingRpcService extends AkkaRpcService {
 	/**
 	 * Creates a new {@code TestingRpcService}. 
 	 */
-	public TestingRpcService() {
+	public TestingRpcService() throws UnknownHostException {
 		this(new Configuration());
 	}
 
 	/**
 	 * Creates a new {@code TestingRpcService}, using the given configuration. 
 	 */
-	public TestingRpcService(Configuration configuration) {
+	public TestingRpcService(Configuration configuration) throws UnknownHostException {
 		super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10));
 
 		this.registeredConnections = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index c58ea20..5b8e6e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -30,6 +30,7 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.net.InetAddress;
 import java.util.BitSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -121,6 +122,11 @@ public class TestingSerialRpcService implements RpcService {
 	}
 
 	@Override
+	public String getAddress() {
+		return "";
+	}
+
+	@Override
 	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
 		RpcGateway gateway = registeredConnections.get(address);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 1e8c9a6..5d76024 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -133,7 +133,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 		Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
 
-		WrongRpcGateway gateway = Await.result(futureGateway, timeout.duration());
+		WrongRpcGateway gateway = futureGateway.get(timeout.getSize(), timeout.getUnit());
 
 		// since it is a tell operation we won't receive a RpcConnectionException, it's only logged
 		gateway.tell("foobar");
@@ -141,10 +141,10 @@ public class AkkaRpcActorTest extends TestLogger {
 		Future<Boolean> result = gateway.barfoo();
 
 		try {
-			Await.result(result, timeout.duration());
+			result.get(timeout.getSize(), timeout.getUnit());
 			fail("We expected a RpcConnectionException.");
-		} catch (RpcConnectionException rpcConnectionException) {
-			// we expect this exception here
+		} catch (ExecutionException executionException) {
+			assertTrue(executionException.getCause() instanceof RpcConnectionException);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/07512e06/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 5550cb5..3388011 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -115,4 +115,9 @@ public class AkkaRpcServiceTest extends TestLogger {
 		assertEquals(expected, actual);
 		assertTrue(latch.isTriggered());
 	}
+
+	@Test
+	public void testGetAddress() {
+		assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress());
+	}
 }