You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/31 11:06:22 UTC
[flink] branch master updated: [FLINK-10569][tests] Replace various Scheduler usages
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 568d90c [FLINK-10569][tests] Replace various Scheduler usages
568d90c is described below
commit 568d90cf0b718fb659a5fd09319df9b212a2ec2d
Author: ZILI CHEN <wa...@gmail.com>
AuthorDate: Thu Jan 31 19:06:15 2019 +0800
[FLINK-10569][tests] Replace various Scheduler usages
---
.../ExecutionGraphConstructionTest.java | 20 +-
.../ExecutionGraphDeploymentTest.java | 34 +-
.../executiongraph/ExecutionGraphMetricsTest.java | 16 +-
.../ExecutionGraphRescalingTest.java | 10 +-
.../executiongraph/ExecutionGraphTestUtils.java | 5 +-
.../executiongraph/ExecutionVertexCancelTest.java | 17 +-
.../ExecutionVertexSchedulingTest.java | 34 +-
.../executiongraph/PointwisePatternTest.java | 96 +-
.../RestartPipelinedRegionStrategyTest.java | 14 +-
.../executiongraph/VertexSlotSharingTest.java | 4 +-
.../scheduler/SchedulerIsolatedTasksTest.java | 2 +-
.../scheduler/SchedulerSlotSharingTest.java | 1052 --------------------
.../jobmanager/scheduler/SchedulerTest.java | 135 ---
.../jobmanager/scheduler/SchedulerTestBase.java | 2 +-
.../partitioner/RescalePartitionerTest.java | 5 +-
15 files changed, 95 insertions(+), 1351 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 00d1ac0..c3604f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
@@ -44,6 +43,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
@@ -119,7 +119,7 @@ public class ExecutionGraphConstructionTest {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()));
try {
eg.attachJobGraph(ordered);
}
@@ -167,10 +167,10 @@ public class ExecutionGraphConstructionTest {
jobId,
jobName,
cfg,
- new SerializedValue<>(new ExecutionConfig()),
+ new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()));
try {
eg.attachJobGraph(ordered);
}
@@ -246,7 +246,7 @@ public class ExecutionGraphConstructionTest {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()));
try {
eg.attachJobGraph(ordered);
}
@@ -317,7 +317,7 @@ public class ExecutionGraphConstructionTest {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()));
try {
eg.attachJobGraph(ordered);
}
@@ -383,7 +383,7 @@ public class ExecutionGraphConstructionTest {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()));
try {
eg.attachJobGraph(ordered);
fail("Attached wrong jobgraph");
@@ -453,7 +453,7 @@ public class ExecutionGraphConstructionTest {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()));
try {
eg.attachJobGraph(ordered);
}
@@ -501,7 +501,7 @@ public class ExecutionGraphConstructionTest {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()));
try {
eg.attachJobGraph(ordered);
@@ -584,7 +584,7 @@ public class ExecutionGraphConstructionTest {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()));
eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 12b4277..3f3142c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -52,9 +52,10 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -67,6 +68,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.slf4j.LoggerFactory;
+import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -164,7 +166,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartAllStrategy.Factory(),
- new Scheduler(TestingUtils.defaultExecutionContext()),
+ new TestingSlotProvider(ignore -> new CompletableFuture<>()),
ExecutionGraph.class.getClassLoader(),
blobWriter,
AkkaUtils.getDefaultTimeout());
@@ -426,15 +428,13 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
- Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
+ final ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>();
for (int i = 0; i < dop1; i++) {
- scheduler.newInstanceAvailable(
- ExecutionGraphTestUtils.getInstance(
- new ActorTaskManagerGateway(
- new ExecutionGraphTestUtils.SimpleActorGateway(
- TestingUtils.directExecutionContext()))));
+ slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlot()));
}
+ final SlotProvider slotProvider = new TestingSlotProvider(ignore -> slotFutures.removeFirst());
+
final JobInformation jobInformation = new DummyJobInformation(
jobId,
"failing test job");
@@ -447,7 +447,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartAllStrategy.Factory(),
- scheduler,
+ slotProvider,
ExecutionGraph.class.getClassLoader(),
blobWriter,
AkkaUtils.getDefaultTimeout());
@@ -459,8 +459,6 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
List<JobVertex> ordered = Arrays.asList(v1, v2);
eg.attachJobGraph(ordered);
- assertEquals(dop1, scheduler.getNumberOfAvailableSlots());
-
// schedule, this triggers mock deployment
eg.scheduleForExecution();
@@ -508,15 +506,13 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
v1.setInvokableClass(BatchTask.class);
v2.setInvokableClass(BatchTask.class);
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+ final ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>();
for (int i = 0; i < dop1 + dop2; i++) {
- scheduler.newInstanceAvailable(
- ExecutionGraphTestUtils.getInstance(
- new ActorTaskManagerGateway(
- new ExecutionGraphTestUtils.SimpleActorGateway(
- TestingUtils.directExecutionContext()))));
+ slotFutures.addLast(CompletableFuture.completedFuture(new TestingLogicalSlot()));
}
+ final SlotProvider slotProvider = new TestingSlotProvider(ignore -> slotFutures.removeFirst());
+
final JobInformation jobInformation = new DummyJobInformation(
jobId,
"some job");
@@ -529,7 +525,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartAllStrategy.Factory(),
- scheduler,
+ slotProvider,
ExecutionGraph.class.getClassLoader(),
blobWriter,
AkkaUtils.getDefaultTimeout());
@@ -540,8 +536,6 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
List<JobVertex> ordered = Arrays.asList(v1, v2);
eg.attachJobGraph(ordered);
- assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots());
-
// schedule, this triggers mock deployment
eg.scheduleForExecution();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index ec0d2e3..9a410fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -31,10 +30,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -44,6 +40,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -52,10 +49,6 @@ import java.util.concurrent.ScheduledExecutorService;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class ExecutionGraphMetricsTest extends TestLogger {
@@ -76,11 +69,12 @@ public class ExecutionGraphMetricsTest extends TestLogger {
Configuration jobConfig = new Configuration();
Time timeout = Time.seconds(10L);
- Scheduler scheduler = mock(Scheduler.class);
CompletableFuture<LogicalSlot> slotFuture1 = CompletableFuture.completedFuture(new TestingLogicalSlot());
CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlot());
- when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(slotFuture1, slotFuture2);
+ ArrayDeque<CompletableFuture<LogicalSlot>> slotFutures = new ArrayDeque<>();
+ slotFutures.addLast(slotFuture1);
+ slotFutures.addLast(slotFuture2);
TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
@@ -93,7 +87,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
new SerializedValue<>(null),
timeout,
testingRestartStrategy,
- scheduler);
+ new TestingSlotProvider(ignore -> slotFutures.removeFirst()));
RestartTimeGauge restartingTime = new RestartTimeGauge(executionGraph);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
index 69764e7..4b22f69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
@@ -40,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -69,7 +69,7 @@ public class ExecutionGraphRescalingTest extends TestLogger {
config,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- new Scheduler(TestingUtils.defaultExecutionContext()),
+ new TestingSlotProvider(ignore -> new CompletableFuture<>()),
Thread.currentThread().getContextClassLoader(),
new StandaloneCheckpointRecoveryFactory(),
AkkaUtils.getDefaultTimeout(),
@@ -98,7 +98,7 @@ public class ExecutionGraphRescalingTest extends TestLogger {
config,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- new Scheduler(TestingUtils.defaultExecutionContext()),
+ new TestingSlotProvider(ignore -> new CompletableFuture<>()),
Thread.currentThread().getContextClassLoader(),
new StandaloneCheckpointRecoveryFactory(),
AkkaUtils.getDefaultTimeout(),
@@ -127,7 +127,7 @@ public class ExecutionGraphRescalingTest extends TestLogger {
config,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- new Scheduler(TestingUtils.defaultExecutionContext()),
+ new TestingSlotProvider(ignore -> new CompletableFuture<>()),
Thread.currentThread().getContextClassLoader(),
new StandaloneCheckpointRecoveryFactory(),
AkkaUtils.getDefaultTimeout(),
@@ -169,7 +169,7 @@ public class ExecutionGraphRescalingTest extends TestLogger {
config,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- new Scheduler(TestingUtils.defaultExecutionContext()),
+ new TestingSlotProvider(ignore -> new CompletableFuture<>()),
Thread.currentThread().getContextClassLoader(),
new StandaloneCheckpointRecoveryFactory(),
AkkaUtils.getDefaultTimeout(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 0d603fc..e8bd30d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -48,7 +48,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
@@ -71,12 +70,12 @@ import java.lang.reflect.Field;
import java.net.InetAddress;
import java.time.Duration;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -583,7 +582,7 @@ public class ExecutionGraphTestUtils {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
- new Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor)));
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()));
return spy(new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout()));
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 4189476..d64f15f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
@@ -41,6 +40,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
import scala.concurrent.ExecutionContext;
@@ -53,7 +53,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
@SuppressWarnings("serial")
public class ExecutionVertexCancelTest extends TestLogger {
@@ -454,8 +453,11 @@ public class ExecutionVertexCancelTest extends TestLogger {
// scheduling after being canceled should be tolerated (no exception) because
// it can occur as the result of races
{
- Scheduler scheduler = mock(Scheduler.class);
- vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
+ vertex.scheduleForExecution(
+ new TestingSlotProvider(ignore -> new CompletableFuture<>()),
+ false,
+ LocationPreferenceConstraint.ALL,
+ Collections.emptySet());
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
}
@@ -493,8 +495,11 @@ public class ExecutionVertexCancelTest extends TestLogger {
AkkaUtils.getDefaultTimeout());
setVertexState(vertex, ExecutionState.CANCELING);
- Scheduler scheduler = mock(Scheduler.class);
- vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
+ vertex.scheduleForExecution(
+ new TestingSlotProvider(ignore -> new CompletableFuture<>()),
+ false,
+ LocationPreferenceConstraint.ALL,
+ Collections.emptySet());
}
catch (Exception e) {
fail("should not throw an exception");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 6bfcb7f..0bc53d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -18,20 +18,15 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
@@ -44,10 +39,6 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class ExecutionVertexSchedulingTest {
@@ -65,14 +56,16 @@ public class ExecutionVertexSchedulingTest {
slot.releaseSlot();
assertTrue(slot.isReleased());
- Scheduler scheduler = mock(Scheduler.class);
CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
future.complete(slot);
- when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future);
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
// try to deploy to the slot
- vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
+ vertex.scheduleForExecution(
+ new TestingSlotProvider(ignore -> future),
+ false,
+ LocationPreferenceConstraint.ALL,
+ Collections.emptySet());
// will have failed
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
@@ -99,12 +92,13 @@ public class ExecutionVertexSchedulingTest {
final CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
- Scheduler scheduler = mock(Scheduler.class);
- when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future);
-
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
// try to deploy to the slot
- vertex.scheduleForExecution(scheduler, true, LocationPreferenceConstraint.ALL, Collections.emptySet());
+ vertex.scheduleForExecution(
+ new TestingSlotProvider(ignore -> future),
+ true,
+ LocationPreferenceConstraint.ALL,
+ Collections.emptySet());
// future has not yet a slot
assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
@@ -131,15 +125,17 @@ public class ExecutionVertexSchedulingTest {
new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext())));
final SimpleSlot slot = instance.allocateSimpleSlot();
- Scheduler scheduler = mock(Scheduler.class);
CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
future.complete(slot);
- when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future);
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
// try to deploy to the slot
- vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
+ vertex.scheduleForExecution(
+ new TestingSlotProvider(ignore -> future),
+ false,
+ LocationPreferenceConstraint.ALL,
+ Collections.emptySet());
assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
}
catch (Exception e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 8ff0032..0344326 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
@@ -36,6 +35,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
@@ -45,10 +45,6 @@ import org.apache.flink.api.common.JobID;
public class PointwisePatternTest {
-
- private final JobID jobId = new JobID();
- private final String jobName = "Test Job Sample Name";
- private final Configuration cfg = new Configuration();
@Test
public void testNToN() throws Exception {
@@ -67,16 +63,7 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ ExecutionGraph eg = getDummyExecutionGraph();
try {
eg.attachJobGraph(ordered);
}
@@ -114,16 +101,7 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ ExecutionGraph eg = getDummyExecutionGraph();
try {
eg.attachJobGraph(ordered);
}
@@ -162,16 +140,7 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ ExecutionGraph eg = getDummyExecutionGraph();
try {
eg.attachJobGraph(ordered);
}
@@ -211,16 +180,7 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ ExecutionGraph eg = getDummyExecutionGraph();
try {
eg.attachJobGraph(ordered);
}
@@ -258,16 +218,7 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ ExecutionGraph eg = getDummyExecutionGraph();
try {
eg.attachJobGraph(ordered);
}
@@ -303,6 +254,19 @@ public class PointwisePatternTest {
testHighToLow(20, 15);
testHighToLow(31, 11);
}
+
+ private ExecutionGraph getDummyExecutionGraph() throws Exception {
+ return new ExecutionGraph(
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ new JobID(),
+ "Test Job Sample Name",
+ new Configuration(),
+ new SerializedValue<>(new ExecutionConfig()),
+ AkkaUtils.getDefaultTimeout(),
+ new NoRestartStrategy(),
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()));
+ }
private void testLowToHigh(int lowDop, int highDop) throws Exception {
if (highDop < lowDop) {
@@ -325,16 +289,7 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ ExecutionGraph eg = getDummyExecutionGraph();
try {
eg.attachJobGraph(ordered);
}
@@ -383,16 +338,7 @@ public class PointwisePatternTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
- ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- jobId,
- jobName,
- cfg,
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ ExecutionGraph eg = getDummyExecutionGraph();
try {
eg.attachJobGraph(ordered);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
index 9974574..7b00e4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Test;
@@ -37,6 +36,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -89,7 +89,6 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
final JobInformation jobInformation = new DummyJobInformation(
jobId,
jobName);
@@ -101,7 +100,7 @@ public class RestartPipelinedRegionStrategyTest {
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartPipelinedRegionStrategy.Factory(),
- scheduler,
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()),
ExecutionGraph.class.getClassLoader(),
VoidBlobWriter.getInstance(),
AkkaUtils.getDefaultTimeout());
@@ -174,7 +173,6 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
final JobInformation jobInformation = new DummyJobInformation(
jobId,
jobName);
@@ -186,7 +184,7 @@ public class RestartPipelinedRegionStrategyTest {
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartPipelinedRegionStrategy.Factory(),
- scheduler,
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()),
ExecutionGraph.class.getClassLoader(),
VoidBlobWriter.getInstance(),
AkkaUtils.getDefaultTimeout());
@@ -264,7 +262,6 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
final JobInformation jobInformation = new DummyJobInformation(
jobId,
jobName);
@@ -276,7 +273,7 @@ public class RestartPipelinedRegionStrategyTest {
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartPipelinedRegionStrategy.Factory(),
- scheduler,
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()),
ExecutionGraph.class.getClassLoader(),
VoidBlobWriter.getInstance(),
AkkaUtils.getDefaultTimeout());
@@ -345,7 +342,6 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4));
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
final JobInformation jobInformation = new DummyJobInformation(
jobId,
jobName);
@@ -357,7 +353,7 @@ public class RestartPipelinedRegionStrategyTest {
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartPipelinedRegionStrategy.Factory(),
- scheduler,
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()),
ExecutionGraph.class.getClassLoader(),
VoidBlobWriter.getInstance(),
AkkaUtils.getDefaultTimeout());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 90e3368..a12710c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
@@ -90,7 +90,7 @@ public class VertexSlotSharingTest {
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
- new Scheduler(TestingUtils.defaultExecutionContext()));
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()));
eg.attachJobGraph(vertices);
// verify that the vertices are all in the same slot sharing group
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 03da3b1..4fbb7f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -49,7 +49,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
- * Tests for the {@link Scheduler} when scheduling individual tasks.
+ * Tests for scheduling individual tasks.
*/
public class SchedulerIsolatedTasksTest extends SchedulerTestBase {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
deleted file mode 100644
index 20e221d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ /dev/null
@@ -1,1052 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.Random;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.areAllDistinct;
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertex;
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getTestVertexWithLocation;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.sleepUninterruptibly;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the scheduler when scheduling tasks in slot sharing groups.
- */
-public class SchedulerSlotSharingTest extends SchedulerTestBase {
-
- @Test
- public void scheduleSingleVertexType() {
- try {
- JobVertexID jid1 = new JobVertexID();
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1);
-
- final ResourceID tm1ResourceId = testingSlotProvider.addTaskManager(2).getResourceID();
- testingSlotProvider.addTaskManager(2);
-
- // schedule 4 tasks from the first vertex group
- LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertNotNull(s1);
- assertNotNull(s2);
- assertNotNull(s3);
- assertNotNull(s4);
-
- assertTrue(areAllDistinct(s1, s2, s3, s4));
-
- // we cannot schedule another task from the first vertex group
- try {
- testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- // release something
- s3.releaseSlot();
-
- // allocate another slot from that group
- LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(s5);
-
- // release all old slots
- s1.releaseSlot();
- s2.releaseSlot();
- s4.releaseSlot();
-
- LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s8 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertNotNull(s6);
- assertNotNull(s7);
- assertNotNull(s8);
-
- // make sure we have two slots on the first instance, and two on the second
- int c = 0;
- c += (s5.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
- c += (s6.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
- c += (s7.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
- c += (s8.getTaskManagerLocation().getResourceID().equals(tm1ResourceId)) ? 1 : -1;
- assertEquals(0, c);
-
- // release all
- s5.releaseSlot();
- s6.releaseSlot();
- s7.releaseSlot();
- s8.releaseSlot();
-
- // test that everything is released
- assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
-
- // check the scheduler's bookkeeping
- assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
- assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
- assertEquals(8, testingSlotProvider.getNumberOfUnconstrainedAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void allocateSlotWithSharing() throws Exception {
- JobVertexID jid1 = new JobVertexID();
- JobVertexID jid2 = new JobVertexID();
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
- testingSlotProvider.addTaskManager(2);
- testingSlotProvider.addTaskManager(2);
-
- // schedule 4 tasks from the first vertex group
- LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertNotNull(s1);
- assertNotNull(s2);
- assertNotNull(s3);
- assertNotNull(s4);
-
- assertTrue(areAllDistinct(s1, s2, s3, s4));
-
- // we cannot schedule another task from the first vertex group
- try {
- testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- // schedule some tasks from the second ID group
- LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertNotNull(s1_2);
- assertNotNull(s2_2);
- assertNotNull(s3_2);
- assertNotNull(s4_2);
-
- // we cannot schedule another task from the second vertex group
- try {
- testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- // now, we release some vertices (sub-slots) from the first group.
- // that should allow us to schedule more vertices from the first group
- s1.releaseSlot();
- s4.releaseSlot();
-
- assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(2, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
- assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-
- // we can still not schedule anything from the second group of vertices
- try {
- testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- // we can schedule something from the first vertex group
- LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(s5);
-
- assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
- assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-
-
- // now we release a slot from the second vertex group and schedule another task from that group
- s2_2.releaseSlot();
- LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(s5_2);
-
- // release all slots
- s2.releaseSlot();
- s3.releaseSlot();
- s5.releaseSlot();
-
- s1_2.releaseSlot();
- s3_2.releaseSlot();
- s4_2.releaseSlot();
- s5_2.releaseSlot();
-
- // test that everything is released
- assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
-
- // check the scheduler's bookkeeping
- assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
- assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
- assertEquals(10, testingSlotProvider.getNumberOfUnconstrainedAssignments());
- }
-
- @Test
- public void allocateSlotWithIntermediateTotallyEmptySharingGroup() {
- try {
- JobVertexID jid1 = new JobVertexID();
- JobVertexID jid2 = new JobVertexID();
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
- testingSlotProvider.addTaskManager(2);
- testingSlotProvider.addTaskManager(2);
-
- // schedule 4 tasks from the first vertex group
- LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
- assertEquals(4, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-
- s1.releaseSlot();
- s2.releaseSlot();
- s3.releaseSlot();
- s4.releaseSlot();
-
- assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
- assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-
- // schedule some tasks from the second ID group
- LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(4, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
- assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-
- s1_2.releaseSlot();
- s2_2.releaseSlot();
- s3_2.releaseSlot();
- s4_2.releaseSlot();
-
- assertEquals(0, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
- assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-
- // test that everything is released
- assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
-
- // check the scheduler's bookkeeping
- assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
- assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
- assertEquals(8, testingSlotProvider.getNumberOfUnconstrainedAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void allocateSlotWithTemporarilyEmptyVertexGroup() {
- try {
- JobVertexID jid1 = new JobVertexID();
- JobVertexID jid2 = new JobVertexID();
- JobVertexID jid3 = new JobVertexID();
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3);
-
- testingSlotProvider.addTaskManager(2);
- testingSlotProvider.addTaskManager(2);
-
- // schedule 4 tasks from the first vertex group
- LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s3_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertNotNull(s1_1);
- assertNotNull(s2_1);
- assertNotNull(s3_1);
- assertNotNull(s4_1);
-
- assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1));
-
- // schedule 4 tasks from the second vertex group
- LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s3_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertNotNull(s1_2);
- assertNotNull(s2_2);
- assertNotNull(s3_2);
- assertNotNull(s4_2);
-
- assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2));
-
- // schedule 4 tasks from the third vertex group
- LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s3_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertNotNull(s1_3);
- assertNotNull(s2_3);
- assertNotNull(s3_3);
- assertNotNull(s4_3);
-
- assertTrue(areAllDistinct(s1_3, s2_3, s3_3, s4_3));
-
-
- // we cannot schedule another task from the second vertex group
- try {
- testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- // release the second vertex group
- s1_2.releaseSlot();
- s2_2.releaseSlot();
- s3_2.releaseSlot();
- s4_2.releaseSlot();
-
- LogicalSlot s5_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s6_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s7_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertNotNull(s5_2);
- assertNotNull(s6_2);
- assertNotNull(s7_2);
-
- // release the slots
- s1_1.releaseSlot();
- s2_1.releaseSlot();
- s3_1.releaseSlot();
- s4_1.releaseSlot();
-
- s5_2.releaseSlot();
- s6_2.releaseSlot();
- s7_2.releaseSlot();
-
- // test that everything is released
- assertEquals(0, testingSlotProvider.getNumberOfAvailableSlots());
-
- s1_3.releaseSlot();
- s2_3.releaseSlot();
- s3_3.releaseSlot();
- s4_3.releaseSlot();
-
- // test that everything is released
- assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
-
- // check the scheduler's bookkeeping
- assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
- assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
- assertEquals(15, testingSlotProvider.getNumberOfUnconstrainedAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void allocateSlotWithTemporarilyEmptyVertexGroup2() {
- try {
- JobVertexID jid1 = new JobVertexID();
- JobVertexID jid2 = new JobVertexID();
- JobVertexID jid3 = new JobVertexID();
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
- testingSlotProvider.addTaskManager(2);
-
- // schedule 1 tasks from the first vertex group and 2 from the second
- LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertNotNull(s1_1);
- assertNotNull(s2_1);
- assertNotNull(s2_2);
-
- assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
- assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-
- // release the two from the second
- s2_1.releaseSlot();
- s2_2.releaseSlot();
-
-
- // this should free one slot so we can allocate one non-shared
- LogicalSlot sx = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(sx);
-
- assertEquals(1, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(0, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid1));
- assertEquals(1, testingSlotProvider.getNumberOfAvailableSlotsForGroup(sharingGroup, jid2));
-
- // check the scheduler's bookkeeping
- assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
- assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
- assertEquals(4, testingSlotProvider.getNumberOfUnconstrainedAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void scheduleMixedSharingAndNonSharing() {
- try {
- JobVertexID jid1 = new JobVertexID();
- JobVertexID jid2 = new JobVertexID();
-
- JobVertexID jidA = new JobVertexID();
- JobVertexID jidB= new JobVertexID();
- JobVertexID jidC = new JobVertexID();
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
- testingSlotProvider.addTaskManager(3);
- testingSlotProvider.addTaskManager(2);
-
- // schedule some individual vertices
- LogicalSlot sA2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot sA1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(sA1);
- assertNotNull(sA2);
-
- // schedule some vertices in the sharing group
- LogicalSlot s1_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2_0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(s1_0);
- assertNotNull(s1_1);
- assertNotNull(s2_0);
- assertNotNull(s2_1);
-
- // schedule another isolated vertex
- LogicalSlot sB1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(sB1);
-
- // should not be able to schedule more vertices
- try {
- testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- try {
- testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- try {
- testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- try {
- testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- fail("Scheduler accepted too many tasks at the same time");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
- catch (Exception e) {
- fail("Wrong exception.");
- }
-
- // release some isolated task and check that the sharing group may grow
- sA1.releaseSlot();
-
- LogicalSlot s1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(s1_2);
- assertNotNull(s2_2);
-
- // release three of the previously allocated sub slots, which guarantees to return one shared slot
- s1_0.releaseSlot();
- s1_1.releaseSlot();
- s2_0.releaseSlot();
-
- assertEquals(1, testingSlotProvider.getNumberOfAvailableSlots());
-
- // schedule one more no-shared task
- LogicalSlot sB0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(sB0);
-
- // release the last of the original shared slots and allocate one more non-shared slot
- s2_1.releaseSlot();
- LogicalSlot sB2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(sB2);
-
-
- // release on non-shared and add some shared slots
- sA2.releaseSlot();
- LogicalSlot s1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(s1_3);
- assertNotNull(s2_3);
-
- // release all shared and allocate all in non-shared
- s1_2.releaseSlot();
- s2_2.releaseSlot();
- s1_3.releaseSlot();
- s2_3.releaseSlot();
-
- LogicalSlot sC0 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot sC1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- assertNotNull(sC0);
- assertNotNull(sC1);
-
-
- sB0.releaseSlot();
- sB1.releaseSlot();
- sB2.releaseSlot();
- sC0.releaseSlot();
- sC1.releaseSlot();
-
- // test that everything is released
- assertEquals(5, testingSlotProvider.getNumberOfAvailableSlots());
-
- // check the scheduler's bookkeeping
- assertEquals(0, testingSlotProvider.getNumberOfLocalizedAssignments());
- assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
- assertEquals(15, testingSlotProvider.getNumberOfUnconstrainedAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests that the scheduler assigns the correct existing shared slots
- */
- @Test
- public void testLocalizedAssignment1() {
- try {
- JobVertexID jid1 = new JobVertexID();
- JobVertexID jid2 = new JobVertexID();
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
- TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
- TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
-
- // schedule one to each instance
- LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
- LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get();
- assertNotNull(s1);
- assertNotNull(s2);
-
- assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(loc1, s1.getTaskManagerLocation());
- assertEquals(loc2, s2.getTaskManagerLocation());
-
- // schedule one from the other group to each instance
- LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
- LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get();
- assertNotNull(s3);
- assertNotNull(s4);
-
- assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(loc1, s3.getTaskManagerLocation());
- assertEquals(loc2, s4.getTaskManagerLocation());
- assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots());
-
- // check the scheduler's bookkeeping
- assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
- assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
- assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests that the scheduler assigns to new local slots, rather than to existing non-local slots
- */
- @Test
- public void testLocalizedAssignment2() {
- try {
- JobVertexID jid1 = new JobVertexID();
- JobVertexID jid2 = new JobVertexID();
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
- TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
- TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
-
- // schedule one to each instance
- LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
- LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
- assertNotNull(s1);
- assertNotNull(s2);
-
- assertEquals(2, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(loc1, s1.getTaskManagerLocation());
- assertEquals(loc1, s2.getTaskManagerLocation());
-
- // schedule one from the other group to each instance
- LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get();
- LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get();
- assertNotNull(s3);
- assertNotNull(s4);
-
- assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
- assertEquals(loc2, s3.getTaskManagerLocation());
- assertEquals(loc2, s4.getTaskManagerLocation());
-
- // check the scheduler's bookkeeping
- assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
- assertEquals(0, testingSlotProvider.getNumberOfNonLocalizedAssignments());
- assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests that the scheduler can fall back to non-local
- */
- @Test
- public void testLocalizedAssignment3() {
- try {
- JobVertexID jid1 = new JobVertexID();
- JobVertexID jid2 = new JobVertexID();
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2);
-
- TaskManagerLocation loc1 = testingSlotProvider.addTaskManager(2);
- TaskManagerLocation loc2 = testingSlotProvider.addTaskManager(2);
-
- // schedule until the one instance is full
- LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
- LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
- LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
- LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-
- // schedule two more with preference of same instance --> need to go to other instance
- LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
- LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get();
-
- assertNotNull(s1);
- assertNotNull(s2);
- assertNotNull(s3);
- assertNotNull(s4);
- assertNotNull(s5);
- assertNotNull(s6);
-
- assertEquals(4, testingSlotProvider.getNumberOfSlots(sharingGroup));
-
- assertEquals(loc1, s1.getTaskManagerLocation());
- assertEquals(loc1, s2.getTaskManagerLocation());
- assertEquals(loc1, s3.getTaskManagerLocation());
- assertEquals(loc1, s4.getTaskManagerLocation());
- assertEquals(loc2, s5.getTaskManagerLocation());
- assertEquals(loc2, s6.getTaskManagerLocation());
-
- // check the scheduler's bookkeeping
- assertEquals(4, testingSlotProvider.getNumberOfLocalizedAssignments());
- // Flink supports host localized assignments which happen in this case because all TaskManagerLocations point to the loopback address
- assertTrue(2 == testingSlotProvider.getNumberOfNonLocalizedAssignments() || 2 == testingSlotProvider.getNumberOfHostLocalizedAssignments());
-
- assertEquals(0, testingSlotProvider.getNumberOfUnconstrainedAssignments());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSequentialAllocateAndRelease() {
- try {
- final JobVertexID jid1 = new JobVertexID();
- final JobVertexID jid2 = new JobVertexID();
- final JobVertexID jid3 = new JobVertexID();
- final JobVertexID jid4 = new JobVertexID();
-
- final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
-
- testingSlotProvider.addTaskManager(4);
-
- // allocate something from group 1 and 2 interleaved with schedule for group 3
- LogicalSlot slot_1_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot slot_1_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- LogicalSlot slot_2_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot slot_2_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- LogicalSlot slot_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- LogicalSlot slot_1_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot slot_1_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- LogicalSlot slot_2_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot slot_2_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- // release groups 1 and 2
-
- slot_1_1.releaseSlot();
- slot_1_2.releaseSlot();
- slot_1_3.releaseSlot();
- slot_1_4.releaseSlot();
-
- slot_2_1.releaseSlot();
- slot_2_2.releaseSlot();
- slot_2_3.releaseSlot();
- slot_2_4.releaseSlot();
-
- // allocate group 4
-
- LogicalSlot slot_4_1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot slot_4_2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot slot_4_3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot slot_4_4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- // release groups 3 and 4
-
- slot_3.releaseSlot();
-
- slot_4_1.releaseSlot();
- slot_4_2.releaseSlot();
- slot_4_3.releaseSlot();
- slot_4_4.releaseSlot();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testConcurrentAllocateAndRelease() {
- final ExecutorService executor = Executors.newFixedThreadPool(20);
-
- try {
- testingSlotProvider.addTaskManager(4);
-
- for (int run = 0; run < 50; run++) {
- final JobVertexID jid1 = new JobVertexID();
- final JobVertexID jid2 = new JobVertexID();
- final JobVertexID jid3 = new JobVertexID();
- final JobVertexID jid4 = new JobVertexID();
-
- final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
-
- final AtomicInteger enumerator1 = new AtomicInteger();
- final AtomicInteger enumerator2 = new AtomicInteger();
- final AtomicBoolean flag3 = new AtomicBoolean();
- final AtomicInteger enumerator4 = new AtomicInteger();
-
- final Random rnd = new Random();
-
- // use atomic boolean as a mutable boolean reference
- final AtomicBoolean failed = new AtomicBoolean(false);
-
- // use atomic integer as a mutable integer reference
- final AtomicInteger completed = new AtomicInteger();
-
- final Runnable deploy4 = new Runnable() {
- @Override
- public void run() {
- try {
- LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- sleepUninterruptibly(rnd.nextInt(5));
- slot.releaseSlot();
-
- if (completed.incrementAndGet() == 13) {
- synchronized (completed) {
- completed.notifyAll();
- }
- }
- }
- catch (Throwable t) {
- t.printStackTrace();
- failed.set(true);
- }
- }
- };
-
- final Runnable deploy3 = new Runnable() {
- @Override
- public void run() {
- try {
- if (flag3.compareAndSet(false, true)) {
- LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- sleepUninterruptibly(5);
-
- executor.execute(deploy4);
- executor.execute(deploy4);
- executor.execute(deploy4);
- executor.execute(deploy4);
-
- slot.releaseSlot();
-
- if (completed.incrementAndGet() == 13) {
- synchronized (completed) {
- completed.notifyAll();
- }
- }
- }
- }
- catch (Throwable t) {
- t.printStackTrace();
- failed.set(true);
- }
- }
- };
-
- final Runnable deploy2 = new Runnable() {
- @Override
- public void run() {
- try {
- LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- // wait a bit till scheduling the successor
- sleepUninterruptibly(rnd.nextInt(5));
- executor.execute(deploy3);
-
- // wait a bit until release
- sleepUninterruptibly(rnd.nextInt(5));
- slot.releaseSlot();
-
- if (completed.incrementAndGet() == 13) {
- synchronized (completed) {
- completed.notifyAll();
- }
- }
- }
- catch (Throwable t) {
- t.printStackTrace();
- failed.set(true);
- }
- }
- };
-
- final Runnable deploy1 = new Runnable() {
- @Override
- public void run() {
- try {
- LogicalSlot slot = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- // wait a bit till scheduling the successor
- sleepUninterruptibly(rnd.nextInt(5));
- executor.execute(deploy2);
-
- // wait a bit until release
- sleepUninterruptibly(rnd.nextInt(5));
- slot.releaseSlot();
-
- if (completed.incrementAndGet() == 13) {
- synchronized (completed) {
- completed.notifyAll();
- }
- }
- }
- catch (Throwable t) {
- t.printStackTrace();
- failed.set(true);
- }
- }
- };
-
- final Runnable deploy0 = new Runnable() {
- @Override
- public void run() {
- sleepUninterruptibly(rnd.nextInt(10));
- executor.execute(deploy1);
- }
- };
- executor.execute(deploy0);
- executor.execute(deploy0);
- executor.execute(deploy0);
- executor.execute(deploy0);
-
- // wait until all tasks have finished
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (completed) {
- while (!failed.get() && completed.get() < 13) {
- completed.wait(1000);
- }
- }
-
- assertFalse("Thread failed", failed.get());
-
- while (testingSlotProvider.getNumberOfAvailableSlots() < 4) {
- sleepUninterruptibly(5);
- }
-
- assertEquals(4, testingSlotProvider.getNumberOfAvailableSlots());
- assertEquals(13 * (run + 1), testingSlotProvider.getNumberOfUnconstrainedAssignments());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testDopIncreases() {
- try {
- JobVertexID jid1 = new JobVertexID();
- JobVertexID jid2 = new JobVertexID();
- JobVertexID jid3 = new JobVertexID();
- JobVertexID jid4 = new JobVertexID();
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
-
- Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
- scheduler.newInstanceAvailable(getRandomInstance(4));
-
- // schedule one task for the first and second vertex
- LogicalSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- assertEquals( s1.getTaskManagerLocation(), s2.getTaskManagerLocation() );
- assertEquals(3, scheduler.getNumberOfAvailableSlots());
-
- LogicalSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- s1.releaseSlot();
- s2.releaseSlot();
-
- LogicalSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- LogicalSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
-
- try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5, sharingGroup), sharingGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get();
- fail("should throw an exception");
- }
- catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof NoResourceAvailableException);
- }
-
- assertEquals(0, scheduler.getNumberOfAvailableSlots());
-
- s3_0.releaseSlot();
- s3_1.releaseSlot();
- s3_2.releaseSlot();
- s3_3.releaseSlot();
- s4_0.releaseSlot();
- s4_1.releaseSlot();
- s4_2.releaseSlot();
- s4_3.releaseSlot();
-
- assertEquals(4, scheduler.getNumberOfAvailableSlots());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private static SlotProfile slotProfileForLocation(TaskManagerLocation location) {
- return new SlotProfile(ResourceProfile.UNKNOWN, Collections.singletonList(location), Collections.emptyList());
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
deleted file mode 100644
index d9919ac..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.hamcrest.Matchers;
-import org.junit.Test;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-public class SchedulerTest extends TestLogger {
-
- @Test
- public void testAddAndRemoveInstance() {
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-
- Instance i1 = getRandomInstance(2);
- Instance i2 = getRandomInstance(2);
- Instance i3 = getRandomInstance(2);
-
- assertEquals(0, scheduler.getNumberOfAvailableInstances());
- assertEquals(0, scheduler.getNumberOfAvailableSlots());
- scheduler.newInstanceAvailable(i1);
- assertEquals(1, scheduler.getNumberOfAvailableInstances());
- assertEquals(2, scheduler.getNumberOfAvailableSlots());
- scheduler.newInstanceAvailable(i2);
- assertEquals(2, scheduler.getNumberOfAvailableInstances());
- assertEquals(4, scheduler.getNumberOfAvailableSlots());
- scheduler.newInstanceAvailable(i3);
- assertEquals(3, scheduler.getNumberOfAvailableInstances());
- assertEquals(6, scheduler.getNumberOfAvailableSlots());
-
- // cannot add available instance again
- try {
- scheduler.newInstanceAvailable(i2);
- fail("Scheduler accepted instance twice");
- }
- catch (IllegalArgumentException e) {
- // bueno!
- }
-
- // some instances die
- assertEquals(3, scheduler.getNumberOfAvailableInstances());
- assertEquals(6, scheduler.getNumberOfAvailableSlots());
- scheduler.instanceDied(i2);
- assertEquals(2, scheduler.getNumberOfAvailableInstances());
- assertEquals(4, scheduler.getNumberOfAvailableSlots());
-
- // try to add a dead instance
- try {
- scheduler.newInstanceAvailable(i2);
- fail("Scheduler accepted dead instance");
- }
- catch (IllegalArgumentException e) {
- // stimmt
-
- }
-
- scheduler.instanceDied(i1);
- assertEquals(1, scheduler.getNumberOfAvailableInstances());
- assertEquals(2, scheduler.getNumberOfAvailableSlots());
- scheduler.instanceDied(i3);
- assertEquals(0, scheduler.getNumberOfAvailableInstances());
- assertEquals(0, scheduler.getNumberOfAvailableSlots());
-
- assertFalse(i1.isAlive());
- assertFalse(i2.isAlive());
- assertFalse(i3.isAlive());
- }
-
- /**
- * Tests that the Scheduler times out uncompleted slot futures.
- */
- @Test
- public void testSlotAllocationTimeout() throws Exception {
- final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
-
- final ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph();
-
- final Map<ExecutionAttemptID, Execution> registeredExecutions = executionGraph.getRegisteredExecutions();
-
- assertThat(registeredExecutions.values(), Matchers.not(Matchers.empty()));
-
- final Execution execution = registeredExecutions.values().iterator().next();
-
- final CompletableFuture<LogicalSlot> slotFuture = scheduler.allocateSlot(
- new ScheduledUnit(
- execution),
- true,
- SlotProfile.noRequirements(),
- Time.milliseconds(1L));
-
- try {
- slotFuture.get();
- } catch (ExecutionException ee) {
- assertThat(ExceptionUtils.stripExecutionException(ee), Matchers.instanceOf(TimeoutException.class));
- }
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 683b0cd..f62f89f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -61,7 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Test base for scheduler related test cases. The test are
- * executed with the {@link Scheduler} and the {@link SlotPool}.
+ * executed with the {@link SlotPool}.
*/
public class SchedulerTestBase extends TestLogger {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 8ff47c7..8fc04db 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -31,11 +31,11 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,6 +49,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -139,7 +140,7 @@ public class RescalePartitionerTest extends StreamPartitionerTest {
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
new RestartAllStrategy.Factory(),
- new Scheduler(TestingUtils.defaultExecutionContext()),
+ new TestingSlotProvider(ignored -> new CompletableFuture<>()),
ExecutionGraph.class.getClassLoader(),
VoidBlobWriter.getInstance(),
AkkaUtils.getDefaultTimeout());