You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/04 21:10:49 UTC
[2/2] flink git commit: [FLINK-4459] [distributed runtime] Introduce
SlotProvider for Scheduler
[FLINK-4459] [distributed runtime] Introduce SlotProvider for Scheduler
This closes #2424
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e40f590
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e40f590
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e40f590
Branch: refs/heads/master
Commit: 6e40f59015dcdb8529691318e8f5a33e831252b8
Parents: 502a79d
Author: Kurt Young <yk...@gmail.com>
Authored: Fri Aug 26 17:51:40 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Sep 4 23:09:59 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 15 +-
.../runtime/executiongraph/ExecutionGraph.java | 32 +-
.../executiongraph/ExecutionJobVertex.java | 7 +-
.../runtime/executiongraph/ExecutionVertex.java | 8 +-
.../flink/runtime/instance/SlotProvider.java | 48 +++
.../runtime/jobmanager/scheduler/Scheduler.java | 27 +-
.../ExecutionGraphMetricsTest.java | 8 +-
.../ExecutionVertexSchedulingTest.java | 28 +-
.../TerminalStateDeadlockTest.java | 27 +-
.../ScheduleWithCoLocationHintTest.java | 303 +++++++++++--------
.../scheduler/SchedulerIsolatedTasksTest.java | 45 ++-
.../scheduler/SchedulerSlotSharingTest.java | 230 +++++++-------
12 files changed, 430 insertions(+), 348 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 846df49..6826365 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -34,13 +34,13 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -271,15 +271,15 @@ public class Execution {
* to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
* error sets the vertex state to failed and triggers the recovery logic.
*
- * @param scheduler The scheduler to use to schedule this execution attempt.
+ * @param slotProvider The slot provider to use to allocate slot for this execution attempt.
* @param queued Flag to indicate whether the scheduler may queue this task if it cannot
* immediately deploy it.
*
* @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
* @throws NoResourceAvailableException Thrown is no queued scheduling is allowed and no resources are currently available.
*/
- public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
- if (scheduler == null) {
+ public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+ if (slotProvider == null) {
throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution.");
}
@@ -299,9 +299,8 @@ public class Execution {
// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
// in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
+ final SlotAllocationFuture future = slotProvider.allocateSlot(toSchedule, queued);
if (queued) {
- SlotAllocationFuture future = scheduler.scheduleQueued(toSchedule);
-
future.setFutureAction(new SlotAllocationFutureAction() {
@Override
public void slotAllocated(SimpleSlot slot) {
@@ -319,7 +318,7 @@ public class Execution {
});
}
else {
- SimpleSlot slot = scheduler.scheduleImmediately(toSchedule);
+ SimpleSlot slot = future.get();
try {
deployToSlot(slot);
}
@@ -560,7 +559,7 @@ public class Execution {
public Boolean call() throws Exception {
try {
consumerVertex.scheduleForExecution(
- consumerVertex.getExecutionGraph().getScheduler(),
+ consumerVertex.getExecutionGraph().getSlotProvider(),
consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
} catch (Throwable t) {
consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 92cab41..585e9f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -47,7 +48,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializableObject;
@@ -197,8 +197,8 @@ public class ExecutionGraph {
// ------ Fields that are relevant to the execution and need to be cleared before archiving -------
- /** The scheduler to use for scheduling new tasks as they are needed */
- private Scheduler scheduler;
+ /** The slot provider to use for allocating slots for tasks as they are needed */
+ private SlotProvider slotProvider;
/** Strategy to use for restarts */
private RestartStrategy restartStrategy;
@@ -470,8 +470,8 @@ public class ExecutionGraph {
return jsonPlan;
}
- public Scheduler getScheduler() {
- return scheduler;
+ public SlotProvider getSlotProvider() {
+ return slotProvider;
}
public JobID getJobID() {
@@ -670,17 +670,17 @@ public class ExecutionGraph {
}
}
- public void scheduleForExecution(Scheduler scheduler) throws JobException {
- if (scheduler == null) {
+ public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
+ if (slotProvider == null) {
throw new IllegalArgumentException("Scheduler must not be null.");
}
- if (this.scheduler != null && this.scheduler != scheduler) {
- throw new IllegalArgumentException("Cannot use different schedulers for the same job");
+ if (this.slotProvider != null && this.slotProvider != slotProvider) {
+ throw new IllegalArgumentException("Cannot use different slot providers for the same job");
}
if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
- this.scheduler = scheduler;
+ this.slotProvider = slotProvider;
switch (scheduleMode) {
@@ -688,14 +688,14 @@ public class ExecutionGraph {
// simply take the vertices without inputs.
for (ExecutionJobVertex ejv : this.tasks.values()) {
if (ejv.getJobVertex().isInputVertex()) {
- ejv.scheduleAll(scheduler, allowQueuedScheduling);
+ ejv.scheduleAll(slotProvider, allowQueuedScheduling);
}
}
break;
case EAGER:
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
- ejv.scheduleAll(scheduler, allowQueuedScheduling);
+ ejv.scheduleAll(slotProvider, allowQueuedScheduling);
}
break;
@@ -850,8 +850,8 @@ public class ExecutionGraph {
throw new IllegalStateException("Can only restart job from state restarting.");
}
- if (scheduler == null) {
- throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
+ if (slotProvider == null) {
+ throw new IllegalStateException("The execution graph has not been scheduled before - slotProvider is null.");
}
this.currentExecutions.clear();
@@ -885,7 +885,7 @@ public class ExecutionGraph {
}
}
- scheduleForExecution(scheduler);
+ scheduleForExecution(slotProvider);
}
catch (Throwable t) {
fail(t);
@@ -917,7 +917,7 @@ public class ExecutionGraph {
// clear the non-serializable fields
restartStrategy = null;
- scheduler = null;
+ slotProvider = null;
checkpointCoordinator = null;
executionContext = null;
kvStateLocationRegistry = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index d3dc8fe..1ac9522 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -37,7 +38,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.Preconditions;
@@ -289,12 +289,13 @@ public class ExecutionJobVertex {
// Actions
//---------------------------------------------------------------------------------------------
- public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
+ public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+
ExecutionVertex[] vertices = this.taskVertices;
// kick off the tasks
for (ExecutionVertex ev : vertices) {
- ev.scheduleForExecution(scheduler, queued);
+ ev.scheduleForExecution(slotProvider, queued);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 88e1b88..a8d5ee4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -40,12 +41,11 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.util.SerializedValue;
-
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
+
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
@@ -443,8 +443,8 @@ public class ExecutionVertex {
}
}
- public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
- return this.currentExecution.scheduleForExecution(scheduler, queued);
+ public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
+ return this.currentExecution.scheduleForExecution(slotProvider, queued);
}
public void deployToSlot(SimpleSlot slot) throws JobException {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
new file mode 100644
index 0000000..b2c23a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
+
+/**
+ * The slot provider is responsible for preparing slots for ready-to-run tasks.
+ *
+ * <p>It supports two allocating modes:
+ * <ul>
+ * <li>Immediate allocating: A request for a task slot immediately gets satisfied, we can call
+ * {@link SlotAllocationFuture#get()} to get the allocated slot.</li>
+ * <li>Queued allocating: A request for a task slot is queued and returns a future that will be
+ * fulfilled as soon as a slot becomes available.</li>
+ * </ul>
+ */
+public interface SlotProvider {
+
+ /**
+ * Allocating slot with specific requirement.
+ *
+ * @param task The task to allocate the slot for
+ * @param allowQueued Whether allow the task be queued if we do not have enough resource
+ * @return The future of the allocation
+ *
+ * @throws NoResourceAvailableException
+ */
+ SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued) throws NoResourceAvailableException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 734972d..c9cdd00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.instance.SharedSlot;
@@ -65,7 +66,7 @@ import scala.concurrent.ExecutionContext;
* fulfilled as soon as a slot becomes available.</li>
* </ul>
*/
-public class Scheduler implements InstanceListener, SlotAvailabilityListener {
+public class Scheduler implements InstanceListener, SlotAvailabilityListener, SlotProvider {
/** Scheduler-wide logger */
private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
@@ -129,30 +130,24 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
// ------------------------------------------------------------------------
// Scheduling
// ------------------------------------------------------------------------
-
- public SimpleSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
- Object ret = scheduleTask(task, false);
- if (ret instanceof SimpleSlot) {
- return (SimpleSlot) ret;
- }
- else {
- throw new RuntimeException();
- }
- }
-
- public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException {
- Object ret = scheduleTask(task, true);
+
+
+ @Override
+ public SlotAllocationFuture allocateSlot(ScheduledUnit task, boolean allowQueued)
+ throws NoResourceAvailableException {
+
+ final Object ret = scheduleTask(task, allowQueued);
if (ret instanceof SimpleSlot) {
return new SlotAllocationFuture((SimpleSlot) ret);
}
- if (ret instanceof SlotAllocationFuture) {
+ else if (ret instanceof SlotAllocationFuture) {
return (SlotAllocationFuture) ret;
}
else {
throw new RuntimeException();
}
}
-
+
/**
* Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link SlotAllocationFuture}.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index d5520fd..aa5925f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
@@ -70,8 +71,8 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+
+import static org.mockito.Mockito.*;
public class ExecutionGraphMetricsTest extends TestLogger {
@@ -135,7 +136,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
when(simpleSlot.getRoot()).thenReturn(rootSlot);
- when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
+ when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean()))
+ .thenReturn(new SlotAllocationFuture(simpleSlot));
http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 5e9ee33..c576ce5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -18,25 +18,29 @@
package org.apache.flink.runtime.executiongraph;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.Test;
+import org.junit.Test;
import org.mockito.Matchers;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class ExecutionVertexSchedulingTest {
@Test
@@ -54,7 +58,8 @@ public class ExecutionVertexSchedulingTest {
assertTrue(slot.isReleased());
Scheduler scheduler = mock(Scheduler.class);
- when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
+ when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
+ .thenReturn(new SlotAllocationFuture(slot));
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
// try to deploy to the slot
@@ -86,7 +91,7 @@ public class ExecutionVertexSchedulingTest {
final SlotAllocationFuture future = new SlotAllocationFuture();
Scheduler scheduler = mock(Scheduler.class);
- when(scheduler.scheduleQueued(Matchers.any(ScheduledUnit.class))).thenReturn(future);
+ when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
// try to deploy to the slot
@@ -117,7 +122,8 @@ public class ExecutionVertexSchedulingTest {
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
Scheduler scheduler = mock(Scheduler.class);
- when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(slot);
+ when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean()))
+ .thenReturn(new SlotAllocationFuture(slot));
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -130,4 +136,4 @@ public class ExecutionVertexSchedulingTest {
fail(e.getMessage());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 870ae05..4cae7c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -54,12 +55,12 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class TerminalStateDeadlockTest {
-
+
private final Field stateField;
private final Field resourceField;
private final Field execGraphStateField;
- private final Field execGraphSchedulerField;
-
+ private final Field execGraphSlotProviderField;
+
private final SimpleSlot resource;
@@ -75,8 +76,8 @@ public class TerminalStateDeadlockTest {
this.execGraphStateField = ExecutionGraph.class.getDeclaredField("state");
this.execGraphStateField.setAccessible(true);
- this.execGraphSchedulerField = ExecutionGraph.class.getDeclaredField("scheduler");
- this.execGraphSchedulerField.setAccessible(true);
+ this.execGraphSlotProviderField = ExecutionGraph.class.getDeclaredField("slotProvider");
+ this.execGraphSlotProviderField.setAccessible(true);
// the dummy resource
ResourceID resourceId = ResourceID.generate();
@@ -96,11 +97,9 @@ public class TerminalStateDeadlockTest {
throw new RuntimeException();
}
}
-
-
-
+
// ------------------------------------------------------------------------
-
+
@Test
public void testProvokeDeadlock() {
try {
@@ -135,7 +134,7 @@ public class TerminalStateDeadlockTest {
initializeExecution(e2);
execGraphStateField.set(eg, JobStatus.FAILING);
- execGraphSchedulerField.set(eg, scheduler);
+ execGraphSlotProviderField.set(eg, scheduler);
Runnable r1 = new Runnable() {
@Override
@@ -173,12 +172,10 @@ public class TerminalStateDeadlockTest {
static class TestExecGraph extends ExecutionGraph {
- private static final long serialVersionUID = -7606144898417942044L;
-
private static final Configuration EMPTY_CONFIG = new Configuration();
private static final FiniteDuration TIMEOUT = new FiniteDuration(30, TimeUnit.SECONDS);
-
+
private volatile boolean done;
TestExecGraph(JobID jobId) throws IOException {
@@ -193,14 +190,14 @@ public class TerminalStateDeadlockTest {
}
@Override
- public void scheduleForExecution(Scheduler scheduler) {
+ public void scheduleForExecution(SlotProvider slotProvider) {
// notify that we are done with the "restarting"
synchronized (this) {
done = true;
this.notifyAll();
}
}
-
+
public void waitTillDone() {
try {
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index eab4fea..b803702 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -63,18 +63,18 @@ public class ScheduleWithCoLocationHintTest {
CoLocationConstraint c6 = new CoLocationConstraint(ccg);
// schedule 4 tasks from the first vertex group
- SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1));
- SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2));
- SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3));
- SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4));
- SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1));
- SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2));
- SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3));
- SimpleSlot s8 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5));
- SimpleSlot s9 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6));
- SimpleSlot s10 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4));
- SimpleSlot s11 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5));
- SimpleSlot s12 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6));
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false).get();
+ SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false).get();
+ SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false).get();
+ SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false).get();
+ SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false).get();
+ SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false).get();
+ SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false).get();
+ SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false).get();
+ SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -109,22 +109,22 @@ public class ScheduleWithCoLocationHintTest {
assertEquals(s4.getTaskManagerID(), s10.getTaskManagerID());
assertEquals(s8.getTaskManagerID(), s11.getTaskManagerID());
assertEquals(s9.getTaskManagerID(), s12.getTaskManagerID());
-
+
assertEquals(c1.getLocation(), s1.getTaskManagerLocation());
assertEquals(c2.getLocation(), s2.getTaskManagerLocation());
assertEquals(c3.getLocation(), s3.getTaskManagerLocation());
assertEquals(c4.getLocation(), s4.getTaskManagerLocation());
assertEquals(c5.getLocation(), s8.getTaskManagerLocation());
assertEquals(c6.getLocation(), s9.getTaskManagerLocation());
-
+
// check the scheduler's bookkeeping
assertEquals(0, scheduler.getNumberOfAvailableSlots());
-
+
// the first assignments are unconstrained, co.-scheduling is constrained
assertEquals(6, scheduler.getNumberOfLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
assertEquals(6, scheduler.getNumberOfUnconstrainedAssignments());
-
+
// release some slots, be sure that new available ones come up
s1.releaseSlot();
s2.releaseSlot();
@@ -135,10 +135,11 @@ public class ScheduleWithCoLocationHintTest {
s11.releaseSlot();
s12.releaseSlot();
assertTrue(scheduler.getNumberOfAvailableSlots() >= 1);
-
- SimpleSlot single = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)));
+
+ SimpleSlot single = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false).get();
assertNotNull(single);
-
+
s1.releaseSlot();
s2.releaseSlot();
s3.releaseSlot();
@@ -149,9 +150,9 @@ public class ScheduleWithCoLocationHintTest {
s9.releaseSlot();
s11.releaseSlot();
s12.releaseSlot();
-
+
assertEquals(5, scheduler.getNumberOfAvailableSlots());
-
+
assertEquals(6, scheduler.getNumberOfLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
assertEquals(7, scheduler.getNumberOfUnconstrainedAssignments());
@@ -161,7 +162,7 @@ public class ScheduleWithCoLocationHintTest {
fail(e.getMessage());
}
}
-
+
@Test
public void scheduleWithIntermediateRelease() {
try {
@@ -169,34 +170,37 @@ public class ScheduleWithCoLocationHintTest {
JobVertexID jid2 = new JobVertexID();
JobVertexID jid3 = new JobVertexID();
JobVertexID jid4 = new JobVertexID();
-
+
Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-
+
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
-
+
scheduler.newInstanceAvailable(i1);
scheduler.newInstanceAvailable(i2);
-
+
assertEquals(2, scheduler.getNumberOfAvailableSlots());
-
+
SlotSharingGroup sharingGroup = new SlotSharingGroup();
CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
-
- SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
- SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1));
-
- SimpleSlot sSolo = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 1)));
-
+
+ SimpleSlot s1 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get();
+ SimpleSlot s2 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false).get();
+
+ SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false).get();
+
ResourceID taskManager = s1.getTaskManagerID();
-
+
s1.releaseSlot();
s2.releaseSlot();
sSolo.releaseSlot();
-
- SimpleSlot sNew = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1));
+
+ SimpleSlot sNew = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
assertEquals(taskManager, sNew.getTaskManagerID());
-
+
assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
assertEquals(2, scheduler.getNumberOfUnconstrainedAssignments());
@@ -206,41 +210,41 @@ public class ScheduleWithCoLocationHintTest {
fail(e.getMessage());
}
}
-
+
@Test
public void scheduleWithReleaseNoResource() {
try {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
JobVertexID jid3 = new JobVertexID();
-
+
Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-
+
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
-
+
scheduler.newInstanceAvailable(i1);
scheduler.newInstanceAvailable(i2);
-
+
assertEquals(2, scheduler.getNumberOfAvailableSlots());
-
+
SlotSharingGroup sharingGroup = new SlotSharingGroup();
CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup());
-
- SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1));
+
+ SimpleSlot s1 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get();
s1.releaseSlot();
-
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 1)));
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 2)));
-
-
+
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false).get();
+
try {
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1));
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get();
fail("Scheduled even though no resource was available.");
} catch (NoResourceAvailableException e) {
// expected
}
-
+
assertEquals(0, scheduler.getNumberOfLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
assertEquals(3, scheduler.getNumberOfUnconstrainedAssignments());
@@ -250,7 +254,7 @@ public class ScheduleWithCoLocationHintTest {
fail(e.getMessage());
}
}
-
+
@Test
public void scheduleMixedCoLocationSlotSharing() {
try {
@@ -276,27 +280,35 @@ public class ScheduleWithCoLocationHintTest {
SlotSharingGroup shareGroup = new SlotSharingGroup();
// first wave
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup));
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup));
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup));
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup));
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false);
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false);
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false);
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false);
// second wave
- SimpleSlot s21 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1));
- SimpleSlot s22 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2));
- SimpleSlot s23 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3));
- SimpleSlot s24 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4));
+ SimpleSlot s21 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false).get();
+ SimpleSlot s22 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false).get();
+ SimpleSlot s23 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false).get();
+ SimpleSlot s24 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false).get();
// third wave
- SimpleSlot s31 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2));
- SimpleSlot s32 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3));
- SimpleSlot s33 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4));
- SimpleSlot s34 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1));
-
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup));
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup));
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup));
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup));
+ SimpleSlot s31 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false).get();
+ SimpleSlot s32 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false).get();
+ SimpleSlot s33 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false).get();
+ SimpleSlot s34 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false).get();
+
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false);
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false);
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false);
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false);
assertEquals(s21.getTaskManagerID(), s34.getTaskManagerID());
assertEquals(s22.getTaskManagerID(), s31.getTaskManagerID());
@@ -341,20 +353,26 @@ public class ScheduleWithCoLocationHintTest {
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
// schedule something into the shared group so that both instances are in the sharing group
- SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup));
- SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup));
+ SimpleSlot s1 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
+ SimpleSlot s2 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get();
// schedule one locally to instance 1
- SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1));
+ SimpleSlot s3 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false).get();
// schedule with co location constraint (yet unassigned) and a preference for
// instance 1, but it can only get instance 2
- SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2));
+ SimpleSlot s4 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get();
// schedule something into the assigned co-location constraints and check that they override the
// other preferences
- SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1));
- SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2));
+ SimpleSlot s5 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false).get();
+ SimpleSlot s6 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false).get();
// check that each slot got three
assertEquals(3, s1.getRoot().getNumberLeaves());
@@ -386,13 +404,13 @@ public class ScheduleWithCoLocationHintTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testSlotReleasedInBetween() {
try {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
-
+
Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
Instance i1 = getRandomInstance(1);
@@ -403,36 +421,40 @@ public class ScheduleWithCoLocationHintTest {
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
-
+
assertEquals(2, scheduler.getNumberOfAvailableSlots());
-
+
SlotSharingGroup sharingGroup = new SlotSharingGroup();
-
+
CoLocationGroup ccg = new CoLocationGroup();
CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
- SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
- SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2));
-
+ SimpleSlot s1 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+ SimpleSlot s2 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+
s1.releaseSlot();
s2.releaseSlot();
-
+
assertEquals(2, scheduler.getNumberOfAvailableSlots());
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
- SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1));
- SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2));
-
+ SimpleSlot s3 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get();
+ SimpleSlot s4 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get();
+
// still preserves the previous instance mapping)
assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID());
assertEquals(i2.getTaskManagerID(), s4.getTaskManagerID());
-
+
s3.releaseSlot();
s4.releaseSlot();
assertEquals(2, scheduler.getNumberOfAvailableSlots());
-
+
assertEquals(4, scheduler.getNumberOfLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
@@ -442,14 +464,14 @@ public class ScheduleWithCoLocationHintTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testSlotReleasedInBetweenAndNoNewLocal() {
try {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
JobVertexID jidx = new JobVertexID();
-
+
Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
Instance i1 = getRandomInstance(1);
@@ -460,41 +482,46 @@ public class ScheduleWithCoLocationHintTest {
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
-
+
assertEquals(2, scheduler.getNumberOfAvailableSlots());
-
+
SlotSharingGroup sharingGroup = new SlotSharingGroup();
-
+
CoLocationGroup ccg = new CoLocationGroup();
CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
- SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
- SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2));
-
+ SimpleSlot s1 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+ SimpleSlot s2 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+
s1.releaseSlot();
s2.releaseSlot();
-
+
assertEquals(2, scheduler.getNumberOfAvailableSlots());
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
- SimpleSlot sa = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)));
- SimpleSlot sb = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)));
-
+ SimpleSlot sa = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false).get();
+ SimpleSlot sb = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false).get();
+
try {
- scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1));
+ scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false);
fail("should not be able to find a resource");
} catch (NoResourceAvailableException e) {
// good
} catch (Exception e) {
fail("wrong exception");
}
-
+
sa.releaseSlot();
sb.releaseSlot();
assertEquals(2, scheduler.getNumberOfAvailableSlots());
-
+
assertEquals(2, scheduler.getNumberOfLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfNonLocalizedAssignments());
assertEquals(2, scheduler.getNumberOfUnconstrainedAssignments());
@@ -504,15 +531,15 @@ public class ScheduleWithCoLocationHintTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testScheduleOutOfOrder() {
try {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
-
+
Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-
+
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
@@ -520,11 +547,11 @@ public class ScheduleWithCoLocationHintTest {
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
-
+
assertEquals(2, scheduler.getNumberOfAvailableSlots());
-
+
SlotSharingGroup sharingGroup = new SlotSharingGroup();
-
+
CoLocationGroup ccg = new CoLocationGroup();
CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
@@ -532,33 +559,37 @@ public class ScheduleWithCoLocationHintTest {
// schedule something from the second job vertex id before the first is filled,
// and give locality preferences that hint at using the same shared slot for both
// co location constraints (which we seek to prevent)
- SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
- SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2));
+ SimpleSlot s1 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+ SimpleSlot s2 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false).get();
+
+ SimpleSlot s3 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false).get();
+ SimpleSlot s4 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false).get();
- SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1));
- SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2));
-
// check that each slot got three
assertEquals(2, s1.getRoot().getNumberLeaves());
assertEquals(2, s2.getRoot().getNumberLeaves());
-
+
assertEquals(s1.getTaskManagerID(), s3.getTaskManagerID());
assertEquals(s2.getTaskManagerID(), s4.getTaskManagerID());
-
+
// check the scheduler's bookkeeping
assertEquals(0, scheduler.getNumberOfAvailableSlots());
-
+
assertEquals(3, scheduler.getNumberOfLocalizedAssignments());
assertEquals(1, scheduler.getNumberOfNonLocalizedAssignments());
assertEquals(0, scheduler.getNumberOfUnconstrainedAssignments());
-
+
// release some slots, be sure that new available ones come up
s1.releaseSlot();
s2.releaseSlot();
s3.releaseSlot();
s4.releaseSlot();
assertEquals(2, scheduler.getNumberOfAvailableSlots());
-
+
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
@@ -568,15 +599,15 @@ public class ScheduleWithCoLocationHintTest {
fail(e.getMessage());
}
}
-
+
@Test
public void nonColocationFollowsCoLocation() {
try {
JobVertexID jid1 = new JobVertexID();
JobVertexID jid2 = new JobVertexID();
-
+
Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-
+
Instance i1 = getRandomInstance(1);
Instance i2 = getRandomInstance(1);
@@ -585,32 +616,36 @@ public class ScheduleWithCoLocationHintTest {
scheduler.newInstanceAvailable(i2);
scheduler.newInstanceAvailable(i1);
-
+
assertEquals(2, scheduler.getNumberOfAvailableSlots());
SlotSharingGroup sharingGroup = new SlotSharingGroup();
-
+
CoLocationGroup ccg = new CoLocationGroup();
CoLocationConstraint cc1 = new CoLocationConstraint(ccg);
CoLocationConstraint cc2 = new CoLocationConstraint(ccg);
- SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1));
- SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2));
+ SimpleSlot s1 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get();
+ SimpleSlot s2 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get();
+
+ SimpleSlot s3 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get();
+ SimpleSlot s4 = scheduler.allocateSlot(
+ new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false).get();
- SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup));
- SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup));
-
// check that each slot got two
assertEquals(2, s1.getRoot().getNumberLeaves());
assertEquals(2, s2.getRoot().getNumberLeaves());
-
+
s1.releaseSlot();
s2.releaseSlot();
s3.releaseSlot();
s4.releaseSlot();
-
+
assertEquals(2, scheduler.getNumberOfAvailableSlots());
-
+
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots());
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1));
assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2));
http://git-wip-us.apache.org/repos/asf/flink/blob/6e40f590/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 25498c4..d78f551 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -35,7 +35,6 @@ import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.a
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getDummyTask;
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -122,17 +121,17 @@ public class SchedulerIsolatedTasksTest {
assertEquals(5, scheduler.getNumberOfAvailableSlots());
// schedule something into all slots
- SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
- SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
- SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
- SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
- SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+ SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
// the slots should all be different
assertTrue(areAllDistinct(s1, s2, s3, s4, s5));
try {
- scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+ scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false);
fail("Scheduler accepted scheduling request without available resource.");
}
catch (NoResourceAvailableException e) {
@@ -145,8 +144,8 @@ public class SchedulerIsolatedTasksTest {
assertEquals(2, scheduler.getNumberOfAvailableSlots());
// now we can schedule some more slots
- SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
- SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+ SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
+ SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7));
@@ -245,7 +244,7 @@ public class SchedulerIsolatedTasksTest {
disposeThread.start();
for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
- SlotAllocationFuture future = scheduler.scheduleQueued(new ScheduledUnit(getDummyTask()));
+ SlotAllocationFuture future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
future.setFutureAction(action);
allAllocatedSlots.add(future);
}
@@ -287,11 +286,11 @@ public class SchedulerIsolatedTasksTest {
scheduler.newInstanceAvailable(i3);
List<SimpleSlot> slots = new ArrayList<SimpleSlot>();
- slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
- slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
- slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
- slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
- slots.add(scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask())));
+ slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
+ slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
+ slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
+ slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
+ slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get());
i2.markDead();
@@ -312,7 +311,7 @@ public class SchedulerIsolatedTasksTest {
// cannot get another slot, since all instances are dead
try {
- scheduler.scheduleImmediately(new ScheduledUnit(getDummyTask()));
+ scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get();
fail("Scheduler served a slot from a dead instance");
}
catch (NoResourceAvailableException e) {
@@ -347,7 +346,7 @@ public class SchedulerIsolatedTasksTest {
scheduler.newInstanceAvailable(i3);
// schedule something on an arbitrary instance
- SimpleSlot s1 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(new Instance[0])));
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false).get();
// figure out how we use the location hints
Instance first = (Instance) s1.getOwner();
@@ -355,28 +354,28 @@ public class SchedulerIsolatedTasksTest {
Instance third = first == i3 ? i2 : i3;
// something that needs to go to the first instance again
- SimpleSlot s2 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())));
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false).get();
assertEquals(first, s2.getOwner());
// first or second --> second, because first is full
- SimpleSlot s3 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, second)));
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false).get();
assertEquals(second, s3.getOwner());
// first or third --> third (because first is full)
- SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third)));
- SimpleSlot s5 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third)));
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
+ SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
assertEquals(third, s4.getOwner());
assertEquals(third, s5.getOwner());
// first or third --> second, because all others are full
- SimpleSlot s6 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third)));
+ SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
assertEquals(second, s6.getOwner());
// release something on the first and second instance
s2.releaseSlot();
s6.releaseSlot();
- SimpleSlot s7 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertex(first, third)));
+ SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get();
assertEquals(first, s7.getOwner());
assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments());