You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/04/07 18:32:49 UTC
[7/8] flink git commit: [runtime] Fix TaskManager's BLOB service host lookup when connecting to the JobManager
[runtime] Fix TaskManager's BLOB service host lookup when connecting to the JobManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b89855a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b89855a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b89855a
Branch: refs/heads/release-0.9.0-milestone-1
Commit: 4b89855aaab50ec785a0c5e0e19124f8f9ea9440
Parents: 09bd1f8
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 7 17:40:01 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Apr 7 18:13:01 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/taskmanager/TaskManager.scala | 15 ++++++++++++---
.../flink/runtime/taskmanager/TaskManagerTest.java | 2 +-
2 files changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7d60c00..d6b91ec 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -608,6 +608,16 @@ extends Actor with ActorLogMessages with ActorLogging {
id: InstanceID,
blobPort: Int): Unit = {
+ if (jobManager == null) {
+ throw new NullPointerException("jobManager may not be null")
+ }
+ if (id == null) {
+ throw new NullPointerException("instance ID may not be null")
+ }
+ if (blobPort <= 0 || blobPort > 65535) {
+ throw new IllegalArgumentException("blob port is out of range: " + blobPort)
+ }
+
// sanity check that we are not currently registered with a different JobManager
if (isConnected) {
if (currentJobManager.get == jobManager) {
@@ -644,9 +654,8 @@ extends Actor with ActorLogMessages with ActorLogging {
// start a blob service, if a blob server is specified
if (blobPort > 0) {
- val address = new InetSocketAddress(
- currentJobManager.flatMap(_.path.address.host).getOrElse("localhost"),
- blobPort)
+ val jmHost = jobManager.path.address.host.getOrElse("localhost")
+ val address = new InetSocketAddress(jmHost, blobPort)
LOG.info("Determined BLOB server address to be {}. Starting BLOB cache.", address)
http://git-wip-us.apache.org/repos/asf/flink/blob/4b89855a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index e736a55..760b14e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -566,7 +566,7 @@ public class TaskManagerTest {
if (message instanceof RegistrationMessages.RegisterTaskManager) {
final InstanceID iid = new InstanceID();
final ActorRef self = getSelf();
- getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, -1), self);
+ getSender().tell(new RegistrationMessages.AcknowledgeRegistration(self, iid, 12345), self);
}
else if(message instanceof TaskMessages.UpdateTaskExecutionState){
getSender().tell(true, getSelf());