You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/23 22:06:07 UTC
[10/11] flink git commit: [FLINK-9910][scheduling]
Execution#scheduleForExecution does not cancel slot future
[FLINK-9910][scheduling] Execution#scheduleForExecution does not cancel slot future
In order to properly give back an allocated slot to the SlotPool, one must not complete
the result future of Execution#allocateAndAssignSlotForExecution. This commit changes the
behaviour in Execution#scheduleForExecution accordingly.
This closes #6385.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0180d068
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0180d068
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0180d068
Branch: refs/heads/master
Commit: 0180d068565d99db1db998944686064ceddf398f
Parents: b1391a0
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 21:38:42 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200
----------------------------------------------------------------------
.../concurrent/FutureConsumerWithException.java | 43 +++++++++++
.../flink/runtime/executiongraph/Execution.java | 24 +++----
.../ExecutionGraphSchedulingTest.java | 2 +-
.../runtime/executiongraph/ExecutionTest.java | 75 ++++++++++++++++++++
4 files changed, 129 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0180d068/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java
new file mode 100644
index 0000000..c49d7dc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+
+/**
+ * A checked extension of the {@link Consumer} interface which rethrows
+ * exceptions wrapped in a {@link CompletionException}.
+ *
+ * @param <T> type of the first argument
+ * @param <E> type of the thrown exception
+ */
+public interface FutureConsumerWithException<T, E extends Throwable> extends Consumer<T> {
+
+ void acceptWithException(T value) throws E;
+
+ @Override
+ default void accept(T value) {
+ try {
+ acceptWithException(value);
+ } catch (Throwable t) {
+ throw new CompletionException(t);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0180d068/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index f8419d3..801f35a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -54,6 +54,7 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.concurrent.FutureConsumerWithException;
import org.slf4j.Logger;
@@ -413,24 +414,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
// that we directly deploy the tasks if the slot allocation future is completed. This is
// necessary for immediate deployment.
- final CompletableFuture<Void> deploymentFuture = allocationFuture.handle(
- (Execution ignored, Throwable throwable) -> {
- if (throwable != null) {
- markFailed(ExceptionUtils.stripCompletionException(throwable));
- } else {
- try {
- deploy();
- } catch (Throwable t) {
- markFailed(ExceptionUtils.stripCompletionException(t));
- }
+ final CompletableFuture<Void> deploymentFuture = allocationFuture.thenAccept(
+ (FutureConsumerWithException<Execution, Exception>) value -> deploy());
+
+ deploymentFuture.whenComplete(
+ (Void ignored, Throwable failure) -> {
+ if (failure != null) {
+ markFailed(ExceptionUtils.stripCompletionException(failure));
}
- return null;
- }
- );
+ });
// if tasks have to scheduled immediately check that the task has been deployed
if (!queued && !deploymentFuture.isDone()) {
- allocationFuture.completeExceptionally(new IllegalArgumentException("The slot allocation future has not been completed yet."));
+ deploymentFuture.completeExceptionally(new IllegalArgumentException("The slot allocation future has not been completed yet."));
}
return deploymentFuture;
http://git-wip-us.apache.org/repos/asf/flink/blob/0180d068/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 6092f52..6680c9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -640,7 +640,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
}
@Nonnull
- private SingleLogicalSlot createSingleLogicalSlot(TestingSlotOwner slotOwner, SimpleAckingTaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) {
+ static SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner, TaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) {
TaskManagerLocation location = new TaskManagerLocation(
ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
http://git-wip-us.apache.org/repos/asf/flink/blob/0180d068/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index d3e88e1..56fd7e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -50,6 +51,8 @@ import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
@@ -419,6 +422,78 @@ public class ExecutionTest extends TestLogger {
assertThat(execution.getTaskRestore(), is(nullValue()));
}
+ @Test
+ public void testEagerSchedulingFailureReturnsSlot() throws Exception {
+ final JobVertex jobVertex = createNoOpJobVertex();
+ final JobVertexID jobVertexId = jobVertex.getID();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+ final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
+
+ final CompletableFuture<SlotRequestId> slotRequestIdFuture = new CompletableFuture<>();
+ final CompletableFuture<SlotRequestId> returnedSlotFuture = new CompletableFuture<>();
+
+ final TestingSlotProvider slotProvider = new TestingSlotProvider(
+ (SlotRequestId slotRequestId) -> {
+ slotRequestIdFuture.complete(slotRequestId);
+ return new CompletableFuture<>();
+ });
+
+ slotProvider.setSlotCanceller(returnedSlotFuture::complete);
+ slotOwner.getReturnedSlotFuture().thenAccept(
+ (LogicalSlot logicalSlot) -> returnedSlotFuture.complete(logicalSlot.getSlotRequestId()));
+
+ ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+ new JobID(),
+ slotProvider,
+ new NoRestartStrategy(),
+ jobVertex);
+
+ ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+ ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+ final Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+ taskManagerGateway.setCancelConsumer(
+ executionAttemptID -> {
+ if (execution.getAttemptId().equals(executionAttemptID)) {
+ execution.cancelingComplete();
+ }
+ }
+ );
+
+ final ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+ try {
+ slotRequestIdFuture.thenAcceptAsync(
+ (SlotRequestId slotRequestId) -> {
+ final SingleLogicalSlot singleLogicalSlot = ExecutionGraphSchedulingTest.createSingleLogicalSlot(
+ slotOwner,
+ taskManagerGateway,
+ slotRequestId);
+ slotProvider.complete(slotRequestId, singleLogicalSlot);
+ },
+ executorService);
+
+ final CompletableFuture<Void> schedulingFuture = execution.scheduleForExecution(
+ slotProvider,
+ false,
+ LocationPreferenceConstraint.ANY);
+
+ try {
+ schedulingFuture.get();
+ // cancel the execution in case we could schedule the execution
+ execution.cancel();
+ } catch (ExecutionException ignored) {
+ }
+
+ assertThat(returnedSlotFuture.get(), is(equalTo(slotRequestIdFuture.get())));
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
@Nonnull
private JobVertex createNoOpJobVertex() {
final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());