You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/02 15:36:10 UTC
[6/8] flink git commit: [FLINK-4490] [distributed coordination] (part 3) Rename methods on 'Instance' to have more intuitive names
[FLINK-4490] [distributed coordination] (part 3) Rename methods on 'Instance' to have more intuitive names
getResourceId() --> getTaskManagerID()
getInstanceConnectionInfo() --> getTaskManagerLocation()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eac6088a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eac6088a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eac6088a
Branch: refs/heads/master
Commit: eac6088a75e813a015b778f4cfc4cce0cf2a53ce
Parents: aaa474a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 31 13:59:01 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200
----------------------------------------------------------------------
.../handlers/TaskManagersHandler.java | 2 +-
...PartialInputChannelDeploymentDescriptor.java | 21 +-
.../apache/flink/runtime/instance/Instance.java | 28 +-
.../flink/runtime/instance/InstanceManager.java | 20 +-
.../runtime/jobmanager/scheduler/Scheduler.java | 18 +-
.../flink/runtime/jobmanager/JobManager.scala | 3 +-
.../ExecutionGraphMetricsTest.java | 404 ++++++++++---------
.../executiongraph/ExecutionGraphTestUtils.java | 2 +-
.../TerminalStateDeadlockTest.java | 3 +-
.../runtime/instance/InstanceManagerTest.java | 31 +-
.../flink/runtime/instance/InstanceTest.java | 6 +-
.../flink/runtime/instance/SharedSlotsTest.java | 24 +-
.../flink/runtime/instance/SimpleSlotTest.java | 2 +-
.../partition/SpilledSubpartitionViewTest.java | 4 +-
.../scheduler/CoLocationConstraintTest.java | 6 +-
.../ScheduleWithCoLocationHintTest.java | 22 +-
.../scheduler/SchedulerSlotSharingTest.java | 30 +-
.../scheduler/SchedulerTestUtils.java | 5 +-
.../scheduler/SlotAllocationFutureTest.java | 12 +-
19 files changed, 319 insertions(+), 324 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index b60cd10..b5e9088 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -85,7 +85,7 @@ public class TaskManagersHandler implements RequestHandler {
gen.writeStartObject();
gen.writeStringField("id", instance.getId().toString());
gen.writeStringField("path", instance.getActorGateway().path());
- gen.writeNumberField("dataPort", instance.getInstanceConnectionInfo().dataPort());
+ gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
index e1391a4..0eac39d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
@@ -44,7 +44,7 @@ public class PartialInputChannelDeploymentDescriptor {
private final ResultPartitionID partitionID;
/** The partition connection info. */
- private final TaskManagerLocation partitionConnectionInfo;
+ private final TaskManagerLocation partitionTaskManagerLocation;
/** The partition connection index. */
private final int partitionConnectionIndex;
@@ -52,12 +52,12 @@ public class PartialInputChannelDeploymentDescriptor {
public PartialInputChannelDeploymentDescriptor(
IntermediateDataSetID resultId,
ResultPartitionID partitionID,
- TaskManagerLocation partitionConnectionInfo,
+ TaskManagerLocation partitionTaskManagerLocation,
int partitionConnectionIndex) {
this.resultId = checkNotNull(resultId);
this.partitionID = checkNotNull(partitionID);
- this.partitionConnectionInfo = checkNotNull(partitionConnectionInfo);
+ this.partitionTaskManagerLocation = checkNotNull(partitionTaskManagerLocation);
this.partitionConnectionIndex = partitionConnectionIndex;
}
@@ -66,23 +66,20 @@ public class PartialInputChannelDeploymentDescriptor {
*
* @see InputChannelDeploymentDescriptor
*/
- public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(
- Execution consumerExecution) {
+ public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(Execution consumerExecution) {
+ checkNotNull(consumerExecution, "consumerExecution");
- checkNotNull(consumerExecution, "Consumer execution null");
-
- TaskManagerLocation consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();
-
- checkNotNull(consumerConnectionInfo, "Consumer connection info null");
+ TaskManagerLocation consumerLocation = consumerExecution.getAssignedResourceLocation();
+ checkNotNull(consumerLocation, "Consumer connection info null");
final ResultPartitionLocation partitionLocation;
- if (consumerConnectionInfo.equals(partitionConnectionInfo)) {
+ if (consumerLocation.equals(partitionTaskManagerLocation)) {
partitionLocation = ResultPartitionLocation.createLocal();
}
else {
partitionLocation = ResultPartitionLocation.createRemote(
- new ConnectionID(partitionConnectionInfo, partitionConnectionIndex));
+ new ConnectionID(partitionTaskManagerLocation, partitionConnectionIndex));
}
return new InputChannelDeploymentDescriptor(partitionID, partitionLocation);
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index fe46895..4a8139b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -52,14 +52,11 @@ public class Instance implements SlotOwner {
private final ActorGateway actorGateway;
/** The instance connection information for the data transfer. */
- private final TaskManagerLocation connectionInfo;
+ private final TaskManagerLocation location;
/** A description of the resources of the task manager */
private final HardwareDescription resources;
- /** The ID identifies the resource the task manager runs on */
- private final ResourceID resourceId;
-
/** The ID identifying the taskManager. */
private final InstanceID instanceId;
@@ -90,22 +87,19 @@ public class Instance implements SlotOwner {
* Constructs an instance reflecting a registered TaskManager.
*
* @param actorGateway The actor gateway to communicate with the remote instance
- * @param connectionInfo The remote connection where the task manager receives requests.
- * @param resourceId The resource id which denotes the resource the task manager uses.
+ * @param location The remote connection where the task manager receives requests.
* @param id The id under which the taskManager is registered.
* @param resources The resources available on the machine.
* @param numberOfSlots The number of task slots offered by this taskManager.
*/
public Instance(
ActorGateway actorGateway,
- TaskManagerLocation connectionInfo,
- ResourceID resourceId,
+ TaskManagerLocation location,
InstanceID id,
HardwareDescription resources,
int numberOfSlots) {
this.actorGateway = actorGateway;
- this.connectionInfo = connectionInfo;
- this.resourceId = resourceId;
+ this.location = location;
this.instanceId = id;
this.resources = resources;
this.numberOfSlots = numberOfSlots;
@@ -120,8 +114,8 @@ public class Instance implements SlotOwner {
// Properties
// --------------------------------------------------------------------------------------------
- public ResourceID getResourceId() {
- return resourceId;
+ public ResourceID getTaskManagerID() {
+ return location.getResourceID();
}
public InstanceID getId() {
@@ -246,7 +240,7 @@ public class Instance implements SlotOwner {
return null;
}
else {
- SimpleSlot slot = new SimpleSlot(jobID, this, connectionInfo, nextSlot, actorGateway);
+ SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, actorGateway);
allocatedSlots.add(slot);
return slot;
}
@@ -284,7 +278,7 @@ public class Instance implements SlotOwner {
}
else {
SharedSlot slot = new SharedSlot(
- jobID, this, connectionInfo, nextSlot, actorGateway, sharingGroupAssignment);
+ jobID, this, location, nextSlot, actorGateway, sharingGroupAssignment);
allocatedSlots.add(slot);
return slot;
}
@@ -355,8 +349,8 @@ public class Instance implements SlotOwner {
return actorGateway;
}
- public TaskManagerLocation getInstanceConnectionInfo() {
- return connectionInfo;
+ public TaskManagerLocation getTaskManagerLocation() {
+ return location;
}
public int getNumberOfAvailableSlots() {
@@ -405,7 +399,7 @@ public class Instance implements SlotOwner {
@Override
public String toString() {
- return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(),
+ return String.format("%s @ %s - %d slots - URL: %s", instanceId, location.getHostname(),
numberOfSlots, (actorGateway != null ? actorGateway.path() : "No instance gateway"));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index e7a4537..0c7e187 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -138,8 +138,7 @@ public class InstanceManager {
* for the job execution.
*
* @param taskManager ActorRef to the TaskManager which wants to be registered
- * @param resourceID The resource id of the TaskManager
- * @param connectionInfo ConnectionInfo of the TaskManager
+ * @param taskManagerLocation Location info of the TaskManager
* @param resources Hardware description of the TaskManager
* @param numberOfSlots Number of available slots on the TaskManager
* @param leaderSessionID The current leader session ID of the JobManager
@@ -147,12 +146,12 @@ public class InstanceManager {
*/
public InstanceID registerTaskManager(
ActorRef taskManager,
- ResourceID resourceID,
- TaskManagerLocation connectionInfo,
+ TaskManagerLocation taskManagerLocation,
HardwareDescription resources,
int numberOfSlots,
- UUID leaderSessionID){
- synchronized(this.lock){
+ UUID leaderSessionID) {
+
+ synchronized (this.lock) {
if (this.isShutdown) {
throw new IllegalStateException("InstanceManager is shut down.");
}
@@ -174,12 +173,11 @@ public class InstanceManager {
InstanceID instanceID = new InstanceID();
- Instance host = new Instance(actorGateway, connectionInfo, resourceID, instanceID,
- resources, numberOfSlots);
+ Instance host = new Instance(actorGateway, taskManagerLocation, instanceID, resources, numberOfSlots);
registeredHostsById.put(instanceID, host);
registeredHostsByConnection.put(taskManager, host);
- registeredHostsByResource.put(resourceID, host);
+ registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);
totalNumberOfAliveTaskSlots += numberOfSlots;
@@ -187,7 +185,7 @@ public class InstanceManager {
LOG.info(String.format("Registered TaskManager at %s (%s) as %s. " +
"Current number of registered hosts is %d. " +
"Current number of alive task slots is %d.",
- connectionInfo.getHostname(),
+ taskManagerLocation.getHostname(),
taskManager.path(),
instanceID,
registeredHostsById.size(),
@@ -217,7 +215,7 @@ public class InstanceManager {
registeredHostsByConnection.remove(host);
registeredHostsById.remove(instance.getId());
- registeredHostsByResource.remove(instance.getResourceId());
+ registeredHostsByResource.remove(instance.getTaskManagerID());
if (terminated) {
deadHosts.add(instance.getActorGateway().actor());
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index b481b55..734972d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -363,7 +363,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
// if the instance has further available slots, re-add it to the set of available resources.
if (instanceToUse.hasResourcesAvailable()) {
- this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse);
+ this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
}
if (slot != null) {
@@ -425,7 +425,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
// if the instance has further available slots, re-add it to the set of available resources.
if (instanceToUse.hasResourcesAvailable()) {
- this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse);
+ this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
}
if (sharedSlot != null) {
@@ -469,7 +469,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
while (this.newlyAvailableInstances.size() > 0) {
Instance queuedInstance = this.newlyAvailableInstances.poll();
if (queuedInstance != null) {
- this.instancesWithAvailableResources.put(queuedInstance.getResourceId(), queuedInstance);
+ this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);
}
}
@@ -583,7 +583,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
}
}
else {
- this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
+ this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
}
}
}
@@ -649,7 +649,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
instance.setSlotAvailabilityListener(this);
// store the instance in the by-host-lookup
- String instanceHostName = instance.getInstanceConnectionInfo().getHostname();
+ String instanceHostName = instance.getTaskManagerLocation().getHostname();
Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
if (instanceSet == null) {
instanceSet = new HashSet<Instance>();
@@ -658,7 +658,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
instanceSet.add(instance);
// add it to the available resources and let potential waiters know
- this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
+ this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
// add all slots as available
for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
@@ -693,9 +693,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
}
allInstances.remove(instance);
- instancesWithAvailableResources.remove(instance.getResourceId());
+ instancesWithAvailableResources.remove(instance.getTaskManagerID());
- String instanceHostName = instance.getInstanceConnectionInfo().getHostname();
+ String instanceHostName = instance.getTaskManagerLocation().getHostname();
Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
if (instanceSet != null) {
instanceSet.remove(instance);
@@ -795,7 +795,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
while ((instance = newlyAvailableInstances.poll()) != null) {
if (instance.hasResourcesAvailable()) {
- instancesWithAvailableResources.put(instance.getResourceId(), instance);
+ instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2a0ecc2..88af604 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -349,7 +349,7 @@ class JobManager(
currentResourceManager = Option(msg.resourceManager())
val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
- instance => instance.getResourceId).toList.asJava
+ instance => instance.getTaskManagerID).toList.asJava
// confirm registration and send known task managers with their resource ids
sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
@@ -425,7 +425,6 @@ class JobManager(
try {
val instanceID = instanceManager.registerTaskManager(
taskManager,
- resourceId,
connectionInfo,
hardwareInformation,
numberOfSlots,
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index d8bd6cb..d5520fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -47,8 +48,10 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import org.mockito.Matchers;
+
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
@@ -60,7 +63,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -76,200 +80,210 @@ public class ExecutionGraphMetricsTest extends TestLogger {
*/
@Test
public void testExecutionGraphRestartTimeMetric() throws JobException, IOException, InterruptedException {
- // setup execution graph with mocked scheduling logic
- int parallelism = 1;
-
- JobVertex jobVertex = new JobVertex("TestVertex");
- jobVertex.setParallelism(parallelism);
- jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
- JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
-
- Configuration config = new Configuration();
- config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
-
- Configuration jobConfig = new Configuration();
-
- FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
-
- MetricRegistry metricRegistry = new MetricRegistry(config);
-
- assertTrue(metricRegistry.getReporters().size() == 1);
-
- MetricReporter reporter = metricRegistry.getReporters().get(0);
-
- assertTrue(reporter instanceof TestingReporter);
-
- TestingReporter testingReporter = (TestingReporter) reporter;
-
- MetricGroup metricGroup = new JobManagerMetricGroup(metricRegistry, "localhost");
-
- Scheduler scheduler = mock(Scheduler.class);
-
- SimpleSlot simpleSlot = mock(SimpleSlot.class);
-
- Instance instance = mock(Instance.class);
-
- TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
-
- Slot rootSlot = mock(Slot.class);
-
- ActorGateway actorGateway = mock(ActorGateway.class);
-
- when(simpleSlot.isAlive()).thenReturn(true);
- when(simpleSlot.getTaskManagerID()).thenReturn(instance.getResourceId());
- when(simpleSlot.getTaskManagerLocation()).thenReturn(instance.getInstanceConnectionInfo());
- when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
- when(simpleSlot.getRoot()).thenReturn(rootSlot);
-
- when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
-
- when(instance.getInstanceConnectionInfo()).thenReturn(taskManagerLocation);
- when(instance.getActorGateway()).thenReturn(actorGateway);
- when(taskManagerLocation.getHostname()).thenReturn("localhost");
-
- when(rootSlot.getSlotNumber()).thenReturn(0);
-
- when(actorGateway.ask(Matchers.any(Object.class), Matchers.any(FiniteDuration.class))).thenReturn(Future$.MODULE$.<Object>successful(Messages.getAcknowledge()));
-
- TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
-
- ExecutionGraph executionGraph = new ExecutionGraph(
- ExecutionContext$.MODULE$.fromExecutor(new ForkJoinPool()),
- jobGraph.getJobID(),
- jobGraph.getName(),
- jobConfig,
- new SerializedValue<ExecutionConfig>(null),
- timeout,
- testingRestartStrategy,
- Collections.<BlobKey>emptyList(),
- Collections.<URL>emptyList(),
- getClass().getClassLoader(),
- metricGroup);
-
- // get restarting time metric
- Metric metric = testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME);
-
- assertNotNull(metric);
- assertTrue(metric instanceof Gauge);
-
- @SuppressWarnings("unchecked")
- Gauge<Long> restartingTime = (Gauge<Long>) metric;
-
- // check that the restarting time is 0 since it's the initial start
- assertTrue(0L == restartingTime.getValue());
-
- executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
- // start execution
- executionGraph.scheduleForExecution(scheduler);
-
- assertTrue(0L == restartingTime.getValue());
-
- List<ExecutionAttemptID> executionIDs = new ArrayList<>();
-
- for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
- executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
- }
-
- // tell execution graph that the tasks are in state running --> job status switches to state running
- for (ExecutionAttemptID executionID : executionIDs) {
- executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
- }
-
- assertEquals(JobStatus.RUNNING, executionGraph.getState());
-
- assertTrue(0L == restartingTime.getValue());
-
- // fail the job so that it goes into state restarting
- for (ExecutionAttemptID executionID : executionIDs) {
- executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
- }
-
- assertEquals(JobStatus.RESTARTING, executionGraph.getState());
-
- long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
-
- // wait some time so that the restarting time gauge shows a value different from 0
- Thread.sleep(50);
-
- long previousRestartingTime = restartingTime.getValue();
-
- // check that the restarting time is monotonically increasing
- for (int i = 0; i < 10; i++) {
- long currentRestartingTime = restartingTime.getValue();
-
- assertTrue(currentRestartingTime >= previousRestartingTime);
- previousRestartingTime = currentRestartingTime;
- }
-
- // check that we have measured some restarting time
- assertTrue(previousRestartingTime > 0);
-
- // restart job
- testingRestartStrategy.restartExecutionGraph();
-
- executionIDs.clear();
-
- for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
- executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
- }
-
- for (ExecutionAttemptID executionID : executionIDs) {
- executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
- }
-
- assertEquals(JobStatus.RUNNING, executionGraph.getState());
-
- assertTrue(firstRestartingTimestamp != 0);
-
- previousRestartingTime = restartingTime.getValue();
-
- // check that the restarting time does not increase after we've reached the running state
- for (int i = 0; i < 10; i++) {
- long currentRestartingTime = restartingTime.getValue();
-
- assertTrue(currentRestartingTime == previousRestartingTime);
- previousRestartingTime = currentRestartingTime;
- }
-
- // fail job again
- for (ExecutionAttemptID executionID : executionIDs) {
- executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
- }
-
- assertEquals(JobStatus.RESTARTING, executionGraph.getState());
-
- long secondRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
-
- assertTrue(firstRestartingTimestamp != secondRestartingTimestamp);
-
- Thread.sleep(50);
-
- previousRestartingTime = restartingTime.getValue();
-
- // check that the restarting time is increasing again
- for (int i = 0; i < 10; i++) {
- long currentRestartingTime = restartingTime.getValue();
-
- assertTrue(currentRestartingTime >= previousRestartingTime);
- previousRestartingTime = currentRestartingTime;
- }
-
- assertTrue(previousRestartingTime > 0);
-
- // now lets fail the job while it is in restarting and see whether the restarting time then stops to increase
- executionGraph.fail(new Exception());
-
- assertEquals(JobStatus.FAILED, executionGraph.getState());
-
- previousRestartingTime = restartingTime.getValue();
-
- for (int i = 0; i < 10; i++) {
- long currentRestartingTime = restartingTime.getValue();
-
- assertTrue(currentRestartingTime == previousRestartingTime);
- previousRestartingTime = currentRestartingTime;
+ final ExecutorService executor = Executors.newCachedThreadPool();
+ try {
+ // setup execution graph with mocked scheduling logic
+ int parallelism = 1;
+
+ JobVertex jobVertex = new JobVertex("TestVertex");
+ jobVertex.setParallelism(parallelism);
+ jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+ JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
+
+ Configuration config = new Configuration();
+ config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
+
+ Configuration jobConfig = new Configuration();
+
+ FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+ MetricRegistry metricRegistry = new MetricRegistry(config);
+
+ assertTrue(metricRegistry.getReporters().size() == 1);
+
+ MetricReporter reporter = metricRegistry.getReporters().get(0);
+
+ assertTrue(reporter instanceof TestingReporter);
+
+ TestingReporter testingReporter = (TestingReporter) reporter;
+
+ MetricGroup metricGroup = new JobManagerMetricGroup(metricRegistry, "localhost");
+
+ Scheduler scheduler = mock(Scheduler.class);
+
+ ResourceID taskManagerId = ResourceID.generate();
+
+ TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+ when(taskManagerLocation.getResourceID()).thenReturn(taskManagerId);
+ when(taskManagerLocation.getHostname()).thenReturn("localhost");
+
+ ActorGateway actorGateway = mock(ActorGateway.class);
+
+ Instance instance = mock(Instance.class);
+ when(instance.getTaskManagerLocation()).thenReturn(taskManagerLocation);
+ when(instance.getTaskManagerID()).thenReturn(taskManagerId);
+ when(instance.getActorGateway()).thenReturn(actorGateway);
+
+ Slot rootSlot = mock(Slot.class);
+
+ SimpleSlot simpleSlot = mock(SimpleSlot.class);
+ when(simpleSlot.isAlive()).thenReturn(true);
+ when(simpleSlot.getTaskManagerLocation()).thenReturn(taskManagerLocation);
+ when(simpleSlot.getTaskManagerID()).thenReturn(taskManagerId);
+ when(simpleSlot.getTaskManagerActorGateway()).thenReturn(actorGateway);
+ when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
+ when(simpleSlot.getRoot()).thenReturn(rootSlot);
+
+ when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
+
+
+
+ when(rootSlot.getSlotNumber()).thenReturn(0);
+
+ when(actorGateway.ask(Matchers.any(Object.class), Matchers.any(FiniteDuration.class))).thenReturn(Future$.MODULE$.<Object>successful(Messages.getAcknowledge()));
+
+ TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
+
+ ExecutionGraph executionGraph = new ExecutionGraph(
+ ExecutionContext$.MODULE$.fromExecutor(executor),
+ jobGraph.getJobID(),
+ jobGraph.getName(),
+ jobConfig,
+ new SerializedValue<ExecutionConfig>(null),
+ timeout,
+ testingRestartStrategy,
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ getClass().getClassLoader(),
+ metricGroup);
+
+ // get restarting time metric
+ Metric metric = testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME);
+
+ assertNotNull(metric);
+ assertTrue(metric instanceof Gauge);
+
+ @SuppressWarnings("unchecked")
+ Gauge<Long> restartingTime = (Gauge<Long>) metric;
+
+ // check that the restarting time is 0 since it's the initial start
+ assertTrue(0L == restartingTime.getValue());
+
+ executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+ // start execution
+ executionGraph.scheduleForExecution(scheduler);
+
+ assertTrue(0L == restartingTime.getValue());
+
+ List<ExecutionAttemptID> executionIDs = new ArrayList<>();
+
+ for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
+ executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
+ }
+
+ // tell execution graph that the tasks are in state running --> job status switches to state running
+ for (ExecutionAttemptID executionID : executionIDs) {
+ executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
+ }
+
+ assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+ assertTrue(0L == restartingTime.getValue());
+
+ // fail the job so that it goes into state restarting
+ for (ExecutionAttemptID executionID : executionIDs) {
+ executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
+ }
+
+ assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+ long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
+
+ // wait some time so that the restarting time gauge shows a value different from 0
+ Thread.sleep(50);
+
+ long previousRestartingTime = restartingTime.getValue();
+
+ // check that the restarting time is monotonically increasing
+ for (int i = 0; i < 10; i++) {
+ long currentRestartingTime = restartingTime.getValue();
+
+ assertTrue(currentRestartingTime >= previousRestartingTime);
+ previousRestartingTime = currentRestartingTime;
+ }
+
+ // check that we have measured some restarting time
+ assertTrue(previousRestartingTime > 0);
+
+ // restart job
+ testingRestartStrategy.restartExecutionGraph();
+
+ executionIDs.clear();
+
+ for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) {
+ executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
+ }
+
+ for (ExecutionAttemptID executionID : executionIDs) {
+ executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
+ }
+
+ assertEquals(JobStatus.RUNNING, executionGraph.getState());
+
+ assertTrue(firstRestartingTimestamp != 0);
+
+ previousRestartingTime = restartingTime.getValue();
+
+ // check that the restarting time does not increase after we've reached the running state
+ for (int i = 0; i < 10; i++) {
+ long currentRestartingTime = restartingTime.getValue();
+
+ assertTrue(currentRestartingTime == previousRestartingTime);
+ previousRestartingTime = currentRestartingTime;
+ }
+
+ // fail job again
+ for (ExecutionAttemptID executionID : executionIDs) {
+ executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception()));
+ }
+
+ assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+ long secondRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
+
+ assertTrue(firstRestartingTimestamp != secondRestartingTimestamp);
+
+ Thread.sleep(50);
+
+ previousRestartingTime = restartingTime.getValue();
+
+ // check that the restarting time is increasing again
+ for (int i = 0; i < 10; i++) {
+ long currentRestartingTime = restartingTime.getValue();
+
+ assertTrue(currentRestartingTime >= previousRestartingTime);
+ previousRestartingTime = currentRestartingTime;
+ }
+
+ assertTrue(previousRestartingTime > 0);
+
+ // now lets fail the job while it is in restarting and see whether the restarting time then stops to increase
+ executionGraph.fail(new Exception());
+
+ assertEquals(JobStatus.FAILED, executionGraph.getState());
+
+ previousRestartingTime = restartingTime.getValue();
+
+ for (int i = 0; i < 10; i++) {
+ long currentRestartingTime = restartingTime.getValue();
+
+ assertTrue(currentRestartingTime == previousRestartingTime);
+ previousRestartingTime = currentRestartingTime;
+ }
+ } finally {
+ executor.shutdownNow();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index cddb6cb..df47c3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -113,7 +113,7 @@ public class ExecutionGraphTestUtils {
InetAddress address = InetAddress.getByName("127.0.0.1");
TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
- return new Instance(gateway, connection, resourceID, new InstanceID(), hardwareDescription, numberOfSlots);
+ return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots);
}
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index a71faf6..870ae05 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -84,8 +84,7 @@ public class TerminalStateDeadlockTest {
TaskManagerLocation ci = new TaskManagerLocation(resourceId, address, 12345);
HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000);
- Instance instance = new Instance(DummyActorGateway.INSTANCE, ci,
- resourceId, new InstanceID(), resources, 4);
+ Instance instance = new Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, 4);
this.resource = instance.allocateSimpleSlot(new JobID());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index f1ed960..f3747c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -87,12 +87,9 @@ public class InstanceManagerTest{
final JavaTestKit probe2 = new JavaTestKit(system);
final JavaTestKit probe3 = new JavaTestKit(system);
- cm.registerTaskManager(probe1.getRef(), resID1,
- ici1, hardwareDescription, 1, leaderSessionID);
- cm.registerTaskManager(probe2.getRef(), resID2,
- ici2, hardwareDescription, 2, leaderSessionID);
- cm.registerTaskManager(probe3.getRef(), resID3,
- ici3, hardwareDescription, 5, leaderSessionID);
+ cm.registerTaskManager(probe1.getRef(), ici1, hardwareDescription, 1, leaderSessionID);
+ cm.registerTaskManager(probe2.getRef(), ici2, hardwareDescription, 2, leaderSessionID);
+ cm.registerTaskManager(probe3.getRef(), ici3, hardwareDescription, 5, leaderSessionID);
assertEquals(3, cm.getNumberOfRegisteredTaskManagers());
assertEquals(8, cm.getTotalNumberOfSlots());
@@ -102,7 +99,7 @@ public class InstanceManagerTest{
HashSet<TaskManagerLocation>();
for(Instance instance: instances){
- taskManagerLocations.add(instance.getInstanceConnectionInfo());
+ taskManagerLocations.add(instance.getTaskManagerLocation());
}
assertTrue(taskManagerLocations.contains(ici1));
@@ -133,14 +130,13 @@ public class InstanceManagerTest{
TaskManagerLocation ici = new TaskManagerLocation(resID1, address, dataPort);
JavaTestKit probe = new JavaTestKit(system);
- cm.registerTaskManager(probe.getRef(), resID1,
- ici, resources, 1, leaderSessionID);
+ cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID);
assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
assertEquals(1, cm.getTotalNumberOfSlots());
try {
- cm.registerTaskManager(probe.getRef(), resID2, ici, resources, 1, leaderSessionID);
+ cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID);
} catch (Exception e) {
// good
}
@@ -182,12 +178,12 @@ public class InstanceManagerTest{
JavaTestKit probe2 = new JavaTestKit(system);
JavaTestKit probe3 = new JavaTestKit(system);
- InstanceID instanceID1 = cm.registerTaskManager(probe1.getRef(), resID1,
- ici1, hardwareDescription, 1, leaderSessionID);
- InstanceID instanceID2 = cm.registerTaskManager(probe2.getRef(), resID2,
- ici2, hardwareDescription, 1, leaderSessionID);
- InstanceID instanceID3 = cm.registerTaskManager(probe3.getRef(), resID3,
- ici3, hardwareDescription, 1, leaderSessionID);
+ InstanceID instanceID1 = cm.registerTaskManager(
+ probe1.getRef(), ici1, hardwareDescription, 1, leaderSessionID);
+ InstanceID instanceID2 = cm.registerTaskManager(
+ probe2.getRef(), ici2, hardwareDescription, 1, leaderSessionID);
+ InstanceID instanceID3 = cm.registerTaskManager(
+ probe3.getRef(), ici3, hardwareDescription, 1, leaderSessionID);
// report some immediate heart beats
assertTrue(cm.reportHeartBeat(instanceID1, new byte[] {}));
@@ -241,8 +237,7 @@ public class InstanceManagerTest{
TaskManagerLocation ici = new TaskManagerLocation(resID, address, 20000);
JavaTestKit probe = new JavaTestKit(system);
- cm.registerTaskManager(probe.getRef(), resID,
- ici, resources, 1, leaderSessionID);
+ cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID);
fail("Should raise exception in shutdown state");
}
catch (IllegalStateException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 82d3723..aee62fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -42,7 +42,7 @@ public class InstanceTest {
TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
- resourceID, new InstanceID(), hardwareDescription, 4);
+ new InstanceID(), hardwareDescription, 4);
assertEquals(4, instance.getTotalNumberOfSlots());
assertEquals(4, instance.getNumberOfAvailableSlots());
@@ -105,7 +105,7 @@ public class InstanceTest {
TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
- resourceID, new InstanceID(), hardwareDescription, 3);
+ new InstanceID(), hardwareDescription, 3);
assertEquals(3, instance.getNumberOfAvailableSlots());
@@ -137,7 +137,7 @@ public class InstanceTest {
TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
- resourceID, new InstanceID(), hardwareDescription, 3);
+ new InstanceID(), hardwareDescription, 3);
assertEquals(3, instance.getNumberOfAvailableSlots());
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index 2c40e89..0edef5e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -132,7 +132,7 @@ public class SharedSlotsTest {
assertEquals(Locality.LOCAL, sub1.getLocality());
assertEquals(1, sub1.getNumberLeaves());
assertEquals(vid1, sub1.getGroupID());
- assertEquals(instance.getResourceId(), sub1.getTaskManagerID());
+ assertEquals(instance.getTaskManagerID(), sub1.getTaskManagerID());
assertEquals(jobId, sub1.getJobID());
assertEquals(sharedSlot, sub1.getParent());
assertEquals(sharedSlot, sub1.getRoot());
@@ -151,7 +151,7 @@ public class SharedSlotsTest {
assertEquals(Locality.UNCONSTRAINED, sub2.getLocality());
assertEquals(1, sub2.getNumberLeaves());
assertEquals(vid2, sub2.getGroupID());
- assertEquals(instance.getResourceId(), sub2.getTaskManagerID());
+ assertEquals(instance.getTaskManagerID(), sub2.getTaskManagerID());
assertEquals(jobId, sub2.getJobID());
assertEquals(sharedSlot, sub2.getParent());
assertEquals(sharedSlot, sub2.getRoot());
@@ -163,14 +163,14 @@ public class SharedSlotsTest {
assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid3));
assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4));
- SimpleSlot sub3 = assignment.getSlotForTask(vid3, Collections.singleton(instance.getInstanceConnectionInfo()));
+ SimpleSlot sub3 = assignment.getSlotForTask(vid3, Collections.singleton(instance.getTaskManagerLocation()));
assertNotNull(sub3);
assertNull(sub3.getExecutedVertex());
assertEquals(Locality.LOCAL, sub3.getLocality());
assertEquals(1, sub3.getNumberLeaves());
assertEquals(vid3, sub3.getGroupID());
- assertEquals(instance.getResourceId(), sub3.getTaskManagerID());
+ assertEquals(instance.getTaskManagerID(), sub3.getTaskManagerID());
assertEquals(jobId, sub3.getJobID());
assertEquals(sharedSlot, sub3.getParent());
assertEquals(sharedSlot, sub3.getRoot());
@@ -183,14 +183,14 @@ public class SharedSlotsTest {
assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4));
SimpleSlot sub4 = assignment.getSlotForTask(vid4,
- Collections.singleton(SchedulerTestUtils.getRandomInstance(1).getInstanceConnectionInfo()));
+ Collections.singleton(SchedulerTestUtils.getRandomInstance(1).getTaskManagerLocation()));
assertNotNull(sub4);
assertNull(sub4.getExecutedVertex());
assertEquals(Locality.NON_LOCAL, sub4.getLocality());
assertEquals(1, sub4.getNumberLeaves());
assertEquals(vid4, sub4.getGroupID());
- assertEquals(instance.getResourceId(), sub4.getTaskManagerID());
+ assertEquals(instance.getTaskManagerID(), sub4.getTaskManagerID());
assertEquals(jobId, sub4.getJobID());
assertEquals(sharedSlot, sub4.getParent());
assertEquals(sharedSlot, sub4.getRoot());
@@ -456,7 +456,7 @@ public class SharedSlotsTest {
assertNotNull(constraint.getSharedSlot());
assertTrue(constraint.isAssigned());
assertTrue(constraint.isAssignedAndAlive());
- assertEquals(instance.getInstanceConnectionInfo(), constraint.getLocation());
+ assertEquals(instance.getTaskManagerLocation(), constraint.getLocation());
SimpleSlot tailSlot = assignment.getSlotForTask(constraint, Collections.<TaskManagerLocation>emptySet());
@@ -475,7 +475,7 @@ public class SharedSlotsTest {
assertTrue(tailSlot.isReleased());
assertTrue(constraint.isAssigned());
assertFalse(constraint.isAssignedAndAlive());
- assertEquals(instance.getInstanceConnectionInfo(), constraint.getLocation());
+ assertEquals(instance.getTaskManagerLocation(), constraint.getLocation());
// we should have resources again for the co-location constraint
assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId()));
@@ -488,10 +488,10 @@ public class SharedSlotsTest {
assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId()));
// verify some basic properties of the slots
- assertEquals(instance.getResourceId(), sourceSlot.getTaskManagerID());
- assertEquals(instance.getResourceId(), headSlot.getTaskManagerID());
- assertEquals(instance.getResourceId(), tailSlot.getTaskManagerID());
- assertEquals(instance.getResourceId(), sinkSlot.getTaskManagerID());
+ assertEquals(instance.getTaskManagerID(), sourceSlot.getTaskManagerID());
+ assertEquals(instance.getTaskManagerID(), headSlot.getTaskManagerID());
+ assertEquals(instance.getTaskManagerID(), tailSlot.getTaskManagerID());
+ assertEquals(instance.getTaskManagerID(), sinkSlot.getTaskManagerID());
assertEquals(sourceId, sourceSlot.getGroupID());
assertEquals(sinkId, sinkSlot.getGroupID());
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index 82c2a74..c690d36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -150,7 +150,7 @@ public class SimpleSlotTest {
TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001);
Instance instance = new Instance(DummyActorGateway.INSTANCE, connection,
- resourceID, new InstanceID(), hardwareDescription, 1);
+ new InstanceID(), hardwareDescription, 1);
return instance.allocateSimpleSlot(new JobID());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index fff7bc6..5722cac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.partition;
-import com.google.common.collect.Lists;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -36,6 +35,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -121,7 +121,7 @@ public class SpilledSubpartitionViewTest {
}
}
- final List<Future<Boolean>> results = Lists.newArrayList();
+ final List<Future<Boolean>> results = new ArrayList<>();
// Submit the consuming tasks
for (ResultSubpartitionView view : readers) {
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
index 3bd4368..1344aef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
@@ -140,7 +140,7 @@ public class CoLocationConstraintTest {
// now, the location is assigned and we have a location
assertTrue(constraint.isAssigned());
assertTrue(constraint.isAssignedAndAlive());
- assertEquals(instance2, constraint.getLocation());
+ assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation());
// release the slot
slot2_1.releaseSlot();
@@ -148,7 +148,7 @@ public class CoLocationConstraintTest {
// we should still have a location
assertTrue(constraint.isAssigned());
assertFalse(constraint.isAssignedAndAlive());
- assertEquals(instance2, constraint.getLocation());
+ assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation());
// we can not assign a different location
try {
@@ -167,7 +167,7 @@ public class CoLocationConstraintTest {
assertTrue(constraint.isAssigned());
assertTrue(constraint.isAssignedAndAlive());
- assertEquals(instance2, constraint.getLocation());
+ assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation());
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index 5b7d18a..eab4fea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -326,8 +326,8 @@ public class ScheduleWithCoLocationHintTest {
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
- TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
- TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+ TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+ TaskManagerLocation loc2 = i2.getTaskManagerLocation();
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
@@ -398,8 +398,8 @@ public class ScheduleWithCoLocationHintTest {
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
- TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
- TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+ TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+ TaskManagerLocation loc2 = i2.getTaskManagerLocation();
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
@@ -425,8 +425,8 @@ public class ScheduleWithCoLocationHintTest {
SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2));
// still preserves the previous instance mapping)
- assertEquals(i1.getResourceId(), s3.getTaskManagerID());
- assertEquals(i2.getResourceId(), s4.getTaskManagerID());
+ assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID());
+ assertEquals(i2.getTaskManagerID(), s4.getTaskManagerID());
s3.releaseSlot();
s4.releaseSlot();
@@ -455,8 +455,8 @@ public class ScheduleWithCoLocationHintTest {
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
- TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
- TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+ TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+ TaskManagerLocation loc2 = i2.getTaskManagerLocation();
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
@@ -516,7 +516,7 @@ public class ScheduleWithCoLocationHintTest {
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
- TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
+ TaskManagerLocation loc1 = i1.getTaskManagerLocation();
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
@@ -580,8 +580,8 @@ public class ScheduleWithCoLocationHintTest {
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
- TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
- TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+ TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+ TaskManagerLocation loc2 = i2.getTaskManagerLocation();
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index a683834..fd0523b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -108,10 +108,10 @@ public class SchedulerSlotSharingTest {
// make sure we have two slots on the first instance, and two on the second
int c = 0;
- c += (s5.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1;
- c += (s6.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1;
- c += (s7.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1;
- c += (s8.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1;
+ c += (s5.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
+ c += (s6.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
+ c += (s7.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
+ c += (s8.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
assertEquals(0, c);
// release all
@@ -637,8 +637,8 @@ public class SchedulerSlotSharingTest {
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
- TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
- TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+ TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+ TaskManagerLocation loc2 = i2.getTaskManagerLocation();
Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(i1);
@@ -690,8 +690,8 @@ public class SchedulerSlotSharingTest {
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
- TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
- TaskManagerLocation loc2 = i2.getInstanceConnectionInfo();
+ TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+ TaskManagerLocation loc2 = i2.getTaskManagerLocation();
Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(i1);
@@ -743,7 +743,7 @@ public class SchedulerSlotSharingTest {
Instance i1 = getRandomInstance(2);
Instance i2 = getRandomInstance(2);
- TaskManagerLocation loc1 = i1.getInstanceConnectionInfo();
+ TaskManagerLocation loc1 = i1.getTaskManagerLocation();
Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
scheduler.newInstanceAvailable(i1);
@@ -771,12 +771,12 @@ public class SchedulerSlotSharingTest {
assertEquals(0, i1.getNumberOfAvailableSlots());
assertEquals(0, i2.getNumberOfAvailableSlots());
- assertEquals(i1.getResourceId(), s1.getTaskManagerID());
- assertEquals(i1.getResourceId(), s2.getTaskManagerID());
- assertEquals(i1.getResourceId(), s3.getTaskManagerID());
- assertEquals(i1.getResourceId(), s4.getTaskManagerID());
- assertEquals(i2.getResourceId(), s5.getTaskManagerID());
- assertEquals(i2.getResourceId(), s6.getTaskManagerID());
+ assertEquals(i1.getTaskManagerID(), s1.getTaskManagerID());
+ assertEquals(i1.getTaskManagerID(), s2.getTaskManagerID());
+ assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID());
+ assertEquals(i1.getTaskManagerID(), s4.getTaskManagerID());
+ assertEquals(i2.getTaskManagerID(), s5.getTaskManagerID());
+ assertEquals(i2.getTaskManagerID(), s6.getTaskManagerID());
// check the scheduler's bookkeeping
assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index eef27a8..d040ec4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -69,8 +69,7 @@ public class SchedulerTestUtils {
final long GB = 1024L*1024*1024;
HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB);
- return new Instance(DummyActorGateway.INSTANCE, ci, resourceID,
- new InstanceID(), resources, numSlots);
+ return new Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, numSlots);
}
@@ -88,7 +87,7 @@ public class SchedulerTestUtils {
public static Execution getTestVertex(Instance... preferredInstances) {
List<TaskManagerLocation> locations = new ArrayList<>(preferredInstances.length);
for (Instance i : preferredInstances) {
- locations.add(i.getInstanceConnectionInfo());
+ locations.add(i.getTaskManagerLocation());
}
return getTestVertex(locations);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
index d9c100c..ea0d2cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
@@ -56,9 +56,9 @@ public class SlotAllocationFutureTest {
final Instance instance2 = SchedulerTestUtils.getRandomInstance(1);
final SimpleSlot slot1 = new SimpleSlot(new JobID(), instance1,
- instance1.getInstanceConnectionInfo(), 0, instance1.getActorGateway(), null, null);
+ instance1.getTaskManagerLocation(), 0, instance1.getActorGateway(), null, null);
final SimpleSlot slot2 = new SimpleSlot(new JobID(), instance2,
- instance2.getInstanceConnectionInfo(), 0, instance2.getActorGateway(), null, null);
+ instance2.getTaskManagerLocation(), 0, instance2.getActorGateway(), null, null);
future.setSlot(slot1);
try {
@@ -85,7 +85,7 @@ public class SlotAllocationFutureTest {
final Instance instance = SchedulerTestUtils.getRandomInstance(1);
final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
- instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null);
+ instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
SlotAllocationFuture future = new SlotAllocationFuture();
@@ -108,7 +108,7 @@ public class SlotAllocationFutureTest {
final Instance instance = SchedulerTestUtils.getRandomInstance(1);
final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
- instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null);
+ instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
SlotAllocationFuture future = new SlotAllocationFuture();
future.setSlot(thisSlot);
@@ -141,7 +141,7 @@ public class SlotAllocationFutureTest {
final Instance instance = SchedulerTestUtils.getRandomInstance(1);
final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
- instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null);
+ instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
final SlotAllocationFuture future = new SlotAllocationFuture();
@@ -181,7 +181,7 @@ public class SlotAllocationFutureTest {
final Instance instance = SchedulerTestUtils.getRandomInstance(1);
final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance,
- instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null);
+ instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
final SlotAllocationFuture future = new SlotAllocationFuture();
future.setSlot(thisSlot);