You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:11 UTC
[06/50] [abbrv] flink git commit: [FLINK-4414] [cluster] Add
getAddress method to RpcGateway
[FLINK-4414] [cluster] Add getAddress method to RpcGateway
The RpcGateway.getAddress method allows to retrieve the fully qualified address of the
associated RpcEndpoint.
This closes #2392.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31121122
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31121122
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31121122
Branch: refs/heads/flip-6
Commit: 3112112233100ec0389e5f0e316767c9d2b7790a
Parents: bdd5bd0
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 18 16:34:47 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:41 2016 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/rpc/RpcEndpoint.java | 6 +-----
.../apache/flink/runtime/rpc/RpcGateway.java | 7 +++++++
.../apache/flink/runtime/rpc/RpcService.java | 11 ----------
.../runtime/rpc/akka/AkkaInvocationHandler.java | 14 +++++++++++--
.../flink/runtime/rpc/akka/AkkaRpcService.java | 21 ++++++--------------
.../runtime/rpc/akka/AkkaRpcActorTest.java | 16 +++++++++++++++
6 files changed, 42 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/31121122/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index a28bc14..7b3f8a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -69,9 +69,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
/** Self gateway which can be used to schedule asynchronous calls on yourself */
private final C self;
- /** the fully qualified address of the this RPC endpoint */
- private final String selfAddress;
-
/** The main thread execution context to be used to execute future callbacks in the main thread
* of the executing rpc server. */
private final ExecutionContext mainThreadExecutionContext;
@@ -92,7 +89,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass());
this.self = rpcService.startServer(this);
- this.selfAddress = rpcService.getAddress(self);
this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
}
@@ -156,7 +152,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
* @return Fully qualified address of the underlying RPC endpoint
*/
public String getAddress() {
- return selfAddress;
+ return self.getAddress();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/31121122/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
index e3a16b4..81075ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
@@ -22,4 +22,11 @@ package org.apache.flink.runtime.rpc;
* Rpc gateway interface which has to be implemented by Rpc gateways.
*/
public interface RpcGateway {
+
+ /**
+ * Returns the fully qualified address under which the associated rpc endpoint is reachable.
+ *
+ * @return Fully qualified address under which the associated rpc endpoint is reachable
+ */
+ String getAddress();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31121122/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index fabdb05..bc0f5cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -65,17 +65,6 @@ public interface RpcService {
void stopService();
/**
- * Get the fully qualified address of the underlying rpc server represented by the self gateway.
- * It must be possible to connect from a remote host to the rpc server via the returned fully
- * qualified address.
- *
- * @param selfGateway Self gateway associated with the underlying rpc server
- * @param <C> Type of the rpc gateway
- * @return Fully qualified address
- */
- <C extends RpcGateway> String getAddress(C selfGateway);
-
- /**
* Gets the execution context, provided by this RPC service. This execution
* context can be used for example for the {@code onComplete(...)} or {@code onSuccess(...)}
* methods of Futures.
http://git-wip-us.apache.org/repos/asf/flink/blob/31121122/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 524bf74..bfa04f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -23,6 +23,7 @@ import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
@@ -55,6 +56,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor, StartStoppable {
private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
+ private final String address;
+
private final ActorRef rpcEndpoint;
// whether the actor ref is local and thus no message serialization is needed
@@ -65,7 +68,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
private final long maximumFramesize;
- AkkaInvocationHandler(ActorRef rpcEndpoint, Timeout timeout, long maximumFramesize) {
+ AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Timeout timeout, long maximumFramesize) {
+ this.address = Preconditions.checkNotNull(address);
this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
this.timeout = Preconditions.checkNotNull(timeout);
@@ -79,7 +83,8 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
Object result;
if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) ||
- declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class)) {
+ declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
+ declaringClass.equals(RpcGateway.class)) {
result = method.invoke(this, args);
} else {
String methodName = method.getName();
@@ -290,4 +295,9 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
return false;
}
+
+ @Override
+ public String getAddress() {
+ return address;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31121122/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index d987c2f..00a6932 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -102,7 +102,9 @@ public class AkkaRpcService implements RpcService {
public C apply(Object obj) {
ActorRef actorRef = ((ActorIdentity) obj).getRef();
- InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout, maximumFramesize);
+ final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+ InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
@@ -135,7 +137,9 @@ public class AkkaRpcService implements RpcService {
LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());
- InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout, maximumFramesize);
+ final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+ InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
@@ -197,19 +201,6 @@ public class AkkaRpcService implements RpcService {
}
@Override
- public String getAddress(RpcGateway selfGateway) {
- checkState(!stopped, "RpcService is stopped");
-
- if (selfGateway instanceof AkkaGateway) {
- ActorRef actorRef = ((AkkaGateway) selfGateway).getRpcEndpoint();
- return AkkaUtils.getAkkaURL(actorSystem, actorRef);
- } else {
- String className = AkkaGateway.class.getName();
- throw new IllegalArgumentException("Cannot get address for non " + className + '.');
- }
- }
-
- @Override
public ExecutionContext getExecutionContext() {
return actorSystem.dispatcher();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/31121122/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 1653fac..82d13f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -34,6 +34,7 @@ import scala.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
public class AkkaRpcActorTest extends TestLogger {
@@ -57,6 +58,21 @@ public class AkkaRpcActorTest extends TestLogger {
}
/**
+ * Tests that the rpc endpoint and the associated rpc gateway have the same addresses.
+ * @throws Exception
+ */
+ @Test
+ public void testAddressResolution() throws Exception {
+ DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
+
+ Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
+
+ DummyRpcGateway rpcGateway = Await.result(futureRpcGateway, timeout.duration());
+
+ assertEquals(rpcEndpoint.getAddress(), rpcGateway.getAddress());
+ }
+
+ /**
* Tests that the {@link AkkaRpcActor} stashes messages until the corresponding
* {@link RpcEndpoint} has been started.
*/