You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/04/07 18:32:49 UTC

[7/8] flink git commit: [runtime] Fix TaskManager's BLOB service host lookup when connecting to the JobManager

[runtime] Fix TaskManager's BLOB service host lookup when connecting to the JobManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b89855a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b89855a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b89855a

Branch: refs/heads/release-0.9.0-milestone-1
Commit: 4b89855aaab50ec785a0c5e0e19124f8f9ea9440
Parents: 09bd1f8
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 7 17:40:01 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Apr 7 18:13:01 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala      | 15 ++++++++++++---
 .../flink/runtime/taskmanager/TaskManagerTest.java   |  2 +-
 2 files changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7d60c00..d6b91ec 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -608,6 +608,16 @@ extends Actor with ActorLogMessages with ActorLogging {
                                       id: InstanceID,
                                       blobPort: Int): Unit = {
 
+    if (jobManager == null) {
+      throw new NullPointerException("jobManager may not be null")
+    }
+    if (id == null) {
+      throw new NullPointerException("instance ID may not be null")
+    }
+    if (blobPort <= 0 || blobPort > 65535) {
+      throw new IllegalArgumentException("blob port is out of range: " + blobPort)
+    }
+
     // sanity check that we are not currently registered with a different JobManager
     if (isConnected) {
       if (currentJobManager.get == jobManager) {
@@ -644,9 +654,8 @@ extends Actor with ActorLogMessages with ActorLogging {
 
     // start a blob service, if a blob server is specified
     if (blobPort > 0) {
-      val address = new InetSocketAddress(
-        currentJobManager.flatMap(_.path.address.host).getOrElse("localhost"),
-        blobPort)
+      val jmHost = jobManager.path.address.host.getOrElse("localhost")
+      val address = new InetSocketAddress(jmHost, blobPort)
 
       LOG.info("Determined BLOB server address to be {}. Starting BLOB cache.", address)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index e736a55..760b14e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -566,7 +566,7 @@ public class TaskManagerTest {
 			if (message instanceof RegistrationMessages.RegisterTaskManager) {
 				final InstanceID iid = new InstanceID();
 				final ActorRef self = getSelf();
-				getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, -1), self);
+				getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, 12345), self);
 			}
 			else if(message instanceof TaskMessages.UpdateTaskExecutionState){
 				getSender().tell(true, getSelf());