Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/02/21 16:11:40 UTC
[GitHub] flink pull request #5548: [FLINK-8732] [flip6] Cancel ongoing scheduling ope...
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/5548
[FLINK-8732] [flip6] Cancel ongoing scheduling operation
## What is the purpose of the change
Keeps track of ongoing scheduling operations in the ExecutionGraph and cancels
them in case of a concurrent cancel, suspend or fail call. This makes sure that
the original cause for termination is maintained.
cc: @GJL
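A minimal sketch of the idea described above, assuming a heavily simplified model. The class `SchedulingGraphSketch`, its `State` values and all method names are hypothetical and are not Flink's `ExecutionGraph` API; `suspend` is left out for brevity. The graph remembers the future of the ongoing scheduling operation and cancels it from `cancel()` or `fail()`. A slot-allocation failure that arrives later can then no longer replace the original termination cause.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
 * Hypothetical, heavily simplified model of the idea behind FLINK-8732.
 * This is NOT Flink's ExecutionGraph; all names here are illustrative.
 */
public class SchedulingGraphSketch {

    public enum State { CREATED, RUNNING, CANCELLING, FAILING }

    private State state = State.CREATED;
    private Throwable failureCause;

    // The ongoing scheduling operation (e.g. waiting for slots), or null if none.
    private CompletableFuture<Void> schedulingFuture;

    public synchronized void scheduleForExecution(CompletableFuture<Void> slotAllocation) {
        state = State.RUNNING;
        // Remember the operation so that a concurrent cancel/fail can abort it.
        schedulingFuture = slotAllocation;
        slotAllocation.whenComplete((ignored, throwable) -> onSchedulingDone(slotAllocation, throwable));
    }

    private synchronized void onSchedulingDone(CompletableFuture<Void> operation, Throwable throwable) {
        if (operation == schedulingFuture) {
            schedulingFuture = null;
        }
        if (throwable == null) {
            return; // all slots acquired: deployment would continue here
        }
        Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                ? throwable.getCause()
                : throwable;
        if (cause instanceof CancellationException) {
            return; // aborted on purpose by cancel()/fail(): keep the original termination cause
        }
        fail(cause); // a genuine scheduling failure, e.g. a slot request timeout
    }

    public synchronized void cancel() {
        if (state == State.CREATED || state == State.RUNNING) {
            state = State.CANCELLING;
            cancelOngoingScheduling();
        }
    }

    public synchronized void fail(Throwable cause) {
        // In this model a failure may still take over while cancelling,
        // which is why a late scheduling failure has to be suppressed.
        if (state == State.RUNNING || state == State.CANCELLING) {
            state = State.FAILING;
            failureCause = cause;
            cancelOngoingScheduling();
        }
    }

    private void cancelOngoingScheduling() {
        CompletableFuture<Void> operation = schedulingFuture;
        schedulingFuture = null;
        if (operation != null) {
            // Completes the future with a CancellationException; later completions become no-ops.
            operation.cancel(false);
        }
    }

    public synchronized State getState() {
        return state;
    }

    public synchronized Throwable getFailureCause() {
        return failureCause;
    }
}
```

In this model, dropping the `cancelOngoingScheduling()` call from `cancel()` would let a late slot-request timeout move a `CANCELLING` graph to `FAILING` with the timeout as its cause, which is the kind of overwritten termination cause the change aims to prevent.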
## Verifying this change
- Added `ExecutionGraphSchedulingTest#testSchedulingOperationCancellationWhenCancel`
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink cancelSchedulingOperation
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5548.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5548
----
commit 431346f2c460076d90a0ce6b538609f78b0f6788
Author: Till Rohrmann <tr...@...>
Date: 2018-02-21T14:57:50Z
[FLINK-8732] [flip6] Cancel ongoing scheduling operation
Keeps track of ongoing scheduling operations in the ExecutionGraph and cancels
them in case of a concurrent cancel, suspend or fail call. This makes sure that
the original cause for termination is maintained.
----
---
Posted by GJL <gi...@git.apache.org>.
GitHub user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5548#discussion_r169949268
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java ---
@@ -412,6 +417,54 @@ public void testEagerSchedulingWithSlotTimeout() throws Exception {
verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
}
+ /**
+ * Tests that an ongoing scheduling operation does not fail the {@link ExecutionGraph}
+ * if it gets concurrently cancelled
+ */
+ @Test
+ public void testSchedulingOperationCancellationWhenCancel() throws Exception {
+ final JobVertex jobVertex = new JobVertex("NoOp JobVertex");
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+ jobVertex.setParallelism(2);
+ final JobGraph jobGraph = new JobGraph(jobVertex);
+ jobGraph.setScheduleMode(ScheduleMode.EAGER);
+ jobGraph.setAllowQueuedScheduling(true);
+
+ final CompletableFuture<LogicalSlot> slotFuture1 = new CompletableFuture<>();
+ final CompletableFuture<LogicalSlot> slotFuture2 = new CompletableFuture<>();
+ final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(2);
+ slotProvider.addSlots(jobVertex.getID(), new CompletableFuture[]{slotFuture1, slotFuture2});
+ final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
+
+ executionGraph.scheduleForExecution();
+
+ final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
+
+ final TestingLogicalSlot slot = new TestingLogicalSlot(
+ new LocalTaskManagerLocation(),
+ new SimpleAckingTaskManagerGateway(),
+ 0,
+ new AllocationID(),
+ new SlotRequestId(),
+ new SlotSharingGroupId(),
+ releaseFuture);
+ slotFuture1.complete(slot);
+
+ // cancel should change the state of all executions to CANCELLED
+ executionGraph.cancel();
+
+ // complete the now CANCELLED execution --> this should cause a failure
+ slotFuture2.complete(new TestingLogicalSlot());
+
+ Thread.sleep(1L);
--- End diff --
Did you include this to make the race more likely to happen?
---
Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5548#discussion_r169960392
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java ---
+ Thread.sleep(1L);
--- End diff --
Yes.
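A short sleep such as `Thread.sleep(1L)` only widens the window in which asynchronously executed callbacks can run before the test continues; it does not guarantee any ordering. A hedged illustration with plain `CompletableFuture` (the class `AsyncCallbackRace` is hypothetical and unrelated to Flink's test utilities): waiting on the dependent future is deterministic, whereas the sleep is probabilistic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration: sleeping makes a race more likely to resolve, joining makes it certain. */
public class AsyncCallbackRace {

    /** Returns true if the asynchronous callback had run by the time the caller checked. */
    public static boolean callbackObserved(boolean waitForCallback) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            AtomicBoolean callbackRan = new AtomicBoolean(false);
            CompletableFuture<Void> slotFuture = new CompletableFuture<>();
            // The reaction to the completed slot runs on another thread.
            CompletableFuture<Void> reaction =
                    slotFuture.thenRunAsync(() -> callbackRan.set(true), executor);

            slotFuture.complete(null);

            if (waitForCallback) {
                // Deterministic: block until the asynchronous callback has finished.
                reaction.join();
            } else {
                // Probabilistic: the sleep only gives the other thread a chance to run.
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return callbackRan.get();
        } finally {
            executor.shutdownNow();
        }
    }
}
```

`callbackObserved(true)` always returns `true`; `callbackObserved(false)` usually does, but nothing guarantees it.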
---
Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5548#discussion_r169960433
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -377,14 +377,14 @@ public boolean scheduleForExecution() {
* @param queued Flag to indicate whether the scheduler may queue this task if it cannot
* immediately deploy it.
* @param locationPreferenceConstraint constraint for the location preferences
- * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
+ * @returns Future which is completed once the Execution has been deployed
--- End diff --
True, will change it.
---
Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5548
---
Posted by GJL <gi...@git.apache.org>.
GitHub user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5548#discussion_r169943180
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -377,14 +377,14 @@ public boolean scheduleForExecution() {
* @param queued Flag to indicate whether the scheduler may queue this task if it cannot
* immediately deploy it.
* @param locationPreferenceConstraint constraint for the location preferences
- * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
+ * @returns Future which is completed once the Execution has been deployed
--- End diff --
Should be `@return`, not `@returns`.
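For reference, the corrected tag from the diff above. `@return` is the standard Javadoc block tag, while `@returns` is not recognized by the javadoc tool. The class and the simplified signature below are hypothetical stand-ins, not Flink's actual `Execution` method.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the scheduling method under review; names are illustrative only. */
public class ReturnTagExample {

    /**
     * Schedules this execution.
     *
     * @param queued Flag to indicate whether the scheduler may queue this task if it cannot
     *               immediately deploy it.
     * @return Future which is completed once the Execution has been deployed
     */
    public CompletableFuture<Void> scheduleForExecution(boolean queued) {
        // Placeholder body: a real implementation would request a slot and deploy the task.
        return CompletableFuture.completedFuture(null);
    }
}
```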
---