You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/02 15:36:12 UTC
[8/8] flink git commit: [FLINK-4490] [distributed coordination] (part 1) Change InstanceConnectionInfo to TaskManagerLocation
[FLINK-4490] [distributed coordination] (part 1) Change InstanceConnectionInfo to TaskManagerLocation
This adds the ResourceID to the TaskManagerLocation.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34cda87a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34cda87a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34cda87a
Branch: refs/heads/master
Commit: 34cda87a6e831c6ea62f1a79df08d1efac718d03
Parents: e227b10
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 29 16:58:31 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200
----------------------------------------------------------------------
.../clusterframework/MesosTaskManager.scala | 7 +-
.../handlers/JobExceptionsHandler.java | 4 +-
.../handlers/JobVertexDetailsHandler.java | 4 +-
.../handlers/JobVertexTaskManagersHandler.java | 4 +-
.../SubtaskExecutionAttemptDetailsHandler.java | 4 +-
.../SubtasksAllAccumulatorsHandler.java | 4 +-
.../handlers/SubtasksTimesHandler.java | 4 +-
...PartialInputChannelDeploymentDescriptor.java | 10 +-
.../flink/runtime/executiongraph/Execution.java | 8 +-
.../runtime/executiongraph/ExecutionVertex.java | 4 +-
.../apache/flink/runtime/instance/Instance.java | 7 +-
.../instance/InstanceConnectionInfo.java | 267 -------------------
.../flink/runtime/instance/InstanceManager.java | 3 +-
.../flink/runtime/io/network/ConnectionID.java | 4 +-
.../taskmanager/TaskManagerLocation.java | 257 ++++++++++++++++++
.../runtime/messages/RegistrationMessages.scala | 12 +-
.../flink/runtime/taskmanager/TaskManager.scala | 15 +-
.../testingUtils/TestingTaskManager.scala | 33 ++-
.../ExecutionGraphMetricsTest.java | 8 +-
.../executiongraph/ExecutionGraphTestUtils.java | 7 +-
.../TerminalStateDeadlockTest.java | 7 +-
.../VertexLocationConstraintTest.java | 6 +-
.../instance/InstanceConnectionInfoTest.java | 200 --------------
.../runtime/instance/InstanceManagerTest.java | 64 +++--
.../flink/runtime/instance/InstanceTest.java | 16 +-
.../flink/runtime/instance/SimpleSlotTest.java | 6 +-
.../scheduler/SchedulerTestUtils.java | 25 +-
.../resourcemanager/ResourceManagerITCase.java | 5 +-
...askManagerComponentsStartupShutdownTest.java | 12 +-
.../taskmanager/TaskManagerLocationTest.java | 201 ++++++++++++++
.../jobmanager/JobManagerRegistrationTest.scala | 14 +-
.../flink/yarn/TestingYarnTaskManager.scala | 20 +-
.../org/apache/flink/yarn/YarnTaskManager.scala | 7 +-
33 files changed, 618 insertions(+), 631 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index d8b6775..19b0c62 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -19,12 +19,11 @@
package org.apache.flink.mesos.runtime.clusterframework
import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
/** An extension of the TaskManager that listens for additional Mesos-related
* messages.
@@ -32,7 +31,7 @@ import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfigurati
class MesosTaskManager(
config: TaskManagerConfiguration,
resourceID: ResourceID,
- connectionInfo: InstanceConnectionInfo,
+ taskManagerLocation: TaskManagerLocation,
memoryManager: MemoryManager,
ioManager: IOManager,
network: NetworkEnvironment,
@@ -41,7 +40,7 @@ class MesosTaskManager(
extends TaskManager(
config,
resourceID,
- connectionInfo,
+ taskManagerLocation,
memoryManager,
ioManager,
network,
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 7b6a361..ce154e3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.util.ExceptionUtils;
@@ -66,7 +66,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
break;
}
- InstanceConnectionInfo location = task.getCurrentAssignedResourceLocation();
+ TaskManagerLocation location = task.getCurrentAssignedResourceLocation();
String locationString = location != null ?
location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)";
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index d4e885e..813ecb8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.io.StringWriter;
@@ -61,7 +61,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
final ExecutionState status = vertex.getExecutionState();
- InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+ TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index befc0bf..cbdb87f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.io.StringWriter;
@@ -51,7 +51,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
Map<String, List<ExecutionVertex>> taskManagerVertices = new HashMap<>();
for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
- InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+ TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
List<ExecutionVertex> vertices = taskManagerVertices.get(taskManager);
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 3d80b23..a1e6d0e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.io.StringWriter;
@@ -45,7 +45,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
final ExecutionState status = execAttempt.getState();
final long now = System.currentTimeMillis();
- InstanceConnectionInfo location = execAttempt.getAssignedResourceLocation();
+ TaskManagerLocation location = execAttempt.getAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();
long startTime = execAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 6d9ce3a..780bd4b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.io.StringWriter;
@@ -52,7 +52,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
int num = 0;
for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
- InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+ TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();
gen.writeStartObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 03d40dc..9e6276d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.io.StringWriter;
@@ -70,7 +70,7 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
gen.writeStartObject();
gen.writeNumberField("subtask", num++);
- InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+ TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();
gen.writeStringField("host", locationString);
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
index a3cfcd9..e1391a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.deployment;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -44,7 +44,7 @@ public class PartialInputChannelDeploymentDescriptor {
private final ResultPartitionID partitionID;
/** The partition connection info. */
- private final InstanceConnectionInfo partitionConnectionInfo;
+ private final TaskManagerLocation partitionConnectionInfo;
/** The partition connection index. */
private final int partitionConnectionIndex;
@@ -52,7 +52,7 @@ public class PartialInputChannelDeploymentDescriptor {
public PartialInputChannelDeploymentDescriptor(
IntermediateDataSetID resultId,
ResultPartitionID partitionID,
- InstanceConnectionInfo partitionConnectionInfo,
+ TaskManagerLocation partitionConnectionInfo,
int partitionConnectionIndex) {
this.resultId = checkNotNull(resultId);
@@ -71,7 +71,7 @@ public class PartialInputChannelDeploymentDescriptor {
checkNotNull(consumerExecution, "Consumer execution null");
- InstanceConnectionInfo consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();
+ TaskManagerLocation consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();
checkNotNull(consumerConnectionInfo, "Consumer connection info null");
@@ -107,7 +107,7 @@ public class PartialInputChannelDeploymentDescriptor {
final IntermediateResult result = partition.getIntermediateResult();
final IntermediateDataSetID resultId = result.getId();
- final InstanceConnectionInfo partitionConnectionInfo = producer.getAssignedResourceLocation();
+ final TaskManagerLocation partitionConnectionInfo = producer.getAssignedResourceLocation();
final int partitionConnectionIndex = result.getConnectionIndex();
return new PartialInputChannelDeploymentDescriptor(
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index efddecc..197999c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
@@ -133,7 +133,7 @@ public class Execution {
private volatile Throwable failureCause; // once assigned, never changes
- private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
+ private volatile TaskManagerLocation assignedResourceLocation; // for the archived execution
private ChainedStateHandle<StreamStateHandle> chainedStateHandle;
@@ -147,7 +147,7 @@ public class Execution {
/* Lock for updating the accumulators atomically. Prevents final accumulators to be overwritten
* by partial accumulators on a late heartbeat*/
- private final SerializableObject accumulatorLock = new SerializableObject();
+ private final Object accumulatorLock = new Object();
/* Continuously updated map of user-defined accumulators */
private volatile Map<String, Accumulator<?, ?>> userAccumulators;
@@ -202,7 +202,7 @@ public class Execution {
return assignedResource;
}
- public InstanceConnectionInfo getAssignedResourceLocation() {
+ public TaskManagerLocation getAssignedResourceLocation() {
return assignedResourceLocation;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index b215394..e5a115a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -228,7 +228,7 @@ public class ExecutionVertex {
return currentExecution.getAssignedResource();
}
- public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
+ public TaskManagerLocation getCurrentAssignedResourceLocation() {
return currentExecution.getAssignedResourceLocation();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 119f060..598b32b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ public class Instance {
private final ActorGateway actorGateway;
/** The instance connection information for the data transfer. */
- private final InstanceConnectionInfo connectionInfo;
+ private final TaskManagerLocation connectionInfo;
/** A description of the resources of the task manager */
private final HardwareDescription resources;
@@ -92,7 +93,7 @@ public class Instance {
*/
public Instance(
ActorGateway actorGateway,
- InstanceConnectionInfo connectionInfo,
+ TaskManagerLocation connectionInfo,
ResourceID resourceId,
InstanceID id,
HardwareDescription resources,
@@ -350,7 +351,7 @@ public class Instance {
return actorGateway;
}
- public InstanceConnectionInfo getInstanceConnectionInfo() {
+ public TaskManagerLocation getInstanceConnectionInfo() {
return connectionInfo;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
deleted file mode 100644
index 2830f04..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class encapsulates the connection information of a TaskManager.
- * It describes the host where the TaskManager operates and its server port
- * for data exchange. This class also contains utilities to work with the
- * TaskManager's host name, which is used to localize work assignments.
- */
-public class InstanceConnectionInfo implements IOReadableWritable, Comparable<InstanceConnectionInfo>, Serializable {
-
- private static final long serialVersionUID = -8254407801276350716L;
-
- private static final Logger LOG = LoggerFactory.getLogger(InstanceConnectionInfo.class);
-
-
- /**
- * The network address the instance's task manager binds its sockets to.
- */
- private InetAddress inetAddress;
-
- /**
- * The port the instance's task manager expects to receive transfer envelopes on.
- */
- private int dataPort;
-
- /**
- * The fully qualified host name of the instance.
- */
- private String fqdnHostName;
-
- /**
- * The hostname, derived from the fully qualified host name.
- */
- private String hostName;
-
-
- /**
- * Constructs a new instance connection info object. The constructor will attempt to retrieve the instance's
- * host name and domain name through the operating system's lookup mechanisms.
- *
- * @param inetAddress
- * the network address the instance's task manager binds its sockets to
- * @param dataPort
- * the port instance's task manager expects to receive transfer envelopes on
- */
- public InstanceConnectionInfo(InetAddress inetAddress, int dataPort) {
- if (inetAddress == null) {
- throw new IllegalArgumentException("Argument inetAddress must not be null");
- }
-
- // -1 indicates a local instance connection info
- if (dataPort != -1 && dataPort <= 0) {
- throw new IllegalArgumentException("Argument dataPort must be greater than zero");
- }
-
- this.dataPort = dataPort;
- this.inetAddress = inetAddress;
-
- // get FQDN hostname on this TaskManager.
- try {
- this.fqdnHostName = this.inetAddress.getCanonicalHostName();
- }
- catch (Throwable t) {
- LOG.warn("Unable to determine the canonical hostname. Input split assignment (such as " +
- "for HDFS files) may be non-local when the canonical hostname is missing.");
- LOG.debug("getCanonicalHostName() Exception:", t);
- this.fqdnHostName = this.inetAddress.getHostAddress();
- }
-
- if (this.fqdnHostName.equals(this.inetAddress.getHostAddress())) {
- // this happens when the name lookup fails, either due to an exception,
- // or because no hostname can be found for the address
- // take IP textual representation
- this.hostName = this.fqdnHostName;
- LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. "
- + "Local input split assignment (such as for HDFS files) may be impacted.",
- this.inetAddress.getHostAddress());
- }
- else {
- this.hostName = NetUtils.getHostnameFromFQDN(this.fqdnHostName);
- }
- }
-
- /**
- * Constructs an empty object.
- */
- public InstanceConnectionInfo() {}
-
-
- /**
- * Returns the port instance's task manager expects to receive transfer envelopes on.
- *
- * @return the port instance's task manager expects to receive transfer envelopes on
- */
- public int dataPort() {
- return this.dataPort;
- }
-
- /**
- * Returns the network address the instance's task manager binds its sockets to.
- *
- * @return the network address the instance's task manager binds its sockets to
- */
- public InetAddress address() {
- return this.inetAddress;
- }
-
- /**
- * Returns the fully-qualified domain name the TaskManager. If the name could not be
- * determined, the return value will be a textual representation of the TaskManager's IP address.
- *
- * @return The fully-qualified domain name of the TaskManager.
- */
- public String getFQDNHostname() {
- return this.fqdnHostName;
- }
-
- /**
- * Gets the hostname of the TaskManager. The hostname derives from the fully qualified
- * domain name (FQDN, see {@link #getFQDNHostname()}):
- * <ul>
- * <li>If the FQDN is the textual IP address, then the hostname is also the IP address</li>
- * <li>If the FQDN has only one segment (such as "localhost", or "host17"), then this is
- * used as the hostname.</li>
- * <li>If the FQDN has multiple segments (such as "worker3.subgroup.company.net"), then the first
- * segment (here "worker3") will be used as the hostname.</li>
- * </ul>
- *
- * @return The hostname of the TaskManager.
- */
- public String getHostname() {
- return hostName;
- }
-
- /**
- * Gets the IP address where the TaskManager operates.
- *
- * @return The IP address.
- */
- public String getInetAdress() {
- return this.inetAddress.toString();
- }
-
- // --------------------------------------------------------------------------------------------
- // Serialization
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void read(DataInputView in) throws IOException {
-
- final int addr_length = in.readInt();
- byte[] address = new byte[addr_length];
- in.readFully(address);
-
- this.dataPort = in.readInt();
-
- this.fqdnHostName = StringUtils.readNullableString(in);
- this.hostName = StringUtils.readNullableString(in);
-
- try {
- this.inetAddress = InetAddress.getByAddress(address);
- } catch (UnknownHostException e) {
- throw new IOException("This lookup should never fail.", e);
- }
- }
-
-
- @Override
- public void write(final DataOutputView out) throws IOException {
- out.writeInt(this.inetAddress.getAddress().length);
- out.write(this.inetAddress.getAddress());
-
- out.writeInt(this.dataPort);
-
- StringUtils.writeNullableString(fqdnHostName, out);
- StringUtils.writeNullableString(hostName, out);
- }
-
- // --------------------------------------------------------------------------------------------
- // Utilities
- // --------------------------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return getFQDNHostname() + " (dataPort=" + dataPort + ")";
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof InstanceConnectionInfo) {
- InstanceConnectionInfo other = (InstanceConnectionInfo) obj;
- return this.dataPort == other.dataPort &&
- this.inetAddress.equals(other.inetAddress);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return this.inetAddress.hashCode() +
- 17*dataPort;
- }
-
- @Override
- public int compareTo(InstanceConnectionInfo o) {
- // decide based on address first
- byte[] thisAddress = this.inetAddress.getAddress();
- byte[] otherAddress = o.inetAddress.getAddress();
-
- if (thisAddress.length < otherAddress.length) {
- return -1;
- } else if (thisAddress.length > otherAddress.length) {
- return 1;
- } else {
- for (int i = 0; i < thisAddress.length; i++) {
- byte tb = thisAddress[i];
- byte ob = otherAddress[i];
- if (tb < ob) {
- return -1;
- } else if (tb > ob) {
- return 1;
- }
- }
- }
-
- // addresses are identical, decide based on ports.
- if (this.dataPort < o.dataPort) {
- return -1;
- } else if (this.dataPort > o.dataPort) {
- return 1;
- } else {
- return 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 0d0d4c7..e7a4537 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import akka.actor.ActorRef;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,7 +148,7 @@ public class InstanceManager {
public InstanceID registerTaskManager(
ActorRef taskManager,
ResourceID resourceID,
- InstanceConnectionInfo connectionInfo,
+ TaskManagerLocation connectionInfo,
HardwareDescription resources,
int numberOfSlots,
UUID leaderSessionID){
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
index 0569dae..cc2a19d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.io.Serializable;
import java.net.InetSocketAddress;
@@ -43,7 +43,7 @@ public class ConnectionID implements Serializable {
private final int connectionIndex;
- public ConnectionID(InstanceConnectionInfo connectionInfo, int connectionIndex) {
+ public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) {
this(new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
new file mode 100644
index 0000000..5a0faa5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import java.net.InetAddress;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.util.NetUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class encapsulates the connection information of a TaskManager.
+ * It describes the ID of the resource and the host where the TaskManager operates, and its
+ * server port for data exchange. This class also contains utilities to work with the
+ * TaskManager's host name, which is used to localize work assignments.
+ */
+public class TaskManagerLocation implements Comparable<TaskManagerLocation>, java.io.Serializable {
+
+ private static final long serialVersionUID = -8254407801276350716L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLocation.class);
+
+ // ------------------------------------------------------------------------
+
+ /** The ID of the resource in which the TaskManager is started. This can be for example
+ * the YARN container ID, Mesos container ID, or any other unique identifier. */
+ private final ResourceID resourceID;
+
+ /** The network address that the TaskManager binds its sockets to */
+ private final InetAddress inetAddress;
+
+ /** The fully qualified host name of the TaskManager */
+ private final String fqdnHostName;
+
+ /** The pure hostname, derived from the fully qualified host name. */
+ private final String hostName;
+
+ /** The port that the TaskManager receives data transport connection requests at */
+ private final int dataPort;
+
+ /** The toString representation, eagerly constructed and cached to avoid repeated string building */
+ private final String stringRepresentation;
+
+ /**
+ * Constructs a new TaskManagerLocation. The constructor will attempt to retrieve the TaskManager's
+ * host name and domain name through the operating system's lookup mechanisms.
+ *
+ * @param resourceID the ID of the resource in which the TaskManager is started
+ * @param inetAddress the network address the TaskManager binds its sockets to
+ * @param dataPort the port at which the TaskManager receives data transport connection requests;
+ * must be greater than zero, or -1 to indicate a local instance
+ */
+ public TaskManagerLocation(ResourceID resourceID, InetAddress inetAddress, int dataPort) {
+ // -1 indicates a local instance connection info
+ checkArgument(dataPort > 0 || dataPort == -1, "dataPort must be > 0, or -1 (local)");
+
+ this.resourceID = checkNotNull(resourceID);
+ this.inetAddress = checkNotNull(inetAddress);
+ this.dataPort = dataPort;
+
+ // get FQDN hostname on this TaskManager.
+ String fqdnHostName;
+ try {
+ fqdnHostName = this.inetAddress.getCanonicalHostName();
+ }
+ catch (Throwable t) {
+ LOG.warn("Unable to determine the canonical hostname. Input split assignment (such as " +
+ "for HDFS files) may be non-local when the canonical hostname is missing.");
+ LOG.debug("getCanonicalHostName() Exception:", t);
+ fqdnHostName = this.inetAddress.getHostAddress();
+ }
+ this.fqdnHostName = fqdnHostName;
+
+ if (this.fqdnHostName.equals(this.inetAddress.getHostAddress())) {
+ // this happens when the name lookup fails, either due to an exception,
+ // or because no hostname can be found for the address
+ // take IP textual representation
+ this.hostName = this.fqdnHostName;
+ LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. "
+ + "Local input split assignment (such as for HDFS files) may be impacted.",
+ this.inetAddress.getHostAddress());
+ }
+ else {
+ this.hostName = NetUtils.getHostnameFromFQDN(this.fqdnHostName);
+ }
+
+ this.stringRepresentation = String.format(
+ "TaskManager (%s) @ %s (dataPort=%d)", resourceID, fqdnHostName, dataPort);
+ }
+
+ // ------------------------------------------------------------------------
+ // Getters
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the ID of the resource in which the TaskManager is started. The format of this depends
+ * on how the TaskManager is started:
+ * <ul>
+ * <li>If the TaskManager is started via YARN, this is the YARN container ID.</li>
+ * <li>If the TaskManager is started via Mesos, this is the Mesos container ID.</li>
+ * <li>If the TaskManager is started in standalone mode, or via a MiniCluster, this is a random ID.</li>
+ * <li>Other deployment modes can set the resource ID in other ways.</li>
+ * </ul>
+ *
+ * @return The ID of the resource in which the TaskManager is started
+ */
+ public ResourceID getResourceID() {
+ return resourceID;
+ }
+
+ /**
+ * Returns the port at which the TaskManager receives data transport connection requests.
+ *
+ * @return The data port, or -1 for a local instance.
+ */
+ public int dataPort() {
+ return dataPort;
+ }
+
+ /**
+ * Returns the network address the TaskManager binds its sockets to.
+ *
+ * @return The network address the TaskManager binds its sockets to.
+ */
+ public InetAddress address() {
+ return inetAddress;
+ }
+
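+ /**
+ * Gets the string form of the TaskManager's network address, as produced by {@link InetAddress#toString()}.
+ *
+ * @return The address string, in the form "hostname/literal IP address".
+ */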
+ public String addressString() {
+ return inetAddress.toString();
+ }
+
+ /**
+ * Returns the fully-qualified domain name of the TaskManager. If the name could not be
+ * determined, the return value will be a textual representation of the TaskManager's IP address.
+ *
+ * @return The fully-qualified domain name of the TaskManager.
+ */
+ public String getFQDNHostname() {
+ return fqdnHostName;
+ }
+
+ /**
+ * Gets the hostname of the TaskManager. The hostname derives from the fully qualified
+ * domain name (FQDN, see {@link #getFQDNHostname()}):
+ * <ul>
+ * <li>If the FQDN is the textual IP address, then the hostname is also the IP address</li>
+ * <li>If the FQDN has only one segment (such as "localhost", or "host17"), then this is
+ * used as the hostname.</li>
+ * <li>If the FQDN has multiple segments (such as "worker3.subgroup.company.net"), then the first
+ * segment (here "worker3") will be used as the hostname.</li>
+ * </ul>
+ *
+ * @return The hostname of the TaskManager.
+ */
+ public String getHostname() {
+ return hostName;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return stringRepresentation;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ else if (obj != null && obj.getClass() == TaskManagerLocation.class) {
+ TaskManagerLocation that = (TaskManagerLocation) obj;
+ return this.resourceID.equals(that.resourceID) &&
+ this.inetAddress.equals(that.inetAddress) &&
+ this.dataPort == that.dataPort;
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return resourceID.hashCode() +
+ 17 * inetAddress.hashCode() +
+ 129 * dataPort;
+ }
+
+ @Override
+ public int compareTo(@Nonnull TaskManagerLocation o) {
+ // decide based on the resource ID string first
+ int resourceIdCmp = this.resourceID.getResourceIdString().compareTo(o.resourceID.getResourceIdString());
+ if (resourceIdCmp != 0) {
+ return resourceIdCmp;
+ }
+
+ // decide based on ip address next: shorter raw addresses sort first, then signed byte-wise order
+ byte[] thisAddress = this.inetAddress.getAddress();
+ byte[] otherAddress = o.inetAddress.getAddress();
+
+ if (thisAddress.length < otherAddress.length) {
+ return -1;
+ } else if (thisAddress.length > otherAddress.length) {
+ return 1;
+ } else {
+ for (int i = 0; i < thisAddress.length; i++) {
+ byte tb = thisAddress[i];
+ byte ob = otherAddress[i];
+ if (tb < ob) {
+ return -1;
+ } else if (tb > ob) {
+ return 1;
+ }
+ }
+ }
+
+ // addresses are identical, decide based on ports.
+ if (this.dataPort < o.dataPort) {
+ return -1;
+ } else if (this.dataPort > o.dataPort) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index d362164..5648bc6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.messages
import java.util.UUID
-import akka.actor.ActorRef
import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.instance.{HardwareDescription, InstanceID}
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation
import scala.concurrent.duration.{Deadline, FiniteDuration}
@@ -63,10 +63,10 @@ object RegistrationMessages {
* @param numberOfSlots The number of processing slots offered by the TaskManager.
*/
case class RegisterTaskManager(
- resourceId: ResourceID,
- connectionInfo: InstanceConnectionInfo,
- resources: HardwareDescription,
- numberOfSlots: Int)
+ resourceId: ResourceID,
+ connectionInfo: TaskManagerLocation,
+ resources: HardwareDescription,
+ numberOfSlots: Int)
extends RegistrationMessage
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 3154826..84750a3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -50,7 +50,7 @@ import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.filecache.FileCache
-import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceConnectionInfo, InstanceID}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceID}
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
@@ -127,7 +127,7 @@ import scala.util.{Failure, Success}
class TaskManager(
protected val config: TaskManagerConfiguration,
protected val resourceID: ResourceID,
- protected val connectionInfo: InstanceConnectionInfo,
+ protected val location: TaskManagerLocation,
protected val memoryManager: MemoryManager,
protected val ioManager: IOManager,
protected val network: NetworkEnvironment,
@@ -189,7 +189,7 @@ class TaskManager(
var leaderSessionID: Option[UUID] = None
private val runtimeInfo = new TaskManagerRuntimeInfo(
- connectionInfo.getHostname(),
+ location.getHostname(),
new UnmodifiableConfiguration(config.configuration),
config.tmpDirPaths)
@@ -209,7 +209,7 @@ class TaskManager(
*/
override def preStart(): Unit = {
log.info(s"Starting TaskManager actor at ${self.path.toSerializationFormat}.")
- log.info(s"TaskManager data connection information: $connectionInfo")
+ log.info(s"TaskManager data connection information: $location")
log.info(s"TaskManager has $numberOfSlots task slot(s).")
// log the initial memory utilization
@@ -601,7 +601,7 @@ class TaskManager(
jobManager ! decorateMessage(
RegisterTaskManager(
resourceID,
- connectionInfo,
+ location,
resources,
numberOfSlots)
)
@@ -1884,7 +1884,8 @@ object TaskManager {
network.start()
- val connectionInfo = new InstanceConnectionInfo(
+ val taskManagerLocation = new TaskManagerLocation(
+ resourceID,
taskManagerAddress.getAddress(),
network.getConnectionManager().getDataPort())
@@ -1994,7 +1995,7 @@ object TaskManager {
taskManagerClass,
taskManagerConfig,
resourceID,
- connectionInfo,
+ taskManagerLocation,
memoryManager,
ioManager,
network,
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 2597753..9b5a147 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,26 +19,25 @@
package org.apache.flink.runtime.testingUtils
import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration}
import scala.language.postfixOps
/** Subclass of the [[TaskManager]] to support testing messages
*/
class TestingTaskManager(
- config: TaskManagerConfiguration,
- resourceID: ResourceID,
- connectionInfo: InstanceConnectionInfo,
- memoryManager: MemoryManager,
- ioManager: IOManager,
- network: NetworkEnvironment,
- numberOfSlots: Int,
- leaderRetrievalService: LeaderRetrievalService)
+ config: TaskManagerConfiguration,
+ resourceID: ResourceID,
+ connectionInfo: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int,
+ leaderRetrievalService: LeaderRetrievalService)
extends TaskManager(
config,
resourceID,
@@ -51,13 +50,13 @@ class TestingTaskManager(
with TestingTaskManagerLike {
def this(
- config: TaskManagerConfiguration,
- connectionInfo: InstanceConnectionInfo,
- memoryManager: MemoryManager,
- ioManager: IOManager,
- network: NetworkEnvironment,
- numberOfSlots: Int,
- leaderRetrievalService: LeaderRetrievalService) {
+ config: TaskManagerConfiguration,
+ connectionInfo: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int,
+ leaderRetrievalService: LeaderRetrievalService) {
this(
config,
ResourceID.generate(),
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 2b8b867..cf7cf58 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -110,7 +110,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
Instance instance = mock(Instance.class);
- InstanceConnectionInfo instanceConnectionInfo = mock(InstanceConnectionInfo.class);
+ TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
Slot rootSlot = mock(Slot.class);
@@ -123,9 +123,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
- when(instance.getInstanceConnectionInfo()).thenReturn(instanceConnectionInfo);
+ when(instance.getInstanceConnectionInfo()).thenReturn(taskManagerLocation);
when(instance.getActorGateway()).thenReturn(actorGateway);
- when(instanceConnectionInfo.getHostname()).thenReturn("localhost");
+ when(taskManagerLocation.getHostname()).thenReturn("localhost");
when(rootSlot.getSlotNumber()).thenReturn(0);
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 903d5f9..cddb6cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -108,11 +108,12 @@ public class ExecutionGraphTestUtils {
}
public static Instance getInstance(final ActorGateway gateway, final int numberOfSlots) throws Exception {
+ ResourceID resourceID = ResourceID.generate();
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
- InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
+ TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
- return new Instance(gateway, connection, ResourceID.generate(), new InstanceID(), hardwareDescription, numberOfSlots);
+ return new Instance(gateway, connection, resourceID, new InstanceID(), hardwareDescription, numberOfSlots);
}
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 2a690d9..a71faf6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -79,12 +79,13 @@ public class TerminalStateDeadlockTest {
this.execGraphSchedulerField.setAccessible(true);
// the dummy resource
+ ResourceID resourceId = ResourceID.generate();
InetAddress address = InetAddress.getByName("127.0.0.1");
- InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345);
+ TaskManagerLocation ci = new TaskManagerLocation(resourceId, address, 12345);
HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
Instance instance = new Instance(DummyActorGateway.INSTANCE, ci,
- ResourceID.generate(), new InstanceID(), resources, 4);
+ resourceId, new InstanceID(), resources, 4);
this.resource = instance.allocateSimpleSlot(new JobID());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 4ee06b36..91472ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -443,10 +443,10 @@ public class VertexLocationConstraintTest {
public static Instance getInstance(byte[] ipAddress, int dataPort, String hostname) throws Exception {
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
- InstanceConnectionInfo connection = mock(InstanceConnectionInfo.class);
+ TaskManagerLocation connection = mock(TaskManagerLocation.class);
when(connection.address()).thenReturn(InetAddress.getByAddress(ipAddress));
when(connection.dataPort()).thenReturn(dataPort);
- when(connection.getInetAdress()).thenReturn(InetAddress.getByAddress(ipAddress).toString());
+ when(connection.addressString()).thenReturn(InetAddress.getByAddress(ipAddress).toString());
when(connection.getHostname()).thenReturn(hostname);
when(connection.getFQDNHostname()).thenReturn(hostname);
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
deleted file mode 100644
index 3a9488d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.net.InetAddress;
-
-import org.apache.flink.util.InstantiationUtil;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the InstanceConnectionInfo, which identifies the location and connection
- * information of a TaskManager.
- */
-public class InstanceConnectionInfoTest {
-
- @Test
- public void testEqualsHashAndCompareTo() {
- try {
- // we mock the addresses to save the times of the reverse name lookups
- InetAddress address1 = mock(InetAddress.class);
- when(address1.getCanonicalHostName()).thenReturn("localhost");
- when(address1.getHostName()).thenReturn("localhost");
- when(address1.getHostAddress()).thenReturn("127.0.0.1");
- when(address1.getAddress()).thenReturn(new byte[] {127, 0, 0, 1} );
-
- InetAddress address2 = mock(InetAddress.class);
- when(address2.getCanonicalHostName()).thenReturn("testhost1");
- when(address2.getHostName()).thenReturn("testhost1");
- when(address2.getHostAddress()).thenReturn("0.0.0.0");
- when(address2.getAddress()).thenReturn(new byte[] {0, 0, 0, 0} );
-
- InetAddress address3 = mock(InetAddress.class);
- when(address3.getCanonicalHostName()).thenReturn("testhost2");
- when(address3.getHostName()).thenReturn("testhost2");
- when(address3.getHostAddress()).thenReturn("192.168.0.1");
- when(address3.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, 0, 1} );
-
- // one == four != two != three
- InstanceConnectionInfo one = new InstanceConnectionInfo(address1, 19871);
- InstanceConnectionInfo two = new InstanceConnectionInfo(address2, 19871);
- InstanceConnectionInfo three = new InstanceConnectionInfo(address3, 10871);
- InstanceConnectionInfo four = new InstanceConnectionInfo(address1, 19871);
-
- assertTrue(one.equals(four));
- assertTrue(!one.equals(two));
- assertTrue(!one.equals(three));
- assertTrue(!two.equals(three));
- assertTrue(!three.equals(four));
-
- assertTrue(one.compareTo(four) == 0);
- assertTrue(four.compareTo(one) == 0);
- assertTrue(one.compareTo(two) != 0);
- assertTrue(one.compareTo(three) != 0);
- assertTrue(two.compareTo(three) != 0);
- assertTrue(three.compareTo(four) != 0);
-
- {
- int val = one.compareTo(two);
- assertTrue(two.compareTo(one) == -val);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSerialization() {
- try {
- // without resolved hostname
- {
- InstanceConnectionInfo original = new InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 8888);
-
- InstanceConnectionInfo copy = InstantiationUtil.createCopyWritable(original);
- assertEquals(original, copy);
-
- InstanceConnectionInfo serCopy = InstantiationUtil.clone(original);
- assertEquals(original, serCopy);
- }
-
- // with resolved hostname
- {
- InstanceConnectionInfo original = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
- original.getFQDNHostname();
-
- InstanceConnectionInfo copy = InstantiationUtil.createCopyWritable(original);
- assertEquals(original, copy);
-
- InstanceConnectionInfo serCopy = InstantiationUtil.clone(original);
- assertEquals(original, serCopy);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testGetFQDNHostname() {
- try {
- InstanceConnectionInfo info1 = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
- assertNotNull(info1.getFQDNHostname());
-
- InstanceConnectionInfo info2 = new InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 8888);
- assertNotNull(info2.getFQDNHostname());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testGetHostname0() {
- try {
- InetAddress address = mock(InetAddress.class);
- when(address.getCanonicalHostName()).thenReturn("worker2.cluster.mycompany.com");
- when(address.getHostName()).thenReturn("worker2.cluster.mycompany.com");
- when(address.getHostAddress()).thenReturn("127.0.0.1");
-
- final InstanceConnectionInfo info = new InstanceConnectionInfo(address, 19871);
- Assert.assertEquals("worker2", info.getHostname());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testGetHostname1() {
- try {
- InetAddress address = mock(InetAddress.class);
- when(address.getCanonicalHostName()).thenReturn("worker10");
- when(address.getHostName()).thenReturn("worker10");
- when(address.getHostAddress()).thenReturn("127.0.0.1");
-
- InstanceConnectionInfo info = new InstanceConnectionInfo(address, 19871);
- Assert.assertEquals("worker10", info.getHostname());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testGetHostname2() {
- try {
- final String addressString = "192.168.254.254";
-
- // we mock the addresses to save the times of the reverse name lookups
- InetAddress address = mock(InetAddress.class);
- when(address.getCanonicalHostName()).thenReturn("192.168.254.254");
- when(address.getHostName()).thenReturn("192.168.254.254");
- when(address.getHostAddress()).thenReturn("192.168.254.254");
- when(address.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, (byte) 254, (byte) 254} );
-
- InstanceConnectionInfo info = new InstanceConnectionInfo(address, 54152);
-
- assertNotNull(info.getFQDNHostname());
- assertTrue(info.getFQDNHostname().equals(addressString));
-
- assertNotNull(info.getHostname());
- assertTrue(info.getHostname().equals(addressString));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index ff5e2ab..f1ed960 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -18,24 +18,11 @@
package org.apache.flink.runtime.instance;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.UUID;
-
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
+
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -44,6 +31,18 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
/**
* Tests for {@link org.apache.flink.runtime.instance.InstanceManager}.
*/
@@ -76,13 +75,13 @@ public class InstanceManagerTest{
InetAddress address = InetAddress.getByName("127.0.0.1");
// register three instances
- InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort);
- InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, dataPort + 15);
- InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, dataPort + 30);
-
ResourceID resID1 = ResourceID.generate();
ResourceID resID2 = ResourceID.generate();
ResourceID resID3 = ResourceID.generate();
+
+ TaskManagerLocation ici1 = new TaskManagerLocation(resID1, address, dataPort);
+ TaskManagerLocation ici2 = new TaskManagerLocation(resID2, address, dataPort + 15);
+ TaskManagerLocation ici3 = new TaskManagerLocation(resID3, address, dataPort + 30);
final JavaTestKit probe1 = new JavaTestKit(system);
final JavaTestKit probe2 = new JavaTestKit(system);
@@ -99,16 +98,16 @@ public class InstanceManagerTest{
assertEquals(8, cm.getTotalNumberOfSlots());
Collection<Instance> instances = cm.getAllRegisteredInstances();
- Set<InstanceConnectionInfo> instanceConnectionInfos = new
- HashSet<InstanceConnectionInfo>();
+ Set<TaskManagerLocation> taskManagerLocations = new
+ HashSet<TaskManagerLocation>();
for(Instance instance: instances){
- instanceConnectionInfos.add(instance.getInstanceConnectionInfo());
+ taskManagerLocations.add(instance.getInstanceConnectionInfo());
}
- assertTrue(instanceConnectionInfos.contains(ici1));
- assertTrue(instanceConnectionInfos.contains(ici2));
- assertTrue(instanceConnectionInfos.contains(ici3));
+ assertTrue(taskManagerLocations.contains(ici1));
+ assertTrue(taskManagerLocations.contains(ici2));
+ assertTrue(taskManagerLocations.contains(ici3));
cm.shutdown();
}
@@ -131,7 +130,7 @@ public class InstanceManagerTest{
HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
InetAddress address = InetAddress.getByName("127.0.0.1");
- InstanceConnectionInfo ici = new InstanceConnectionInfo(address, dataPort);
+ TaskManagerLocation ici = new TaskManagerLocation(resID1, address, dataPort);
JavaTestKit probe = new JavaTestKit(system);
cm.registerTaskManager(probe.getRef(), resID1,
@@ -141,13 +140,12 @@ public class InstanceManagerTest{
assertEquals(1, cm.getTotalNumberOfSlots());
try {
- cm.registerTaskManager(probe.getRef(), resID2,
- ici, resources, 1, leaderSessionID);
+ cm.registerTaskManager(probe.getRef(), resID2, ici, resources, 1, leaderSessionID);
} catch (Exception e) {
// good
}
- // check for correct number of registerede instances
+ // check for correct number of registered instances
assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
assertEquals(1, cm.getTotalNumberOfSlots());
@@ -176,9 +174,9 @@ public class InstanceManagerTest{
InetAddress address = InetAddress.getByName("127.0.0.1");
// register three instances
- InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort);
- InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, dataPort + 1);
- InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, dataPort + 2);
+ TaskManagerLocation ici1 = new TaskManagerLocation(resID1, address, dataPort);
+ TaskManagerLocation ici2 = new TaskManagerLocation(resID2, address, dataPort + 1);
+ TaskManagerLocation ici3 = new TaskManagerLocation(resID3, address, dataPort + 2);
JavaTestKit probe1 = new JavaTestKit(system);
JavaTestKit probe2 = new JavaTestKit(system);
@@ -240,7 +238,7 @@ public class InstanceManagerTest{
ResourceID resID = ResourceID.generate();
HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
InetAddress address = InetAddress.getByName("127.0.0.1");
- InstanceConnectionInfo ici = new InstanceConnectionInfo(address, 20000);
+ TaskManagerLocation ici = new TaskManagerLocation(resID, address, 20000);
JavaTestKit probe = new JavaTestKit(system);
cm.registerTaskManager(probe.getRef(), resID,
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index faa679b..82d3723 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -25,6 +25,7 @@ import java.net.InetAddress;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.junit.Test;
/**
@@ -35,12 +36,13 @@ public class InstanceTest {
@Test
public void testAllocatingAndCancellingSlots() {
try {
+ ResourceID resourceID = ResourceID.generate();
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
- InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
+ TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
- ResourceID.generate(), new InstanceID(), hardwareDescription, 4);
+ resourceID, new InstanceID(), hardwareDescription, 4);
assertEquals(4, instance.getTotalNumberOfSlots());
assertEquals(4, instance.getNumberOfAvailableSlots());
@@ -97,12 +99,13 @@ public class InstanceTest {
@Test
public void testInstanceDies() {
try {
+ ResourceID resourceID = ResourceID.generate();
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
- InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
+ TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
- ResourceID.generate(), new InstanceID(), hardwareDescription, 3);
+ resourceID, new InstanceID(), hardwareDescription, 3);
assertEquals(3, instance.getNumberOfAvailableSlots());
@@ -128,12 +131,13 @@ public class InstanceTest {
@Test
public void testCancelAllSlots() {
try {
+ ResourceID resourceID = ResourceID.generate();
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
- InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
+ TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
- ResourceID.generate(), new InstanceID(), hardwareDescription, 3);
+ resourceID, new InstanceID(), hardwareDescription, 3);
assertEquals(3, instance.getNumberOfAvailableSlots());
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index 459a3ed..82c2a74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.junit.Test;
import org.mockito.Matchers;
@@ -143,12 +144,13 @@ public class SimpleSlotTest {
}
public static SimpleSlot getSlot() throws Exception {
+ ResourceID resourceID = ResourceID.generate();
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
- InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);
+ TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
- ResourceID.generate(), new InstanceID(), hardwareDescription, 1);
+ resourceID, new InstanceID(), hardwareDescription, 1);
return instance.allocateSimpleSlot(new JobID());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 983d6e6..99360e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.when;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,7 +34,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -52,7 +51,8 @@ public class SchedulerTestUtils {
throw new IllegalArgumentException();
}
- InetAddress address;
+ final ResourceID resourceID = ResourceID.generate();
+ final InetAddress address;
try {
address = InetAddress.getByName("127.0.0.1");
}
@@ -62,12 +62,12 @@ public class SchedulerTestUtils {
int dataPort = port.getAndIncrement();
- InstanceConnectionInfo ci = new InstanceConnectionInfo(address, dataPort);
+ TaskManagerLocation ci = new TaskManagerLocation(resourceID, address, dataPort);
final long GB = 1024L*1024*1024;
HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB);
- return new Instance(DummyActorGateway.INSTANCE, ci, ResourceID.generate(),
+ return new Instance(DummyActorGateway.INSTANCE, ci, resourceID,
new InstanceID(), resources, numSlots);
}
@@ -143,19 +143,4 @@ public class SchedulerTestUtils {
return set.size() == obj.length;
}
-
- public static boolean areSameSets(Collection<Object> set1, Collection<Object> set2) {
- if (set1 == null || set2 == null) {
- throw new IllegalArgumentException();
- }
-
- HashSet<Object> set = new HashSet<Object>(set1);
- for (Object o : set2) {
- if (!set.remove(o)) {
- return false;
- }
- }
-
- return set.isEmpty();
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
index ca09634..3307568 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
@@ -24,14 +24,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -83,7 +82,7 @@ public class ResourceManagerITCase extends TestLogger {
jobManager.tell(
new RegistrationMessages.RegisterTaskManager(
resourceID,
- Mockito.mock(InstanceConnectionInfo.class),
+ Mockito.mock(TaskManagerLocation.class),
null,
1),
me);
http://git-wip-us.apache.org/repos/asf/flink/blob/34cda87a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 3371c49..bda4174 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -25,6 +25,7 @@ import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
+
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
@@ -32,7 +33,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -46,12 +46,12 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
-
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+
import org.junit.Test;
+
import scala.Option;
import scala.concurrent.duration.FiniteDuration;
@@ -105,7 +105,9 @@ public class TaskManagerComponentsStartupShutdownTest {
32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, 0,
Option.<NettyConfig>empty(), 0, 0);
- final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
+ ResourceID taskManagerId = ResourceID.generate();
+
+ final TaskManagerLocation connectionInfo = new TaskManagerLocation(taskManagerId, InetAddress.getLocalHost(), 10000);
final MemoryManager memManager = new MemoryManager(32 * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
final IOManager ioManager = new IOManagerAsync(TMP_DIR);
@@ -130,7 +132,7 @@ public class TaskManagerComponentsStartupShutdownTest {
final Props tmProps = Props.create(
TaskManager.class,
tmConfig,
- ResourceID.generate(),
+ taskManagerId,
connectionInfo,
memManager,
ioManager,