You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/23 09:36:05 UTC
[flink] branch master updated: [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 21b92ac [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase
21b92ac is described below
commit 21b92ac95f1824e9b1ca483fa3ffaaf77ef14d4a
Author: azagrebin <az...@users.noreply.github.com>
AuthorDate: Thu May 23 11:35:48 2019 +0200
[FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase
---
.../executiongraph/ExecutionGraphRestartTest.java | 709 ++++++++++-----------
1 file changed, 331 insertions(+), 378 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1c5b650..1e6be72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -35,9 +35,6 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -45,10 +42,19 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
+import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -58,23 +64,18 @@ import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Test;
-import javax.annotation.Nonnull;
-
import java.io.IOException;
-import java.net.InetAddress;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import scala.concurrent.duration.FiniteDuration;
-
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
@@ -97,7 +98,10 @@ public class ExecutionGraphRestartTest extends TestLogger {
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
- private TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+ private static final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor =
+ TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+ private static final JobID TEST_JOB_ID = new JobID();
@After
public void shutdown() {
@@ -109,8 +113,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
@Test
public void testNoManualRestart() throws Exception {
NoRestartStrategy restartStrategy = new NoRestartStrategy();
- Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
- ExecutionGraph eg = executionGraphInstanceTuple.f0;
+ ExecutionGraph eg = createSimpleExecutionGraph(
+ restartStrategy, new SimpleSlotProvider(TEST_JOB_ID, NUM_TASKS), createJobGraph());
eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
@@ -136,149 +140,128 @@ public class ExecutionGraphRestartTest extends TestLogger {
@Test
public void testRestartAutomatically() throws Exception {
- Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple =
- createExecutionGraph(TestRestartStrategy.directExecuting());
-
- ExecutionGraph eg = executionGraphInstanceTuple.f0;
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ restartAfterFailure(TestingExecutionGraphBuilder.newBuilder()
+ .setRestartStrategy(TestRestartStrategy.directExecuting())
+ .buildAndScheduleForExecution(slotPool));
+ }
- restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true);
}
@Test
public void testCancelWhileRestarting() throws Exception {
// We want to manually control the restart and delay
- RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
- Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
- ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
- Instance instance = executionGraphInstanceTuple.f1;
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder()
+ .setRestartStrategy(new InfiniteDelayRestartStrategy())
+ .setTaskManagerLocation(taskManagerLocation)
+ .buildAndScheduleForExecution(slotPool);
- // Kill the instance and wait for the job to restart
- instance.markDead();
- Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+ // Release the TaskManager and wait for the job to restart
+ slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new Exception("Test Exception"));
+ assertEquals(JobStatus.RESTARTING, executionGraph.getState());
- assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+ // Canceling needs to abort the restart
+ executionGraph.cancel();
- // Canceling needs to abort the restart
- executionGraph.cancel();
+ assertEquals(JobStatus.CANCELED, executionGraph.getState());
- assertEquals(JobStatus.CANCELED, executionGraph.getState());
+ // The restart has been aborted
+ executionGraph.restart(executionGraph.getGlobalModVersion());
- // The restart has been aborted
- executionGraph.restart(executionGraph.getGlobalModVersion());
+ assertEquals(JobStatus.CANCELED, executionGraph.getState());
+ }
- assertEquals(JobStatus.CANCELED, executionGraph.getState());
}
@Test
public void testFailWhileRestarting() throws Exception {
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-
- Instance instance = ExecutionGraphTestUtils.getInstance(
- new SimpleAckingTaskManagerGateway(),
- NUM_TASKS);
-
- scheduler.newInstanceAvailable(instance);
-
- // Blocking program
- ExecutionGraph executionGraph = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- new JobID(),
- "TestJob",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- // We want to manually control the restart and delay
- new InfiniteDelayRestartStrategy(),
- scheduler);
-
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
-
- JobVertex jobVertex = new JobVertex("NoOpInvokable");
- jobVertex.setInvokableClass(NoOpInvokable.class);
- jobVertex.setParallelism(NUM_TASKS);
-
- JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
-
- executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder()
+ .setRestartStrategy(new InfiniteDelayRestartStrategy())
+ .setTaskManagerLocation(taskManagerLocation)
+ .buildAndScheduleForExecution(slotPool);
- assertEquals(JobStatus.CREATED, executionGraph.getState());
+ // Release the TaskManager and wait for the job to restart
+ slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new Exception("Test Exception"));
- executionGraph.scheduleForExecution();
-
- assertEquals(JobStatus.RUNNING, executionGraph.getState());
-
- // Kill the instance and wait for the job to restart
- instance.markDead();
-
- assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+ assertEquals(JobStatus.RESTARTING, executionGraph.getState());
- // If we fail when being in RESTARTING, then we should try to restart again
- final long globalModVersion = executionGraph.getGlobalModVersion();
- final Exception testException = new Exception("Test exception");
- executionGraph.failGlobal(testException);
+ // If we fail when being in RESTARTING, then we should try to restart again
+ final long globalModVersion = executionGraph.getGlobalModVersion();
+ final Exception testException = new Exception("Test exception");
+ executionGraph.failGlobal(testException);
- assertNotEquals(globalModVersion, executionGraph.getGlobalModVersion());
- assertEquals(JobStatus.RESTARTING, executionGraph.getState());
- assertEquals(testException, executionGraph.getFailureCause()); // we should have updated the failure cause
+ assertNotEquals(globalModVersion, executionGraph.getGlobalModVersion());
+ assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+ assertEquals(testException, executionGraph.getFailureCause()); // we should have updated the failure cause
- // but it should fail when sending a SuppressRestartsException
- executionGraph.failGlobal(new SuppressRestartsException(new Exception("Suppress restart exception")));
+ // but it should fail when sending a SuppressRestartsException
+ executionGraph.failGlobal(new SuppressRestartsException(new Exception("Suppress restart exception")));
- assertEquals(JobStatus.FAILED, executionGraph.getState());
+ assertEquals(JobStatus.FAILED, executionGraph.getState());
- // The restart has been aborted
- executionGraph.restart(executionGraph.getGlobalModVersion());
+ // The restart has been aborted
+ executionGraph.restart(executionGraph.getGlobalModVersion());
- assertEquals(JobStatus.FAILED, executionGraph.getState());
+ assertEquals(JobStatus.FAILED, executionGraph.getState());
+ }
}
@Test
public void testCancelWhileFailing() throws Exception {
- final RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
- final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ final ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder()
+ .setRestartStrategy(new InfiniteDelayRestartStrategy())
+ .buildAndScheduleForExecution(slotPool);
- assertEquals(JobStatus.RUNNING, graph.getState());
+ assertEquals(JobStatus.RUNNING, graph.getState());
- // switch all tasks to running
- for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
- vertex.getCurrentExecutionAttempt().switchToRunning();
- }
+ // switch all tasks to running
+ for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+ vertex.getCurrentExecutionAttempt().switchToRunning();
+ }
- graph.failGlobal(new Exception("test"));
+ graph.failGlobal(new Exception("test"));
- assertEquals(JobStatus.FAILING, graph.getState());
+ assertEquals(JobStatus.FAILING, graph.getState());
- graph.cancel();
+ graph.cancel();
- assertEquals(JobStatus.CANCELLING, graph.getState());
+ assertEquals(JobStatus.CANCELLING, graph.getState());
- // let all tasks finish cancelling
- completeCanceling(graph);
+ // let all tasks finish cancelling
+ completeCanceling(graph);
+
+ assertEquals(JobStatus.CANCELED, graph.getState());
+ }
- assertEquals(JobStatus.CANCELED, graph.getState());
}
@Test
public void testFailWhileCanceling() throws Exception {
- final RestartStrategy restartStrategy = new NoRestartStrategy();
- final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ final ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder().buildAndScheduleForExecution(slotPool);
+
+ assertEquals(JobStatus.RUNNING, graph.getState());
+ switchAllTasksToRunning(graph);
- assertEquals(JobStatus.RUNNING, graph.getState());
- switchAllTasksToRunning(graph);
+ graph.cancel();
- graph.cancel();
+ assertEquals(JobStatus.CANCELLING, graph.getState());
- assertEquals(JobStatus.CANCELLING, graph.getState());
+ graph.failGlobal(new Exception("test"));
- graph.failGlobal(new Exception("test"));
+ assertEquals(JobStatus.FAILING, graph.getState());
- assertEquals(JobStatus.FAILING, graph.getState());
+ // let all tasks finish cancelling
+ completeCanceling(graph);
- // let all tasks finish cancelling
- completeCanceling(graph);
+ assertEquals(JobStatus.FAILED, graph.getState());
+ }
- assertEquals(JobStatus.FAILED, graph.getState());
}
private void switchAllTasksToRunning(ExecutionGraph graph) {
@@ -287,23 +270,28 @@ public class ExecutionGraphRestartTest extends TestLogger {
@Test
public void testNoRestartOnSuppressException() throws Exception {
- final ExecutionGraph eg = createExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0)).f0;
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+ .setRestartStrategy(new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0))
+ .buildAndScheduleForExecution(slotPool);
- // Fail with unrecoverable Exception
- eg.getAllExecutionVertices().iterator().next().fail(
- new SuppressRestartsException(new Exception("Test Exception")));
+ // Fail with unrecoverable Exception
+ eg.getAllExecutionVertices().iterator().next().fail(
+ new SuppressRestartsException(new Exception("Test Exception")));
- assertEquals(JobStatus.FAILING, eg.getState());
+ assertEquals(JobStatus.FAILING, eg.getState());
- completeCanceling(eg);
+ completeCanceling(eg);
- eg.waitUntilTerminal();
- assertEquals(JobStatus.FAILED, eg.getState());
+ eg.waitUntilTerminal();
+ assertEquals(JobStatus.FAILED, eg.getState());
- RestartStrategy restartStrategy = eg.getRestartStrategy();
- assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
+ RestartStrategy restartStrategy = eg.getRestartStrategy();
+ assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
+
+ assertEquals(0, ((FixedDelayRestartStrategy) restartStrategy).getCurrentRestartAttempt());
+ }
- assertEquals(0, ((FixedDelayRestartStrategy) restartStrategy).getCurrentRestartAttempt());
}
/**
@@ -313,56 +301,47 @@ public class ExecutionGraphRestartTest extends TestLogger {
*/
@Test
public void testFailingExecutionAfterRestart() throws Exception {
- Instance instance = ExecutionGraphTestUtils.getInstance(
- new SimpleAckingTaskManagerGateway(),
- 2);
-
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
- scheduler.newInstanceAvailable(instance);
-
- TestRestartStrategy restartStrategy = TestRestartStrategy.directExecuting();
-
JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
JobVertex receiver = ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
- ExecutionGraph eg = newExecutionGraph(restartStrategy, scheduler);
- eg.start(mainThreadExecutor);
- eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
- assertEquals(JobStatus.CREATED, eg.getState());
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+ .setRestartStrategy(TestRestartStrategy.directExecuting())
+ .setJobGraph(jobGraph)
+ .setNumberOfTasks(2)
+ .buildAndScheduleForExecution(slotPool);
- eg.scheduleForExecution();
- assertEquals(JobStatus.RUNNING, eg.getState());
+ Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
- Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
+ Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
+ Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();
- Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
- Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();
+ finishedExecution.markFinished();
- finishedExecution.markFinished();
+ failedExecution.fail(new Exception("Test Exception"));
+ failedExecution.completeCancelling();
- failedExecution.fail(new Exception("Test Exception"));
- failedExecution.completeCancelling();
-
- assertEquals(JobStatus.RUNNING, eg.getState());
+ assertEquals(JobStatus.RUNNING, eg.getState());
- // At this point all resources have been assigned
- for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
- assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource());
- vertex.getCurrentExecutionAttempt().switchToRunning();
- }
+ // At this point all resources have been assigned
+ for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+ assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource());
+ vertex.getCurrentExecutionAttempt().switchToRunning();
+ }
- // fail old finished execution, this should not affect the execution
- finishedExecution.fail(new Exception("This should have no effect"));
+ // fail old finished execution, this should not affect the execution
+ finishedExecution.fail(new Exception("This should have no effect"));
- for (ExecutionVertex vertex: eg.getAllExecutionVertices()) {
- vertex.getCurrentExecutionAttempt().markFinished();
- }
+ for (ExecutionVertex vertex: eg.getAllExecutionVertices()) {
+ vertex.getCurrentExecutionAttempt().markFinished();
+ }
- // the state of the finished execution should have not changed since it is terminal
- assertEquals(ExecutionState.FINISHED, finishedExecution.getState());
+ // the state of the finished execution should have not changed since it is terminal
+ assertEquals(ExecutionState.FINISHED, finishedExecution.getState());
- assertEquals(JobStatus.FINISHED, eg.getState());
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
}
/**
@@ -372,43 +351,27 @@ public class ExecutionGraphRestartTest extends TestLogger {
*/
@Test
public void testFailExecutionAfterCancel() throws Exception {
- Instance instance = ExecutionGraphTestUtils.getInstance(
- new SimpleAckingTaskManagerGateway(),
- 2);
-
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
- scheduler.newInstanceAvailable(instance);
-
- JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
-
- ExecutionConfig executionConfig = new ExecutionConfig();
- executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
- Integer.MAX_VALUE, Integer.MAX_VALUE));
- JobGraph jobGraph = new JobGraph("Test Job", vertex);
- jobGraph.setExecutionConfig(executionConfig);
-
- ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler);
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+ .setRestartStrategy(new InfiniteDelayRestartStrategy())
+ .setJobGraph(createJobGraphToCancel())
+ .setNumberOfTasks(2)
+ .buildAndScheduleForExecution(slotPool);
- eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+ // Fail right after cancel (for example with concurrent slot release)
+ eg.cancel();
- assertEquals(JobStatus.CREATED, eg.getState());
+ for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+ v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+ }
- eg.scheduleForExecution();
- assertEquals(JobStatus.RUNNING, eg.getState());
+ assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());
- // Fail right after cancel (for example with concurrent slot release)
- eg.cancel();
+ Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
- for (ExecutionVertex v : eg.getAllExecutionVertices()) {
- v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+ execution.completeCancelling();
+ assertEquals(JobStatus.CANCELED, eg.getState());
}
-
- assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());
-
- Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
-
- execution.completeCancelling();
- assertEquals(JobStatus.CANCELED, eg.getState());
}
/**
@@ -417,41 +380,25 @@ public class ExecutionGraphRestartTest extends TestLogger {
*/
@Test
public void testFailExecutionGraphAfterCancel() throws Exception {
- Instance instance = ExecutionGraphTestUtils.getInstance(
- new SimpleAckingTaskManagerGateway(),
- 2);
-
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
- scheduler.newInstanceAvailable(instance);
-
- JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
-
- ExecutionConfig executionConfig = new ExecutionConfig();
- executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
- Integer.MAX_VALUE, Integer.MAX_VALUE));
- JobGraph jobGraph = new JobGraph("Test Job", vertex);
- jobGraph.setExecutionConfig(executionConfig);
-
- ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler);
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+ .setRestartStrategy(new InfiniteDelayRestartStrategy())
+ .setJobGraph(createJobGraphToCancel())
+ .setNumberOfTasks(2)
+ .buildAndScheduleForExecution(slotPool);
- eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+ // Fail right after cancel (for example with concurrent slot release)
+ eg.cancel();
+ assertEquals(JobStatus.CANCELLING, eg.getState());
- assertEquals(JobStatus.CREATED, eg.getState());
+ eg.failGlobal(new Exception("Test Exception"));
+ assertEquals(JobStatus.FAILING, eg.getState());
- eg.scheduleForExecution();
- assertEquals(JobStatus.RUNNING, eg.getState());
-
- // Fail right after cancel (for example with concurrent slot release)
- eg.cancel();
- assertEquals(JobStatus.CANCELLING, eg.getState());
-
- eg.failGlobal(new Exception("Test Exception"));
- assertEquals(JobStatus.FAILING, eg.getState());
+ Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
- Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
-
- execution.completeCancelling();
- assertEquals(JobStatus.RESTARTING, eg.getState());
+ execution.completeCancelling();
+ assertEquals(JobStatus.RESTARTING, eg.getState());
+ }
}
/**
@@ -459,55 +406,29 @@ public class ExecutionGraphRestartTest extends TestLogger {
*/
@Test
public void testSuspendWhileRestarting() throws Exception {
-
- Instance instance = ExecutionGraphTestUtils.getInstance(
- new SimpleAckingTaskManagerGateway(),
- NUM_TASKS);
-
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
- scheduler.newInstanceAvailable(instance);
-
- JobVertex sender = new JobVertex("Task");
- sender.setInvokableClass(NoOpInvokable.class);
- sender.setParallelism(NUM_TASKS);
-
- JobGraph jobGraph = new JobGraph("Pointwise job", sender);
-
TestRestartStrategy controllableRestartStrategy = TestRestartStrategy.manuallyTriggered();
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+ .setRestartStrategy(controllableRestartStrategy)
+ .setTaskManagerLocation(taskManagerLocation)
+ .buildAndScheduleForExecution(slotPool);
- ExecutionGraph eg = new ExecutionGraph(
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- new JobID(),
- "Test job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
- AkkaUtils.getDefaultTimeout(),
- controllableRestartStrategy,
- scheduler);
-
- eg.start(mainThreadExecutor);
- eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
- assertEquals(JobStatus.CREATED, eg.getState());
-
- eg.scheduleForExecution();
-
- assertEquals(JobStatus.RUNNING, eg.getState());
-
- instance.markDead();
+ // Release the TaskManager and wait for the job to restart
+ slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new Exception("Test Exception"));
- Assert.assertEquals(1, controllableRestartStrategy.getNumberOfQueuedActions());
+ assertEquals(1, controllableRestartStrategy.getNumberOfQueuedActions());
- assertEquals(JobStatus.RESTARTING, eg.getState());
+ assertEquals(JobStatus.RESTARTING, eg.getState());
- eg.suspend(new Exception("Test exception"));
+ eg.suspend(new Exception("Test exception"));
- assertEquals(JobStatus.SUSPENDED, eg.getState());
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
- controllableRestartStrategy.triggerAll().join();
+ controllableRestartStrategy.triggerAll().join();
- assertEquals(JobStatus.SUSPENDED, eg.getState());
+ assertEquals(JobStatus.SUSPENDED, eg.getState());
+ }
}
@Test
@@ -518,7 +439,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
final ExecutionGraph eg = createSimpleTestGraph(
- new JobID(),
+ TEST_JOB_ID,
taskManagerGateway,
triggeredRestartStrategy,
createNoOpVertex(parallelism));
@@ -538,15 +459,15 @@ public class ExecutionGraphRestartTest extends TestLogger {
first.fail(new Exception("intended test failure 1"));
last.fail(new Exception("intended test failure 2"));
- Assert.assertEquals(JobStatus.FAILING, eg.getState());
+ assertEquals(JobStatus.FAILING, eg.getState());
completeCancellingForAllVertices(eg);
// Now trigger the restart
- Assert.assertEquals(1, triggeredRestartStrategy.getNumberOfQueuedActions());
+ assertEquals(1, triggeredRestartStrategy.getNumberOfQueuedActions());
triggeredRestartStrategy.triggerAll().join();
- Assert.assertEquals(JobStatus.RUNNING, eg.getState());
+ assertEquals(JobStatus.RUNNING, eg.getState());
switchToRunning(eg);
finishAllVertices(eg);
@@ -558,13 +479,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
@Test
public void testGlobalFailAndRestarts() throws Exception {
final int parallelism = 10;
- final JobID jid = new JobID();
final JobVertex vertex = createNoOpVertex(parallelism);
final NotCancelAckingTaskGateway taskManagerGateway = new NotCancelAckingTaskGateway();
- final SlotProvider slots = new SimpleSlotProvider(jid, parallelism, taskManagerGateway);
+ final SlotProvider slots = new SimpleSlotProvider(TEST_JOB_ID, parallelism, taskManagerGateway);
final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered();
- final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex);
+ final ExecutionGraph eg = createSimpleTestGraph(TEST_JOB_ID, slots, restartStrategy, vertex);
eg.start(mainThreadExecutor);
eg.setScheduleMode(ScheduleMode.EAGER);
@@ -603,55 +523,57 @@ public class ExecutionGraphRestartTest extends TestLogger {
// this test is inconclusive if not used with a proper multi-threaded executor
assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1);
- SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
final int parallelism = 20;
- final Scheduler scheduler = createSchedulerWithInstances(parallelism, taskManagerGateway);
- final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ final Scheduler scheduler = createSchedulerWithSlots(parallelism, slotPool, new LocalTaskManagerLocation());
- final JobVertex source = new JobVertex("source");
- source.setInvokableClass(NoOpInvokable.class);
- source.setParallelism(parallelism);
- source.setSlotSharingGroup(sharingGroup);
+ final SlotSharingGroup sharingGroup = new SlotSharingGroup();
- final JobVertex sink = new JobVertex("sink");
- sink.setInvokableClass(NoOpInvokable.class);
- sink.setParallelism(parallelism);
- sink.setSlotSharingGroup(sharingGroup);
- sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+ final JobVertex source = new JobVertex("source");
+ source.setInvokableClass(NoOpInvokable.class);
+ source.setParallelism(parallelism);
+ source.setSlotSharingGroup(sharingGroup);
- TestRestartStrategy restartStrategy = TestRestartStrategy.directExecuting();
+ final JobVertex sink = new JobVertex("sink");
+ sink.setInvokableClass(NoOpInvokable.class);
+ sink.setParallelism(parallelism);
+ sink.setSlotSharingGroup(sharingGroup);
+ sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
- final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
- new JobID(),
- scheduler,
- restartStrategy,
- executor,
- source,
- sink);
+ TestRestartStrategy restartStrategy = TestRestartStrategy.directExecuting();
- eg.start(mainThreadExecutor);
+ final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+ TEST_JOB_ID,
+ scheduler,
+ restartStrategy,
+ executor,
+ source,
+ sink);
- eg.setScheduleMode(ScheduleMode.EAGER);
- eg.scheduleForExecution();
+ eg.start(mainThreadExecutor);
- switchToRunning(eg);
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.scheduleForExecution();
- // fail into 'RESTARTING'
- eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
- new Exception("intended test failure"));
+ switchToRunning(eg);
- assertEquals(JobStatus.FAILING, eg.getState());
+ // fail into 'RESTARTING'
+ eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
+ new Exception("intended test failure"));
- completeCancellingForAllVertices(eg);
+ assertEquals(JobStatus.FAILING, eg.getState());
- assertEquals(JobStatus.RUNNING, eg.getState());
+ completeCancellingForAllVertices(eg);
- // clean termination
- switchToRunning(eg);
- finishAllVertices(eg);
+ assertEquals(JobStatus.RUNNING, eg.getState());
- assertEquals(JobStatus.FINISHED, eg.getState());
+ // clean termination
+ switchToRunning(eg);
+ finishAllVertices(eg);
+
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
}
@Test
@@ -662,43 +584,44 @@ public class ExecutionGraphRestartTest extends TestLogger {
final int numRestarts = 10;
final int parallelism = 20;
- TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
- final Scheduler scheduler = createSchedulerWithInstances(parallelism - 1, taskManagerGateway);
-
- final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+ try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+ final Scheduler scheduler = createSchedulerWithSlots(
+ parallelism - 1, slotPool, new LocalTaskManagerLocation());
- final JobVertex source = new JobVertex("source");
- source.setInvokableClass(NoOpInvokable.class);
- source.setParallelism(parallelism);
- source.setSlotSharingGroup(sharingGroup);
+ final SlotSharingGroup sharingGroup = new SlotSharingGroup();
- final JobVertex sink = new JobVertex("sink");
- sink.setInvokableClass(NoOpInvokable.class);
- sink.setParallelism(parallelism);
- sink.setSlotSharingGroup(sharingGroup);
- sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+ final JobVertex source = new JobVertex("source");
+ source.setInvokableClass(NoOpInvokable.class);
+ source.setParallelism(parallelism);
+ source.setSlotSharingGroup(sharingGroup);
- TestRestartStrategy restartStrategy =
- new TestRestartStrategy(numRestarts, false);
+ final JobVertex sink = new JobVertex("sink");
+ sink.setInvokableClass(NoOpInvokable.class);
+ sink.setParallelism(parallelism);
+ sink.setSlotSharingGroup(sharingGroup);
+ sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+ TestRestartStrategy restartStrategy =
+ new TestRestartStrategy(numRestarts, false);
- final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
- new JobID(), scheduler, restartStrategy, executor, source, sink);
+ final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+ TEST_JOB_ID, scheduler, restartStrategy, executor, source, sink);
- eg.start(mainThreadExecutor);
- eg.setScheduleMode(ScheduleMode.EAGER);
- eg.scheduleForExecution();
+ eg.start(mainThreadExecutor);
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.scheduleForExecution();
- // wait until no more changes happen
- while (eg.getNumberOfFullRestarts() < numRestarts) {
- Thread.sleep(1);
- }
+ // wait until no more changes happen
+ while (eg.getNumberOfFullRestarts() < numRestarts) {
+ Thread.sleep(1);
+ }
- assertEquals(JobStatus.FAILED, eg.getState());
+ assertEquals(JobStatus.FAILED, eg.getState());
- final Throwable t = eg.getFailureCause();
- if (!(t instanceof NoResourceAvailableException)) {
- ExceptionUtils.rethrowException(t, t.getMessage());
+ final Throwable t = eg.getFailureCause();
+ if (!(t instanceof NoResourceAvailableException)) {
+ ExceptionUtils.rethrowException(t, t.getMessage());
+ }
}
}
@@ -710,7 +633,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
public void testFailureWhileRestarting() throws Exception {
final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered();
- final ExecutionGraph executionGraph = createSimpleExecutionGraph(restartStrategy, new TestingSlotProvider(ignored -> new CompletableFuture<>()));
+ final ExecutionGraph executionGraph = createSimpleExecutionGraph(
+ restartStrategy, new TestingSlotProvider(ignored -> new CompletableFuture<>()), createJobGraph());
executionGraph.start(mainThreadExecutor);
executionGraph.setQueuedSchedulingAllowed(true);
@@ -733,66 +657,100 @@ public class ExecutionGraphRestartTest extends TestLogger {
// Utilities
// ------------------------------------------------------------------------
- private Scheduler createSchedulerWithInstances(int num, TaskManagerGateway taskManagerGateway) {
- final Scheduler scheduler = new Scheduler(executor);
- final Instance[] instances = new Instance[num];
+ private static class TestingExecutionGraphBuilder {
+ private RestartStrategy restartStrategy = new NoRestartStrategy();
+ private JobGraph jobGraph = createJobGraph();
+ private int tasksNum = NUM_TASKS;
+ private TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
- for (int i = 0; i < instances.length; i++) {
- instances[i] = createInstance(taskManagerGateway, 55443 + i);
- scheduler.newInstanceAvailable(instances[i]);
+ private TestingExecutionGraphBuilder setRestartStrategy(RestartStrategy restartStrategy) {
+ this.restartStrategy = restartStrategy;
+ return this;
}
- return scheduler;
- }
+ private TestingExecutionGraphBuilder setJobGraph(JobGraph jobGraph) {
+ this.jobGraph = jobGraph;
+ return this;
+ }
- private static Instance createInstance(TaskManagerGateway taskManagerGateway, int port) {
- final HardwareDescription resources = new HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000);
- final TaskManagerLocation location = new TaskManagerLocation(
- ResourceID.generate(), InetAddress.getLoopbackAddress(), port);
- return new Instance(taskManagerGateway, location, new InstanceID(), resources, 1);
- }
+ TestingExecutionGraphBuilder setNumberOfTasks(@SuppressWarnings("SameParameterValue") int tasksNum) {
+ this.tasksNum = tasksNum;
+ return this;
+ }
- // ------------------------------------------------------------------------
+ private TestingExecutionGraphBuilder setTaskManagerLocation(TaskManagerLocation taskManagerLocation) {
+ this.taskManagerLocation = taskManagerLocation;
+ return this;
+ }
- private Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception {
- Instance instance = ExecutionGraphTestUtils.getInstance(
- new SimpleAckingTaskManagerGateway(),
- NUM_TASKS);
+ private static TestingExecutionGraphBuilder newBuilder() {
+ return new TestingExecutionGraphBuilder();
+ }
- Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
- scheduler.newInstanceAvailable(instance);
+ private ExecutionGraph buildAndScheduleForExecution(SlotPool slotPool) throws Exception {
+ final Scheduler scheduler = createSchedulerWithSlots(tasksNum, slotPool, taskManagerLocation);
+ final ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler, jobGraph);
- ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler);
+ assertEquals(JobStatus.CREATED, eg.getState());
- assertEquals(JobStatus.CREATED, eg.getState());
+ eg.scheduleForExecution();
+ assertEquals(JobStatus.RUNNING, eg.getState());
- eg.scheduleForExecution();
- assertEquals(JobStatus.RUNNING, eg.getState());
- return new Tuple2<>(eg, instance);
+ return eg;
+ }
}
- private ExecutionGraph createSimpleExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException, JobException {
- JobGraph jobGraph = createJobGraph(NUM_TASKS);
+ private static Scheduler createSchedulerWithSlots(
+ int numSlots, SlotPool slotPool, TaskManagerLocation taskManagerLocation) throws Exception {
- ExecutionGraph eg = newExecutionGraph(restartStrategy, slotProvider);
- eg.start(mainThreadExecutor);
- eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+ final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+ setupSlotPool(slotPool);
+ Scheduler scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.INSTANCE, slotPool);
+ scheduler.start(mainThreadExecutor);
+ slotPool.registerTaskManager(taskManagerLocation.getResourceID());
- return eg;
+ final List<SlotOffer> slotOffers = new ArrayList<>(NUM_TASKS);
+ for (int i = 0; i < numSlots; i++) {
+ final AllocationID allocationId = new AllocationID();
+ final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+ slotOffers.add(slotOffer);
+ }
+
+ slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
+
+ return scheduler;
}
- @Nonnull
- private static JobGraph createJobGraph(int parallelism) {
- JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", parallelism, NoOpInvokable.class);
+ private static void setupSlotPool(SlotPool slotPool) throws Exception {
+ final String jobManagerAddress = "foobar";
+ final ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ slotPool.start(JobMasterId.generate(), jobManagerAddress, mainThreadExecutor);
+ slotPool.connectToResourceManager(resourceManagerGateway);
+ }
+ private static JobGraph createJobGraph() {
+ JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class);
return new JobGraph("Pointwise job", sender);
}
- private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException {
- final ExecutionGraph executionGraph = new ExecutionGraph(
+ private static JobGraph createJobGraphToCancel() throws IOException {
+ JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+ Integer.MAX_VALUE, Integer.MAX_VALUE));
+ JobGraph jobGraph = new JobGraph("Test Job", vertex);
+ jobGraph.setExecutionConfig(executionConfig);
+ return jobGraph;
+ }
+
+ private static ExecutionGraph createSimpleExecutionGraph(
+ RestartStrategy restartStrategy, SlotProvider slotProvider, JobGraph jobGraph)
+ throws IOException, JobException {
+
+ ExecutionGraph executionGraph = new ExecutionGraph(
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- new JobID(),
+ TEST_JOB_ID,
"Test job",
new Configuration(),
new SerializedValue<>(new ExecutionConfig()),
@@ -800,12 +758,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
restartStrategy,
slotProvider);
- executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+ executionGraph.start(mainThreadExecutor);
+ executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
return executionGraph;
}
- private void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) {
+ private void restartAfterFailure(ExecutionGraph eg) {
eg.start(mainThreadExecutor);
eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
@@ -817,12 +776,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.RUNNING, eg.getState());
- if (haltAfterRestart) {
- haltExecution(eg);
- }
- }
-
- private static void haltExecution(ExecutionGraph eg) {
finishAllVertices(eg);
assertEquals(JobStatus.FINISHED, eg.getState());
}