You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/03/16 09:49:53 UTC

flink git commit: [FLINK-6064][flip6] fix BlobServer connection in TaskExecutor

Repository: flink
Updated Branches:
  refs/heads/master 264f6df8e -> 97f178894


[FLINK-6064][flip6] fix BlobServer connection in TaskExecutor

The hostname used to connect to the BlobServer was set to the Akka (RPC) address, which
is not a valid hostname. Instead, this commit adds the hostname to the RpcGateway /
AkkaInvocationHandler so that this information is available to the TaskExecutor.

This closes #3551.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97f17889
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97f17889
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97f17889

Branch: refs/heads/master
Commit: 97f1788941b3a1a0710f530ddd83bad713098d56
Parents: 264f6df
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Mar 15 16:09:38 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Mar 16 10:48:39 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcGateway.java    |  9 +++++++-
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 16 ++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 23 ++++++++++++++++----
 .../runtime/taskexecutor/TaskExecutor.java      |  6 ++---
 .../flink/runtime/rpc/TestingGatewayBase.java   |  5 +++++
 .../runtime/rpc/TestingSerialRpcService.java    |  7 ++++++
 .../taskexecutor/TaskExecutorITCase.java        |  2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  6 ++---
 8 files changed, 62 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
index 81075ee..aa1d102 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
@@ -26,7 +26,14 @@ public interface RpcGateway {
 	/**
 	 * Returns the fully qualified address under which the associated rpc endpoint is reachable.
 	 *
-	 * @return Fully qualified address under which the associated rpc endpoint is reachable
+	 * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable
 	 */
 	String getAddress();
+
+	/**
+	 * Returns the fully qualified hostname under which the associated rpc endpoint is reachable.
+	 *
+	 * @return Fully qualified hostname under which the associated rpc endpoint is reachable
+	 */
+	String getHostname();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index e211b27..56505f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -57,8 +57,17 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable, SelfGateway {
 	private static final Logger LOG = LoggerFactory.getLogger(AkkaInvocationHandler.class);
 
+	/**
+	 * The Akka (RPC) address of {@link #rpcEndpoint} including host and port of the ActorSystem in
+	 * which the actor is running.
+	 */
 	private final String address;
 
+	/**
+	 * Hostname of the host, {@link #rpcEndpoint} is running on.
+	 */
+	private final String hostname;
+
 	private final ActorRef rpcEndpoint;
 
 	// whether the actor ref is local and thus no message serialization is needed
@@ -74,12 +83,14 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 
 	AkkaInvocationHandler(
 			String address,
+			String hostname,
 			ActorRef rpcEndpoint,
 			Time timeout,
 			long maximumFramesize,
 			Future<Void> terminationFuture) {
 
 		this.address = Preconditions.checkNotNull(address);
+		this.hostname = Preconditions.checkNotNull(hostname);
 		this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
 		this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
 		this.timeout = Preconditions.checkNotNull(timeout);
@@ -314,6 +325,11 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 	}
 
 	@Override
+	public String getHostname() {
+		return hostname;
+	}
+
+	@Override
 	public Future<Void> getTerminationFuture() {
 		return terminationFuture;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 4298021..f5ccdbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -29,7 +29,6 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
-
 import akka.pattern.Patterns;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -39,16 +38,16 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
-import org.apache.flink.runtime.rpc.SelfGateway;
-import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.SelfGateway;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.Nonnull;
@@ -143,9 +142,17 @@ public class AkkaRpcService implements RpcService {
 					ActorRef actorRef = actorIdentity.getRef();
 
 					final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+					final String hostname;
+					Option<String> host = actorRef.path().address().host();
+					if (host.isEmpty()) {
+						hostname = "localhost";
+					} else {
+						hostname = host.get();
+					}
 
 					InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(
 						address,
+						hostname,
 						actorRef,
 						timeout,
 						maximumFramesize,
@@ -187,9 +194,17 @@ public class AkkaRpcService implements RpcService {
 		LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());
 
 		final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+		final String hostname;
+		Option<String> host = actorRef.path().address().host();
+		if (host.isEmpty()) {
+			hostname = "localhost";
+		} else {
+			hostname = host.get();
+		}
 
 		InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(
 			address,
+			hostname,
 			actorRef,
 			timeout,
 			maximumFramesize,

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8db1d5b..df5765a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -788,17 +788,17 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, int blobPort) {
 		Preconditions.checkNotNull(jobManagerLeaderId);
 		Preconditions.checkNotNull(jobMasterGateway);
-		Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob port is out of range.");
+		Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob server port is out of range.");
 
 		TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobManagerLeaderId, jobMasterGateway);
 
 		CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
 
-		InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort);
+		InetSocketAddress blobServerAddress = new InetSocketAddress(jobMasterGateway.getHostname(), blobPort);
 
 		final LibraryCacheManager libraryCacheManager;
 		try {
-			final BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration(), haServices);
+			final BlobCache blobCache = new BlobCache(blobServerAddress, taskManagerConfiguration.getConfiguration(), haServices);
 			libraryCacheManager = new BlobLibraryCacheManager(
 				blobCache,
 				taskManagerConfiguration.getCleanupInterval());

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
index caf5e81..03fe84b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -68,6 +68,11 @@ public abstract class TestingGatewayBase implements RpcGateway {
 		return address;
 	}
 
+	@Override
+	public String getHostname() {
+		return address;
+	}
+
 	// ------------------------------------------------------------------------
 	//  utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 6280a46..25156e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -308,6 +308,13 @@ public class TestingSerialRpcService implements RpcService {
 			return address;
 		}
 
+		// this is not a real hostname but the address above is also not a real akka RPC address
+		// and we keep it that way until actually needed by a test case
+		@Override
+		public String getHostname() {
+			return address;
+		}
+
 		@Override
 		public void start() {
 			// do nothing

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 898584c..076d126 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -146,7 +146,7 @@ public class TaskExecutorITCase {
 
 		when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class)))
 			.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
-		when(jmGateway.getAddress()).thenReturn(jmAddress);
+		when(jmGateway.getHostname()).thenReturn(jmAddress);
 
 
 		rpcService.registerGateway(rmAddress, resourceManager.getSelf());

http://git-wip-us.apache.org/repos/asf/flink/blob/97f17889/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 31bf9b8..d413a01 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -438,7 +438,7 @@ public class TaskExecutorTest extends TestLogger {
 				eq(jobManagerLeaderId),
 				any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
-		when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
 		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -551,7 +551,7 @@ public class TaskExecutorTest extends TestLogger {
 				eq(jobManagerLeaderId),
 				any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
-		when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
 		when(jobMasterGateway.offerSlots(
 				any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
@@ -754,7 +754,7 @@ public class TaskExecutorTest extends TestLogger {
 			eq(jobManagerLeaderId),
 			any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
-		when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);