You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:30 UTC
[16/19] flink git commit: [FLINK-5747] [distributed coordination] Eager scheduling allocates slots and deploys tasks in bulk
[FLINK-5747] [distributed coordination] Eager scheduling allocates slots and deploys tasks in bulk
That way, strictly topological deployment can be guaranteed.
Also, many quick deploy/not-enough-resources/fail/recover cycles can be
avoided in the cases where resources need some time to appear.
This closes #3295
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f113d794
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f113d794
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f113d794
Branch: refs/heads/master
Commit: f113d79451ba88c487358861cc3e20aac3d19257
Parents: 5902ea0
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 3 20:26:23 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 01:01:24 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/util/ExceptionUtils.java | 12 +
.../apache/flink/util/ExceptionUtilsTest.java | 60 ++
.../flink/runtime/concurrent/Executors.java | 3 +-
.../flink/runtime/concurrent/FutureUtils.java | 115 ++++
.../flink/runtime/executiongraph/Execution.java | 73 ++-
.../executiongraph/ExecutionAndSlot.java | 46 ++
.../runtime/executiongraph/ExecutionGraph.java | 170 +++++-
.../executiongraph/ExecutionGraphUtils.java | 106 ++++
.../executiongraph/ExecutionJobVertex.java | 46 +-
.../runtime/executiongraph/ExecutionVertex.java | 3 +-
.../IllegalExecutionStateException.java | 53 ++
.../apache/flink/runtime/instance/SlotPool.java | 9 +-
.../runtime/concurrent/FutureUtilsTest.java | 194 ++++++
.../ExecutionGraphSchedulingTest.java | 610 +++++++++++++++++++
.../executiongraph/ExecutionGraphUtilsTest.java | 124 ++++
.../ExecutionVertexCancelTest.java | 2 +-
.../ExecutionVertexSchedulingTest.java | 3 -
.../executiongraph/PointwisePatternTest.java | 12 +-
.../executiongraph/ProgrammedSlotProvider.java | 87 +++
.../TerminalJobStatusListener.java | 45 ++
.../LeaderChangeJobRecoveryTest.java | 23 +-
.../runtime/minicluster/MiniClusterITCase.java | 28 +-
.../Flip6LocalStreamEnvironment.java | 4 +-
23 files changed, 1735 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 6ba9ef6..69c2692 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -101,6 +101,18 @@ public final class ExceptionUtils {
}
/**
+ * Rethrows the given {@code Throwable}, if it represents an error that is fatal to the JVM.
+ * See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a definition of fatal errors.
+ *
+ * @param t The Throwable to check and rethrow.
+ */
+ public static void rethrowIfFatalError(Throwable t) {
+ if (isJvmFatalError(t)) {
+ throw (Error) t;
+ }
+ }
+
+ /**
* Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
* to a prior exception, or returns the new exception, if no prior exception exists.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
new file mode 100644
index 0000000..343b9d6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/ExceptionUtilsTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the utility methods in {@link ExceptionUtils}.
+ */
+public class ExceptionUtilsTest {
+
+ @Test
+ public void testStringifyNullException() {
+ assertNotNull(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION);
+ assertEquals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION, ExceptionUtils.stringifyException(null));
+ }
+
+ @Test
+ public void testJvmFatalError() {
+ // not all errors are fatal
+ assertFalse(ExceptionUtils.isJvmFatalError(new Error()));
+
+ // linkage errors are not fatal
+ assertFalse(ExceptionUtils.isJvmFatalError(new LinkageError()));
+
+ // some errors are fatal
+ assertTrue(ExceptionUtils.isJvmFatalError(new InternalError()));
+ assertTrue(ExceptionUtils.isJvmFatalError(new UnknownError()));
+ }
+
+ @Test
+ public void testRethrowFatalError() {
+ // fatal error is rethrown
+ try {
+ ExceptionUtils.rethrowIfFatalError(new InternalError());
+ fail();
+ } catch (InternalError ignored) {}
+
+ // non-fatal error is not rethrown
+ ExceptionUtils.rethrowIfFatalError(new NoClassDefFoundError());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 391f233..63b6a25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.concurrent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -52,7 +53,7 @@ public class Executors {
private DirectExecutor() {}
@Override
- public void execute(Runnable command) {
+ public void execute(@Nonnull Runnable command) {
command.run();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index a404c98..4948147 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -20,11 +20,22 @@ package org.apache.flink.runtime.concurrent;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A collection of utilities that expand the usage of {@link Future} and {@link CompletableFuture}.
+ */
public class FutureUtils {
+ // ------------------------------------------------------------------------
+ // retrying operations
+ // ------------------------------------------------------------------------
+
/**
* Retry the given operation the given number of times in case of a failure.
*
@@ -88,4 +99,108 @@ public class FutureUtils {
super(cause);
}
}
+
+ // ------------------------------------------------------------------------
+ // composing futures
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a future that is complete once multiple other futures completed.
+ * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
+ * conjunction fails.
+ *
+ * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
+ * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+ *
+ * @param futures The futures that make up the conjunction. No null entries are allowed.
+ * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
+ */
+ public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
+ checkNotNull(futures, "futures");
+
+ final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
+
+ if (futures.isEmpty()) {
+ conjunct.complete(null);
+ }
+ else {
+ for (Future<?> future : futures) {
+ future.handle(conjunct.completionHandler);
+ }
+ }
+
+ return conjunct;
+ }
+
+ /**
+ * A future that is complete once multiple other futures completed. The futures are not
+ * necessarily of the same type, which is why the type of this Future is {@code Void}.
+ * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
+ * conjunction fails.
+ *
+ * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
+ * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
+ * many of the Futures are already complete.
+ */
+ public interface ConjunctFuture extends CompletableFuture<Void> {
+
+ /**
+ * Gets the total number of Futures in the conjunction.
+ * @return The total number of Futures in the conjunction.
+ */
+ int getNumFuturesTotal();
+
+ /**
+ * Gets the number of Futures in the conjunction that are already complete.
+ * @return The number of Futures in the conjunction that are already complete
+ */
+ int getNumFuturesCompleted();
+ }
+
+ /**
+ * The implementation of the {@link ConjunctFuture}.
+ *
+ * <p>Implementation notice: The member fields all have package-private access, because they are
+ * either accessed by an inner subclass or by the enclosing class.
+ */
+ private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
+
+ /** The total number of futures in the conjunction */
+ final int numTotal;
+
+ /** The number of futures in the conjunction that are already complete */
+ final AtomicInteger numCompleted = new AtomicInteger();
+
+ /** The function that is attached to all futures in the conjunction. Once a future
+ * is complete, this function tracks the completion or fails the conjunct.
+ */
+ final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
+
+ @Override
+ public Void apply(Object o, Throwable throwable) {
+ if (throwable != null) {
+ completeExceptionally(throwable);
+ }
+ else if (numTotal == numCompleted.incrementAndGet()) {
+ complete(null);
+ }
+
+ return null;
+ }
+ };
+
+ ConjunctFutureImpl(int numTotal) {
+ this.numTotal = numTotal;
+ }
+
+ @Override
+ public int getNumFuturesTotal() {
+ return numTotal;
+ }
+
+ @Override
+ public int getNumFuturesCompleted() {
+ return numCompleted.get();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 60e5575..b3fe443 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -249,27 +249,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
*/
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
- if (slotProvider == null) {
- throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution.");
- }
-
- final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
- final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
-
- // sanity check
- if (locationConstraint != null && sharingGroup == null) {
- throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed.");
- }
-
- if (transitionState(CREATED, SCHEDULED)) {
-
- ScheduledUnit toSchedule = locationConstraint == null ?
- new ScheduledUnit(this, sharingGroup) :
- new ScheduledUnit(this, sharingGroup, locationConstraint);
-
- // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
- // in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
- final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued);
+ try {
+ final Future<SimpleSlot> slotAllocationFuture = allocateSlotForExecution(slotProvider, queued);
// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
// that we directly deploy the tasks if the slot allocation future is completed. This is
@@ -296,28 +277,54 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
});
// if tasks have to scheduled immediately check that the task has been deployed
- // TODO: This might be problematic if the future is not completed right away
- if (!queued) {
- if (!deploymentFuture.isDone()) {
- markFailed(new IllegalArgumentException("The slot allocation future has not been completed yet."));
- }
+ if (!queued && !deploymentFuture.isDone()) {
+ markFailed(new IllegalArgumentException("The slot allocation future has not been completed yet."));
}
-
+
return true;
}
+ catch (IllegalExecutionStateException e) {
+ return false;
+ }
+ }
+
+ public Future<SimpleSlot> allocateSlotForExecution(SlotProvider slotProvider, boolean queued)
+ throws IllegalExecutionStateException {
+
+ checkNotNull(slotProvider);
+
+ final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
+ final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
+
+ // sanity check
+ if (locationConstraint != null && sharingGroup == null) {
+ throw new IllegalStateException(
+ "Trying to schedule with co-location constraint but without slot sharing allowed.");
+ }
+
+ // this method only works if the execution is in the state 'CREATED'
+ if (transitionState(CREATED, SCHEDULED)) {
+
+ ScheduledUnit toSchedule = locationConstraint == null ?
+ new ScheduledUnit(this, sharingGroup) :
+ new ScheduledUnit(this, sharingGroup, locationConstraint);
+
+ return slotProvider.allocateSlot(toSchedule, queued);
+ }
else {
// call race, already deployed, or already done
- return false;
+ throw new IllegalExecutionStateException(this, CREATED, state);
}
}
public void deployToSlot(final SimpleSlot slot) throws JobException {
- // sanity checks
- if (slot == null) {
- throw new NullPointerException();
- }
+ checkNotNull(slot);
+
+ // Check if the TaskManager died in the meantime
+ // This only speeds up the response to TaskManagers failing concurrently to deployments.
+ // The more general check is the timeout of the deployment call
if (!slot.isAlive()) {
- throw new JobException("Target slot for deployment is not alive.");
+ throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
}
// make sure exactly one deployment call happens from the correct state
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
new file mode 100644
index 0000000..ea6186e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.SimpleSlot;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A pair of an {@link Execution} together with a slot future.
+ */
+public class ExecutionAndSlot {
+
+ public final Execution executionAttempt;
+
+ public final Future<SimpleSlot> slotFuture;
+
+ public ExecutionAndSlot(Execution executionAttempt, Future<SimpleSlot> slotFuture) {
+ this.executionAttempt = checkNotNull(executionAttempt);
+ this.slotFuture = checkNotNull(slotFuture);
+ }
+
+ // -----------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "ExecutionAndSlot{executionAttempt=" + executionAttempt + ", slotFuture=" + slotFuture + '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f25120c..ad4347d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.commons.lang3.StringUtils;
+
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
@@ -40,9 +41,14 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -53,6 +59,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializableObject;
@@ -60,6 +67,7 @@ import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,11 +85,14 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* The execution graph is the central data structure that coordinates the distributed
@@ -158,7 +169,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
private final long[] stateTimestamps;
/** The timeout for all messages that require a response/acknowledgement */
- private final Time timeout;
+ private final Time rpcCallTimeout;
// ------ Configuration of the Execution -------
@@ -171,6 +182,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* from results than need to be materialized. */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
+ private final Time scheduleAllocationTimeout;
+
// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
/** Current status of the job execution */
@@ -292,7 +305,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
this.stateTimestamps = new long[JobStatus.values().length];
this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
- this.timeout = timeout;
+ this.rpcCallTimeout = checkNotNull(timeout);
+ this.scheduleAllocationTimeout = checkNotNull(timeout);
this.restartStrategy = restartStrategy;
@@ -695,7 +709,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// create the execution job vertex and attach it to the graph
ExecutionJobVertex ejv =
- new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
+ new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, createTimestamp);
ejv.connectToPredecessors(this.intermediateResults);
ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
@@ -717,9 +731,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
- if (slotProvider == null) {
- throw new IllegalArgumentException("Scheduler must not be null.");
- }
+ checkNotNull(slotProvider);
if (this.slotProvider != null && this.slotProvider != slotProvider) {
throw new IllegalArgumentException("Cannot use different slot providers for the same job");
@@ -731,18 +743,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
switch (scheduleMode) {
case LAZY_FROM_SOURCES:
- // simply take the vertices without inputs.
- for (ExecutionJobVertex ejv : this.tasks.values()) {
- if (ejv.getJobVertex().isInputVertex()) {
- ejv.scheduleAll(slotProvider, allowQueuedScheduling);
- }
- }
+ scheduleLazy(slotProvider);
break;
case EAGER:
- for (ExecutionJobVertex ejv : getVerticesTopologically()) {
- ejv.scheduleAll(slotProvider, allowQueuedScheduling);
- }
+ scheduleEager(slotProvider, scheduleAllocationTimeout);
break;
default:
@@ -754,6 +759,139 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
+ private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
+ // simply take the vertices without inputs.
+ for (ExecutionJobVertex ejv : this.tasks.values()) {
+ if (ejv.getJobVertex().isInputVertex()) {
+ ejv.scheduleAll(slotProvider, allowQueuedScheduling);
+ }
+ }
+ }
+
+ /**
+ * Allocates the slots for all tasks in bulk and, once all slots are available, deploys the tasks in topological order.
+ *
+ * @param slotProvider The resource provider from which the slots are allocated
+ * @param timeout The maximum time that the slot allocation may take, before the job is failed
+ * with a NoResourceAvailableException.
+ */
+ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
+ checkState(state == JobStatus.RUNNING, "job is not running currently");
+
+ // Important: reserve all the space we need up front.
+ // that way we do not have any operation that can fail between allocating the slots
+ // and adding them to the list. If we had a failure in between there, that would
+ // cause the slots to get lost
+ final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
+ final boolean queued = allowQueuedScheduling;
+
+ // we use this flag to handle failures in a 'finally' clause
+ // that allows us to not go through clumsy cast-and-rethrow logic
+ boolean successful = false;
+
+ try {
+ // collecting all the slots may resize and fail in that operation without slots getting lost
+ final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+
+ // allocate the slots (obtain all their futures)
+ for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+ // these calls are not blocking, they only return futures
+ ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
+
+ // we need to first add the slots to this list, to be safe on release
+ resources.add(slots);
+
+ for (ExecutionAndSlot ens : slots) {
+ slotFutures.add(ens.slotFuture);
+ }
+ }
+
+ // this future is complete once all slot futures are complete.
+ // the future fails once one slot future fails.
+ final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
+
+ // make sure that we fail if the allocation timeout was exceeded
+ final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ // When the timeout triggers, we try to complete the conjunct future with an exception.
+ // Note that this is a no-op if the future is already completed
+ int numTotal = allAllocationsComplete.getNumFuturesTotal();
+ int numComplete = allAllocationsComplete.getNumFuturesCompleted();
+ String message = "Could not allocate all required slots within timeout of " +
+ timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;
+
+ allAllocationsComplete.completeExceptionally(new NoResourceAvailableException(message));
+ }
+ }, timeout.getSize(), timeout.getUnit());
+
+
+ allAllocationsComplete.handleAsync(new BiFunction<Void, Throwable, Void>() {
+
+ @Override
+ public Void apply(Void ignored, Throwable throwable) {
+ try {
+ // we do not need the cancellation timeout any more
+ timeoutCancelHandle.cancel(false);
+
+ if (throwable == null) {
+ // successfully obtained all slots, now deploy
+
+ for (ExecutionAndSlot[] jobVertexTasks : resources) {
+ for (ExecutionAndSlot execAndSlot : jobVertexTasks) {
+
+ // the futures must all be ready - this is simply a sanity check
+ final SimpleSlot slot;
+ try {
+ slot = execAndSlot.slotFuture.getNow(null);
+ checkNotNull(slot);
+ }
+ catch (ExecutionException | NullPointerException e) {
+ throw new IllegalStateException("SlotFuture is incomplete " +
+ "or erroneous even though all futures completed", e);
+ }
+
+ // actual deployment
+ execAndSlot.executionAttempt.deployToSlot(slot);
+ }
+ }
+ }
+ else {
+ // let the exception handler deal with this
+ throw throwable;
+ }
+ }
+ catch (Throwable t) {
+ // we catch everything here to make sure cleanup happens and the
+ // ExecutionGraph notices
+ // we need to go into recovery and make sure to release all slots
+ try {
+ fail(t);
+ }
+ finally {
+ ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+ }
+ }
+
+ // the future returned by handleAsync is not used; a BiFunction returning
+ // Void has no value to return other than null
+ return null;
+ }
+ }, futureExecutor);
+
+ // from now on, slots will be rescued by the futures and their completion, or by the timeout
+ successful = true;
+ }
+ finally {
+ if (!successful) {
+ // we come here only if the 'try' block finished with an exception
+ // we release the slots (possibly failing some executions on the way) and
+ // let the exception bubble up
+ ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+ }
+ }
+ }
+
public void cancel() {
while (true) {
JobStatus current = state;
@@ -971,7 +1109,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
} catch (IOException | ClassNotFoundException e) {
LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", getJobID(), e);
- };
+ }
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
new file mode 100644
index 0000000..cd6d6aa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.List;
+
+/**
+ * Utilities for dealing with the execution graphs and scheduling.
+ *
+ * <p>This class only offers static methods and cannot be instantiated.
+ */
+public final class ExecutionGraphUtils {
+
+ /**
+ * Releases the slot represented by the given future. If the future is complete, the
+ * slot is immediately released. Otherwise, the slot is released as soon as the future
+ * is completed. If the future completes without a slot (for example exceptionally),
+ * there is nothing to release.
+ *
+ * <p>Note that releasing the slot means cancelling any task execution currently
+ * associated with that slot.
+ *
+ * @param slotFuture The future for the slot to release.
+ */
+ public static void releaseSlotFuture(Future<SimpleSlot> slotFuture) {
+ slotFuture.handle(ReleaseSlotFunction.INSTANCE);
+ }
+
+ /**
+ * Releases all the slots in the given list of {@code ExecutionAndSlot} arrays.
+ * For each slot future in that collection: if the future is complete, its slot is
+ * immediately released. Otherwise, the slot is released as soon as the future
+ * is completed.
+ *
+ * <p>This method never throws any exceptions (except for fatal errors) and continues
+ * to release the remaining slots if releasing one slot failed. {@code null} arrays and
+ * {@code null} array entries are skipped.
+ *
+ * <p>Note that releasing the slot means cancelling any task execution currently
+ * associated with that slot.
+ *
+ * @param resources The list of ExecutionAndSlot arrays whose slots should be released.
+ */
+ public static void releaseAllSlotsSilently(List<ExecutionAndSlot[]> resources) {
+ // the outer guard covers failures while iterating the list itself (e.g. a null list)
+ try {
+ for (ExecutionAndSlot[] jobVertexResources : resources) {
+ if (jobVertexResources != null) {
+ for (ExecutionAndSlot execAndSlot : jobVertexResources) {
+ if (execAndSlot != null) {
+ // the per-slot guard keeps one failed release from stopping the others
+ try {
+ releaseSlotFuture(execAndSlot.slotFuture);
+ }
+ catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalError(t);
+ }
+ }
+ }
+ }
+ }
+ }
+ catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalError(t);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A function to be applied to a future, releasing the slot immediately upon completion.
+ * Completion here refers to both the successful and exceptional completion; only a
+ * non-null slot is released, so an exceptional completion releases nothing.
+ */
+ private static final class ReleaseSlotFunction implements BiFunction<SimpleSlot, Throwable, Void> {
+
+ static final ReleaseSlotFunction INSTANCE = new ReleaseSlotFunction();
+
+ @Override
+ public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+ if (simpleSlot != null) {
+ simpleSlot.releaseSlot();
+ }
+ return null;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Utility class is not meant to be instantiated. */
+ private ExecutionGraphUtils() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 3828fc9..754148e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -30,7 +30,9 @@ import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -388,7 +390,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
public void scheduleAll(SlotProvider slotProvider, boolean queued) {
- ExecutionVertex[] vertices = this.taskVertices;
+ final ExecutionVertex[] vertices = this.taskVertices;
// kick off the tasks
for (ExecutionVertex ev : vertices) {
@@ -396,6 +398,48 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
}
+ /**
+ * Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns
+ * pairs of the slots and execution attempts, to ease correlation between vertices and execution
+ * attempts.
+ *
+ * <p>If this method throws an exception, it makes sure to release all so far requested slots.
+ *
+ * @param resourceProvider The resource provider from whom the slots are requested.
+ * @param queued Passed through unchanged to {@link Execution#allocateSlotForExecution};
+ * presumably whether slot requests may be queued until resources appear (confirm there).
+ * @return One {@link ExecutionAndSlot} per task vertex, in task vertex order, each holding
+ * the vertex's current execution attempt and the future of its slot.
+ */
+ public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
+ final ExecutionVertex[] vertices = this.taskVertices;
+ final ExecutionAndSlot[] slots = new ExecutionAndSlot[vertices.length];
+
+ // try to acquire a slot future for each execution.
+ // we store the execution with the future just to be on the safe side
+ for (int i = 0; i < vertices.length; i++) {
+
+ // we use this flag to handle failures in a 'finally' clause
+ // that allows us to not go through clumsy cast-and-rethrow logic
+ boolean successful = false;
+
+ try {
+ // allocate the next slot (future)
+ final Execution exec = vertices[i].getCurrentExecutionAttempt();
+ final Future<SimpleSlot> future = exec.allocateSlotForExecution(resourceProvider, queued);
+ slots[i] = new ExecutionAndSlot(exec, future);
+ successful = true;
+ }
+ finally {
+ if (!successful) {
+ // this is the case if an exception was thrown.
+ // slots[0 .. i-1] are all non-null, because every earlier iteration succeeded
+ for (int k = 0; k < i; k++) {
+ ExecutionGraphUtils.releaseSlotFuture(slots[k].slotFuture);
+ }
+ }
+ }
+ }
+
+ // all good, we acquired all slots
+ return slots;
+ }
+
public void cancel() {
for (ExecutionVertex ev : getTaskVertices()) {
ev.cancel();
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 92327fd..ca8e07c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -102,6 +102,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
int subTaskIndex,
IntermediateResult[] producedDataSets,
Time timeout) {
+
this(
jobVertex,
subTaskIndex,
@@ -133,7 +134,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
this.taskNameWithSubtask = String.format("%s (%d/%d)",
jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());
- this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1);
+ this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);
for (IntermediateResult result : producedDataSets) {
IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex);
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java
new file mode 100644
index 0000000..44162ab
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IllegalExecutionStateException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+
+/**
+ * A special {@link IllegalStateException} indicating a mismatch in the expected and actual
+ * {@link ExecutionState} of an {@link Execution}.
+ */
+public class IllegalExecutionStateException extends IllegalStateException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates a new IllegalExecutionStateException with the error message indicating
+ * the expected and actual state.
+ *
+ * @param expected The expected state
+ * @param actual The actual state
+ */
+ public IllegalExecutionStateException(ExecutionState expected, ExecutionState actual) {
+ super("Invalid execution state: Expected " + expected + " , found " + actual);
+ }
+
+ /**
+ * Creates a new IllegalExecutionStateException with the error message indicating
+ * the expected and actual state.
+ *
+ * @param expected The expected state
+ * @param actual The actual state
+ */
+ public IllegalExecutionStateException(Execution execution, ExecutionState expected, ExecutionState actual) {
+ super(execution.getVertexWithAttempt() + " is no longer in expected state " + expected +
+ " but in state " + actual);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 4da6c7b..8ba5040 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -1048,9 +1048,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
private final long timestamp;
- SlotAndTimestamp(
- AllocatedSlot slot,
- long timestamp) {
+ SlotAndTimestamp(AllocatedSlot slot, long timestamp) {
this.slot = slot;
this.timestamp = timestamp;
}
@@ -1062,5 +1060,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
public long timestamp() {
return timestamp;
}
+
+ @Override
+ public String toString() {
+ // format: "<slot> @ <timestamp>"
+ final StringBuilder description = new StringBuilder();
+ description.append(slot).append(" @ ").append(timestamp);
+ return description.toString();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
new file mode 100644
index 0000000..43710cb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the utility methods in {@link FutureUtils}.
+ */
+public class FutureUtilsTest {
+
+ /**
+ * Tests that {@code combineAll} rejects a null collection and a collection that contains
+ * a null future with a {@link NullPointerException}.
+ *
+ * <p>NOTE(review): despite the test name, an empty collection is not rejected; it yields
+ * an already completed conjunct future (see testConjunctOfNone).
+ */
+ @Test
+ public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
+ try {
+ FutureUtils.combineAll(null);
+ fail();
+ } catch (NullPointerException ignored) {}
+
+ try {
+ FutureUtils.combineAll(Arrays.asList(
+ new FlinkCompletableFuture<Object>(),
+ null,
+ new FlinkCompletableFuture<Object>()));
+ fail();
+ } catch (NullPointerException ignored) {}
+ }
+
+ /**
+ * Tests that the conjunct future counts the completed futures, does not count a repeated
+ * completion of the same future twice, and completes (together with a dependent future)
+ * only once all combined futures are complete.
+ */
+ @Test
+ public void testConjunctFutureCompletion() throws Exception {
+ // some futures that we combine
+ CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
+ CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
+ CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
+ CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+
+ // some future is initially completed
+ future2.complete(new Object());
+
+ // build the conjunct future
+ ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+
+ // a dependent future, to check that completion propagates to follow-up stages
+ Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+ @Override
+ public void accept(Void value) {}
+ });
+
+ assertEquals(4, result.getNumFuturesTotal());
+ assertEquals(1, result.getNumFuturesCompleted());
+ assertFalse(result.isDone());
+ assertFalse(resultMapped.isDone());
+
+ // complete two more futures
+ future4.complete(new Object());
+ assertEquals(2, result.getNumFuturesCompleted());
+ assertFalse(result.isDone());
+ assertFalse(resultMapped.isDone());
+
+ future1.complete(new Object());
+ assertEquals(3, result.getNumFuturesCompleted());
+ assertFalse(result.isDone());
+ assertFalse(resultMapped.isDone());
+
+ // complete one future again; the completed count must not change
+ future1.complete(new Object());
+ assertEquals(3, result.getNumFuturesCompleted());
+ assertFalse(result.isDone());
+ assertFalse(resultMapped.isDone());
+
+ // complete the final future
+ future3.complete(new Object());
+ assertEquals(4, result.getNumFuturesCompleted());
+ assertTrue(result.isDone());
+ assertTrue(resultMapped.isDone());
+ }
+
+ /**
+ * Tests that the conjunct future fails as soon as one combined future fails, even when no
+ * other future has completed yet, that the failure is not counted as a completion, and that
+ * the failure cause reaches both the conjunct future and a dependent future.
+ */
+ @Test
+ public void testConjunctFutureFailureOnFirst() throws Exception {
+
+ CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
+ CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
+ CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
+ CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+
+ // build the conjunct future
+ ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+
+ Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+ @Override
+ public void accept(Void value) {}
+ });
+
+ assertEquals(4, result.getNumFuturesTotal());
+ assertEquals(0, result.getNumFuturesCompleted());
+ assertFalse(result.isDone());
+ assertFalse(resultMapped.isDone());
+
+ future2.completeExceptionally(new IOException());
+
+ assertEquals(0, result.getNumFuturesCompleted());
+ assertTrue(result.isDone());
+ assertTrue(resultMapped.isDone());
+
+ try {
+ result.get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof IOException);
+ }
+
+ try {
+ resultMapped.get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof IOException);
+ }
+ }
+
+ /**
+ * Tests that the conjunct future fails when the last outstanding future fails after all
+ * other futures completed successfully, and that the failure cause reaches both the
+ * conjunct future and a dependent future.
+ */
+ @Test
+ public void testConjunctFutureFailureOnSuccessive() throws Exception {
+
+ CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
+ CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
+ CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
+ CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+
+ // build the conjunct future
+ ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
+ assertEquals(4, result.getNumFuturesTotal());
+
+ Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
+ @Override
+ public void accept(Void value) {}
+ });
+
+ future1.complete(new Object());
+ future3.complete(new Object());
+ future4.complete(new Object());
+
+ future2.completeExceptionally(new IOException());
+
+ assertEquals(3, result.getNumFuturesCompleted());
+ assertTrue(result.isDone());
+ assertTrue(resultMapped.isDone());
+
+ try {
+ result.get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof IOException);
+ }
+
+ try {
+ resultMapped.get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof IOException);
+ }
+ }
+
+ /**
+ * Tests that combining an empty collection yields a conjunct future that is already done.
+ */
+ @Test
+ public void testConjunctOfNone() throws Exception {
+ final ConjunctFuture result = FutureUtils.combineAll(Collections.<Future<Object>>emptyList());
+
+ assertEquals(0, result.getNumFuturesTotal());
+ assertEquals(0, result.getNumFuturesCompleted());
+ assertTrue(result.isDone());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
new file mode 100644
index 0000000..9834dc6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.Timeout;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the scheduling of the execution graph. This tests that
+ * for example the order of deployments is correct and that bulk slot allocation
+ * works properly.
+ */
+public class ExecutionGraphSchedulingTest extends TestLogger {
+
+ /** Single-threaded scheduled executor of this test instance; presumably handed to the ExecutionGraph by a helper outside this view (confirm). */
+ private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+ /** Stops the executor after each test via {@code shutdownNow()}, discarding queued tasks. */
+ @After
+ public void shutdown() {
+ executor.shutdownNow();
+ }
+
+ // ------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------
+
+
+ /**
+ * Tests that with scheduling futures and pipelined deployment, the target vertex will
+ * not deploy its task before the source vertex does.
+ *
+ * <p>The target slot is supplied first. No deployment may happen until the source slot
+ * is supplied as well; after that, both tasks must be deployed.
+ */
+ @Test
+ public void testScheduleSourceBeforeTarget() throws Exception {
+
+ // [pipelined]
+ // we construct a simple graph (source) ----------------> (target)
+
+ final int parallelism = 1;
+
+ final JobVertex sourceVertex = new JobVertex("source");
+ sourceVertex.setParallelism(parallelism);
+ sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+ final JobVertex targetVertex = new JobVertex("target");
+ targetVertex.setParallelism(parallelism);
+ targetVertex.setInvokableClass(NoOpInvokable.class);
+
+ targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ final JobID jobId = new JobID();
+ final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ //
+ // set up two TaskManager gateways and slots
+
+ final TaskManagerGateway gatewaySource = createTaskManager();
+ final TaskManagerGateway gatewayTarget = createTaskManager();
+
+ final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId);
+ final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId);
+
+ final FlinkCompletableFuture<SimpleSlot> sourceFuture = new FlinkCompletableFuture<>();
+ final FlinkCompletableFuture<SimpleSlot> targetFuture = new FlinkCompletableFuture<>();
+
+ ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+ slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
+ slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);
+
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.setQueuedSchedulingAllowed(true);
+ eg.scheduleForExecution(slotProvider);
+
+ // job should be running
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ // we fulfill the target slot before the source slot
+ // that should not cause a deployment or deployment related failure
+ targetFuture.complete(targetSlot);
+
+ // Give a (wrongly triggered) asynchronous deployment time to happen, then check that none
+ // did. A timeout verification such as 'new Timeout(50, times(0))' cannot do this: it returns
+ // as soon as it is satisfied, which for times(0) is immediately, so it never actually waits.
+ Thread.sleep(50);
+ verify(gatewayTarget, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ // now supply the source slot
+ sourceFuture.complete(sourceSlot);
+
+ // by now, all deployments should have happened
+ verify(gatewaySource, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+ verify(gatewayTarget, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+
+ assertEquals(JobStatus.RUNNING, eg.getState());
+ }
+
+ /**
+ * This test verifies that a pipelined connected component is deployed only once the
+ * full set of its slots is available. The system must not deploy some of the tasks
+ * first and realize only later that not enough resources are available.
+ */
+ @Test
+ public void testDeployPipelinedConnectedComponentsTogether() throws Exception {
+
+ // [pipelined]
+ // we construct a simple graph (source) ----------------> (target)
+
+ final int parallelism = 8;
+
+ final JobVertex sourceVertex = new JobVertex("source");
+ sourceVertex.setParallelism(parallelism);
+ sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+ final JobVertex targetVertex = new JobVertex("target");
+ targetVertex.setParallelism(parallelism);
+ targetVertex.setInvokableClass(NoOpInvokable.class);
+
+ targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ final JobID jobId = new JobID();
+ final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ //
+ // Create the slots, futures, and the slot provider
+
+ final TaskManagerGateway[] sourceTaskManagers = new TaskManagerGateway[parallelism];
+ final TaskManagerGateway[] targetTaskManagers = new TaskManagerGateway[parallelism];
+
+ final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
+ final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
+
+ for (int i = 0; i < parallelism; i++) {
+ sourceTaskManagers[i] = createTaskManager();
+ targetTaskManagers[i] = createTaskManager();
+
+ sourceSlots[i] = createSlot(sourceTaskManagers[i], jobId);
+ targetSlots[i] = createSlot(targetTaskManagers[i], jobId);
+
+ sourceFutures[i] = new FlinkCompletableFuture<>();
+ targetFutures[i] = new FlinkCompletableFuture<>();
+ }
+
+ ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+ slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
+ slotProvider.addSlots(targetVertex.getID(), targetFutures);
+
+ //
+ // we complete some of the futures
+
+ for (int i = 0; i < parallelism; i += 2) {
+ sourceFutures[i].complete(sourceSlots[i]);
+ }
+
+ //
+ // kick off the scheduling
+
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.setQueuedSchedulingAllowed(true);
+ eg.scheduleForExecution(slotProvider);
+
+ verifyNothingDeployed(eg, sourceTaskManagers);
+
+ // complete the remaining sources
+ for (int i = 1; i < parallelism; i += 2) {
+ sourceFutures[i].complete(sourceSlots[i]);
+ }
+ verifyNothingDeployed(eg, sourceTaskManagers);
+
+ // complete the targets except for one
+ for (int i = 1; i < parallelism; i++) {
+ targetFutures[i].complete(targetSlots[i]);
+ }
+ // neither the sources nor the targets may be deployed while one slot is still missing
+ verifyNothingDeployed(eg, sourceTaskManagers);
+ verifyNothingDeployed(eg, targetTaskManagers);
+
+ // complete the last target slot future
+ targetFutures[0].complete(targetSlots[0]);
+
+ //
+ // verify that all deployments have happened.
+ // timeout() returns as soon as the call is observed, so a generous bound costs nothing
+ // when the test passes but avoids spurious failures on slow machines
+
+ for (TaskManagerGateway gateway : sourceTaskManagers) {
+ verify(gateway, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+ }
+ for (TaskManagerGateway gateway : targetTaskManagers) {
+ verify(gateway, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+ }
+ }
+
+ /**
+ * This test verifies that if one slot future fails, the deployment will be aborted.
+ */
+ @Test
+ public void testOneSlotFailureAbortsDeploy() throws Exception {
+
+ // [pipelined]
+ // we construct a simple graph (source) ----------------> (target)
+
+ final int parallelism = 6;
+
+ final JobVertex sourceVertex = new JobVertex("source");
+ sourceVertex.setParallelism(parallelism);
+ sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+ final JobVertex targetVertex = new JobVertex("target");
+ targetVertex.setParallelism(parallelism);
+ targetVertex.setInvokableClass(NoOpInvokable.class);
+
+ targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+ final JobID jobId = new JobID();
+ final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+ TerminalJobStatusListener testListener = new TerminalJobStatusListener();
+ eg.registerJobStatusListener(testListener);
+
+ //
+ // Create the slots, futures, and the slot provider
+
+ final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+ final SlotOwner slotOwner = mock(SlotOwner.class);
+
+ final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
+ final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
+
+ for (int i = 0; i < parallelism; i++) {
+ sourceSlots[i] = createSlot(taskManager, jobId, slotOwner);
+ targetSlots[i] = createSlot(taskManager, jobId, slotOwner);
+
+ sourceFutures[i] = new FlinkCompletableFuture<>();
+ targetFutures[i] = new FlinkCompletableFuture<>();
+ }
+
+ ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+ slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
+ slotProvider.addSlots(targetVertex.getID(), targetFutures);
+
+ //
+ // we complete some of the futures
+
+ for (int i = 0; i < parallelism; i += 2) {
+ sourceFutures[i].complete(sourceSlots[i]);
+ targetFutures[i + 1].complete(targetSlots[i + 1]);
+ }
+
+ //
+ // kick off the scheduling
+
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.setQueuedSchedulingAllowed(true);
+ eg.scheduleForExecution(slotProvider);
+
+ // fail one slot
+ sourceFutures[1].completeExceptionally(new TestRuntimeException());
+
+ // wait until the job failed as a whole
+ testListener.waitForTerminalState(2000);
+
+ // wait until all slots are back
+ verify(slotOwner, new Timeout(2000, times(6))).returnAllocatedSlot(any(Slot.class));
+
+ // no deployment calls must have happened
+ verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+
+ // all completed futures must have been returns
+ for (int i = 0; i < parallelism; i += 2) {
+ assertTrue(sourceSlots[i].isCanceled());
+ assertTrue(targetSlots[i + 1].isCanceled());
+ }
+ }
+
+ /**
+ * This test verifies that slot allocation times out after a certain time, and that
+ * all slots that were handed out are released in that case.
+ */
+ @Test
+ public void testTimeoutForSlotAllocation() throws Exception {
+
+ // we construct a simple graph: (task)
+
+ final int parallelism = 3;
+
+ final JobVertex vertex = new JobVertex("task");
+ vertex.setParallelism(parallelism);
+ vertex.setInvokableClass(NoOpInvokable.class);
+
+ final JobID jobId = new JobID();
+ final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
+
+ // a very short timeout (20 ms), so that the never-completed slot future times out quickly
+ final ExecutionGraph eg = createExecutionGraph(jobGraph, Time.milliseconds(20));
+ final TerminalJobStatusListener statusListener = new TerminalJobStatusListener();
+ eg.registerJobStatusListener(statusListener);
+
+ final SlotOwner slotOwner = mock(SlotOwner.class);
+
+ final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+ final SimpleSlot[] slots = new SimpleSlot[parallelism];
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final FlinkCompletableFuture<SimpleSlot>[] slotFutures = new FlinkCompletableFuture[parallelism];
+
+ for (int i = 0; i < parallelism; i++) {
+ slots[i] = createSlot(taskManager, jobId, slotOwner);
+ slotFutures[i] = new FlinkCompletableFuture<>();
+ }
+
+ ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+ slotProvider.addSlots(vertex.getID(), slotFutures);
+
+ // we complete one future before scheduling starts
+ slotFutures[1].complete(slots[1]);
+
+ // kick off the scheduling
+
+ eg.setScheduleMode(ScheduleMode.EAGER);
+ eg.setQueuedSchedulingAllowed(true);
+ eg.scheduleForExecution(slotProvider);
+
+ // we complete another future while scheduling is in progress
+ slotFutures[2].complete(slots[2]);
+
+ // since future[0] is never completed, the whole operation must time out;
+ // no restarts are allowed, so the job will go terminal
+ statusListener.waitForTerminalState(2000);
+
+ // wait until both handed-out slots (1 and 2) are returned
+ verify(slotOwner, new Timeout(2000, times(2))).returnAllocatedSlot(any(Slot.class));
+
+ // verify that no deployments have happened
+ verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+
+ // every slot that was handed out must have been canceled
+ for (Future<SimpleSlot> future : slotFutures) {
+ if (future.isDone()) {
+ assertTrue(future.get().isCanceled());
+ }
+ }
+ }
+
+ /**
+ * Tests that the {@link ExecutionJobVertex#allocateResourcesForAll(SlotProvider, boolean)} method
+ * releases partially acquired resources upon exception.
+ */
+ @Test
+ public void testExecutionJobVertexAllocateResourcesReleasesOnException() throws Exception {
+ final int parallelism = 8;
+
+ final JobVertex vertex = new JobVertex("vertex");
+ vertex.setParallelism(parallelism);
+ vertex.setInvokableClass(NoOpInvokable.class);
+
+ final JobID jobId = new JobID();
+ final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
+
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+ final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
+
+ // set up some available slots and some slot owner that accepts released slots back
+ final List<SimpleSlot> returnedSlots = new ArrayList<>();
+ final SlotOwner recycler = new SlotOwner() {
+ @Override
+ public boolean returnAllocatedSlot(Slot slot) {
+ returnedSlots.add((SimpleSlot) slot);
+ return true;
+ }
+ };
+
+ // only 3 slots for a vertex with parallelism 8, so the 4th allocation fails
+ final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+ final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
+ createSlot(taskManager, jobId, recycler),
+ createSlot(taskManager, jobId, recycler),
+ createSlot(taskManager, jobId, recycler)));
+
+ // slot provider that hands out the available slots one by one, then throws an exception
+ final SlotProvider slots = mock(SlotProvider.class);
+
+ when(slots.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
+ new Answer<Future<SimpleSlot>>() {
+
+ @Override
+ public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+ if (availableSlots.isEmpty()) {
+ throw new TestRuntimeException();
+ } else {
+ return FlinkCompletableFuture.completed(availableSlots.remove(0));
+ }
+ }
+ });
+
+ // acquire resources and check that all are back after the failure
+
+ final int numSlotsToExpectBack = availableSlots.size();
+
+ try {
+ ejv.allocateResourcesForAll(slots, false);
+ fail("should have failed with an exception");
+ }
+ catch (TestRuntimeException e) {
+ // expected
+ }
+
+ // the exception is only thrown once every available slot was handed out
+ assertTrue(availableSlots.isEmpty());
+ assertEquals(numSlotsToExpectBack, returnedSlots.size());
+ }
+
+ /**
+ * Tests that the {@link ExecutionGraph#scheduleForExecution(SlotProvider)} method
+ * releases partially acquired resources upon exception.
+ */
+ @Test
+ public void testExecutionGraphScheduleReleasesResourcesOnException() throws Exception {
+
+ // [pipelined]
+ // we construct a simple graph (source) ----------------> (target)
+
+ final int parallelism = 3;
+
+ final JobVertex sourceVertex = new JobVertex("source");
+ sourceVertex.setParallelism(parallelism);
+ sourceVertex.setInvokableClass(NoOpInvokable.class);
+
+ final JobVertex targetVertex = new JobVertex("target");
+ targetVertex.setParallelism(parallelism);
+ targetVertex.setInvokableClass(NoOpInvokable.class);
+
+ targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ final JobID jobId = new JobID();
+ final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
+
+ final ExecutionGraph eg = createExecutionGraph(jobGraph);
+
+ // set up some available slots and some slot owner that accepts released slots back
+ final List<SimpleSlot> returnedSlots = new ArrayList<>();
+ final SlotOwner recycler = new SlotOwner() {
+ @Override
+ public boolean returnAllocatedSlot(Slot slot) {
+ returnedSlots.add((SimpleSlot) slot);
+ return true;
+ }
+ };
+
+ // the graph needs 2 * parallelism = 6 slots, but only 5 are available
+ final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+ final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
+ createSlot(taskManager, jobId, recycler),
+ createSlot(taskManager, jobId, recycler),
+ createSlot(taskManager, jobId, recycler),
+ createSlot(taskManager, jobId, recycler),
+ createSlot(taskManager, jobId, recycler)));
+
+ // slot provider that hands out the available slots one by one, then throws an exception
+ final SlotProvider slots = mock(SlotProvider.class);
+
+ when(slots.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
+ new Answer<Future<SimpleSlot>>() {
+
+ @Override
+ public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+ if (availableSlots.isEmpty()) {
+ throw new TestRuntimeException();
+ } else {
+ return FlinkCompletableFuture.completed(availableSlots.remove(0));
+ }
+ }
+ });
+
+ // acquire resources and check that all are back after the failure
+
+ final int numSlotsToExpectBack = availableSlots.size();
+
+ // setup happens outside the try block, so only the scheduling call is expected to fail
+ eg.setScheduleMode(ScheduleMode.EAGER);
+
+ try {
+ eg.scheduleForExecution(slots);
+ fail("should have failed with an exception");
+ }
+ catch (TestRuntimeException e) {
+ // expected
+ }
+
+ // the exception is only thrown once every available slot was handed out
+ assertTrue(availableSlots.isEmpty());
+ assertEquals(numSlotsToExpectBack, returnedSlots.size());
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws Exception {
+ return createExecutionGraph(jobGraph, Time.minutes(10));
+ }
+
+ private ExecutionGraph createExecutionGraph(JobGraph jobGraph, Time timeout) throws Exception {
+ return ExecutionGraphBuilder.buildGraph(
+ null,
+ jobGraph,
+ new Configuration(),
+ executor,
+ executor,
+ getClass().getClassLoader(),
+ new StandaloneCheckpointRecoveryFactory(),
+ timeout,
+ new NoRestartStrategy(),
+ new UnregisteredMetricsGroup(),
+ 1,
+ log);
+ }
+
+ private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) {
+ return createSlot(taskManager, jobId, mock(SlotOwner.class));
+ }
+
+ private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId, SlotOwner slotOwner) {
+ TaskManagerLocation location = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
+
+ AllocatedSlot slot = new AllocatedSlot(
+ new AllocationID(), jobId, location, 0, ResourceProfile.UNKNOWN, taskManager);
+
+ return new SimpleSlot(slot, slotOwner, 0);
+ }
+
+ private static TaskManagerGateway createTaskManager() {
+ TaskManagerGateway tm = mock(TaskManagerGateway.class);
+ when(tm.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
+ .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+ return tm;
+ }
+
+ /**
+ * Asserts that the job is still running and that none of the given TaskManagers has
+ * received a task deployment call.
+ */
+ private static void verifyNothingDeployed(ExecutionGraph eg, TaskManagerGateway[] taskManagers) {
+ // the job must not have failed or finished
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ // NOTE(review): with Mockito's Timeout, a times(0) check may succeed immediately rather
+ // than waiting the full 50 ms; confirm whether a wait-then-verify was intended here
+ for (int i = 0; i < taskManagers.length; i++) {
+ verify(taskManagers[i], new Timeout(50, times(0))).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+ }
+ }
+
+ /** Marker exception thrown by the test slot providers, so the tests can catch exactly it. */
+ private static final class TestRuntimeException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
new file mode 100644
index 0000000..2e6da98
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the utility methods in the class {@link ExecutionGraphUtils}.
+ */
+public class ExecutionGraphUtilsTest {
+
+ /**
+ * Checks that each slot is returned to its owner exactly once, whether its future is
+ * completed after the release call, already completed, or completed with a slot that
+ * was released beforehand.
+ */
+ @Test
+ public void testReleaseSlots() {
+ final JobID jobId = new JobID();
+ final SlotOwner slotOwner = mock(SlotOwner.class);
+
+ final SimpleSlot lateSlot = createSlot(jobId, slotOwner, 0);
+ final SimpleSlot readySlot = createSlot(jobId, slotOwner, 1);
+ final SimpleSlot releasedSlot = createSlot(jobId, slotOwner, 2);
+
+ // a future that receives its slot only after the release call
+ final FlinkCompletableFuture<SimpleSlot> pendingFuture = new FlinkCompletableFuture<>();
+
+ // a future that already holds a live slot
+ final FlinkCompletableFuture<SimpleSlot> doneFuture = new FlinkCompletableFuture<>();
+ doneFuture.complete(readySlot);
+
+ // a future that holds a slot which was released beforehand
+ final FlinkCompletableFuture<SimpleSlot> releasedSlotFuture = new FlinkCompletableFuture<>();
+ releasedSlot.releaseSlot();
+ releasedSlotFuture.complete(releasedSlot);
+
+ ExecutionGraphUtils.releaseSlotFuture(pendingFuture);
+ ExecutionGraphUtils.releaseSlotFuture(doneFuture);
+ ExecutionGraphUtils.releaseSlotFuture(releasedSlotFuture);
+
+ // the pending future is completed only now, after its release was requested
+ pendingFuture.complete(lateSlot);
+
+ // every slot must have been handed back to the owner exactly once
+ for (SimpleSlot slot : Arrays.asList(lateSlot, readySlot, releasedSlot)) {
+ verify(slotOwner, times(1)).returnAllocatedSlot(eq(slot));
+ }
+ }
+
+ /**
+ * Checks that releasing all slots tolerates null arrays, null entries, and empty arrays,
+ * and still returns every contained slot exactly once.
+ */
+ @Test
+ public void testReleaseSlotsWithNulls() {
+ final JobID jobId = new JobID();
+ final SlotOwner slotOwner = mock(SlotOwner.class);
+
+ final Execution execution = mock(Execution.class);
+
+ final SimpleSlot[] slots = new SimpleSlot[5];
+ for (int i = 0; i < slots.length; i++) {
+ slots[i] = createSlot(jobId, slotOwner, i);
+ }
+
+ // null entries interleaved with real slots
+ final ExecutionAndSlot[] sparse = {
+ null,
+ new ExecutionAndSlot(execution, FlinkCompletableFuture.completed(slots[0])),
+ null,
+ new ExecutionAndSlot(execution, FlinkCompletableFuture.completed(slots[1])),
+ null
+ };
+
+ // fully populated
+ final ExecutionAndSlot[] dense = {
+ new ExecutionAndSlot(execution, FlinkCompletableFuture.completed(slots[2])),
+ new ExecutionAndSlot(execution, FlinkCompletableFuture.completed(slots[3])),
+ new ExecutionAndSlot(execution, FlinkCompletableFuture.completed(slots[4]))
+ };
+
+ // null arrays and an empty array mixed in between the populated ones
+ final List<ExecutionAndSlot[]> resources = Arrays.asList(null, sparse, new ExecutionAndSlot[0], null, dense);
+
+ ExecutionGraphUtils.releaseAllSlotsSilently(resources);
+
+ for (SimpleSlot slot : slots) {
+ verify(slotOwner, times(1)).returnAllocatedSlot(eq(slot));
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Creates a simple slot with the given slot number, owned by the given owner. */
+ private static SimpleSlot createSlot(JobID jobId, SlotOwner owner, int num) {
+ return new SimpleSlot(createAllocatedSlot(jobId, num), owner, num);
+ }
+
+ /** Creates an allocated slot on a loopback TaskManager location whose port is 10000 + num. */
+ private static AllocatedSlot createAllocatedSlot(JobID jid, int num) {
+ final TaskManagerLocation location = new TaskManagerLocation(
+ ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + num);
+
+ return new AllocatedSlot(
+ new AllocationID(), jid, location, num, ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 7b6c6ea..82561b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -457,7 +457,7 @@ public class ExecutionVertexCancelTest {
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
// 1)
- // scheduling after being created should be tolerated (no exception) because
+ // scheduling after being canceled should be tolerated (no exception) because
// it can occur as the result of races
{
Scheduler scheduler = mock(Scheduler.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 9132aee..1b029e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -104,9 +104,6 @@ public class ExecutionVertexSchedulingTest {
future.complete(slot);
- // wait a second for future's future action be executed
- Thread.sleep(1000);
-
// will have failed
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 3a7e759..006f894 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -348,9 +348,9 @@ public class PointwisePatternTest {
timesUsed[inEdges[0].getSource().getPartitionNumber()]++;
}
-
- for (int i = 0; i < timesUsed.length; i++) {
- assertTrue(timesUsed[i] >= factor && timesUsed[i] <= factor + delta);
+
+ for (int used : timesUsed) {
+ assertTrue(used >= factor && used <= factor + delta);
}
}
@@ -406,9 +406,9 @@ public class PointwisePatternTest {
timesUsed[ee.getSource().getPartitionNumber()]++;
}
}
-
- for (int i = 0; i < timesUsed.length; i++) {
- assertEquals(1, timesUsed[i]);
+
+ for (int used : timesUsed) {
+ assertEquals(1, used);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
new file mode 100644
index 0000000..3acb2eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A slot provider where one can pre-set the slot futures for tasks based on
+ * vertex ID and subtask index.
+ *
+ * <p>{@link #allocateSlot(ScheduledUnit, boolean)} hands out exactly the future that was
+ * registered for the requested vertex and subtask. It fails with an
+ * {@link IllegalArgumentException} if no future was registered for that subtask.
+ */
+class ProgrammedSlotProvider implements SlotProvider {
+
+ /** The pre-set slot futures per vertex, indexed by subtask. */
+ private final Map<JobVertexID, Future<SimpleSlot>[]> slotFutures = new HashMap<>();
+
+ /** The number of subtasks per vertex for which futures can be registered. */
+ private final int parallelism;
+
+ public ProgrammedSlotProvider(int parallelism) {
+ checkArgument(parallelism > 0);
+ this.parallelism = parallelism;
+ }
+
+ /**
+ * Registers the slot future to hand out for one subtask of the given vertex.
+ *
+ * @param vertex the vertex of the subtask
+ * @param subtaskIndex the subtask index, in the range [0, parallelism)
+ * @param future the future to return for that subtask
+ */
+ public void addSlot(JobVertexID vertex, int subtaskIndex, Future<SimpleSlot> future) {
+ checkNotNull(vertex);
+ checkNotNull(future);
+ checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
+
+ Future<SimpleSlot>[] futures = slotFutures.get(vertex);
+ if (futures == null) {
+ futures = newFutureArray();
+ slotFutures.put(vertex, futures);
+ }
+
+ futures[subtaskIndex] = future;
+ }
+
+ /**
+ * Registers the slot futures for all subtasks of the given vertex. Any futures previously
+ * registered for that vertex are replaced.
+ *
+ * <p>The futures are copied into an internally created array. Storing the caller's array
+ * directly would have two problems. A later {@link #addSlot} call would mutate the caller's
+ * array. And if the caller's array has a narrower runtime component type (for example
+ * {@code FlinkCompletableFuture[]}), that {@code addSlot} call would fail with an
+ * {@link ArrayStoreException} when given any other future implementation.
+ *
+ * @param vertex the vertex whose subtasks the futures belong to
+ * @param futures one future per subtask; the length must equal the parallelism
+ */
+ public void addSlots(JobVertexID vertex, Future<SimpleSlot>[] futures) {
+ checkNotNull(vertex);
+ checkNotNull(futures);
+ checkArgument(futures.length == parallelism);
+
+ final Future<SimpleSlot>[] copy = newFutureArray();
+ System.arraycopy(futures, 0, copy, 0, parallelism);
+ slotFutures.put(vertex, copy);
+ }
+
+ @Override
+ public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+ JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
+ int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
+
+ Future<SimpleSlot>[] forTask = slotFutures.get(vertexId);
+ // bounds-check the index, so that a subtask beyond the programmed parallelism gets the
+ // descriptive exception below instead of an ArrayIndexOutOfBoundsException
+ if (forTask != null && subtask >= 0 && subtask < forTask.length) {
+ Future<SimpleSlot> future = forTask[subtask];
+ if (future != null) {
+ return future;
+ }
+ }
+
+ throw new IllegalArgumentException("No registered slot future for task " + vertexId + " (" + subtask + ')');
+ }
+
+ /** Creates an empty array with one slot per subtask and a runtime component type of {@code Future}. */
+ private Future<SimpleSlot>[] newFutureArray() {
+ @SuppressWarnings("unchecked")
+ Future<SimpleSlot>[] array = (Future<SimpleSlot>[]) new Future<?>[parallelism];
+ return array;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
new file mode 100644
index 0000000..c107d54
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalJobStatusListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A job status listener that lets one block until the job is in a terminal state.
+ *
+ * <p>For this listener, a job is terminal once it reaches a globally terminal status
+ * or the {@link JobStatus#SUSPENDED} status.
+ */
+public class TerminalJobStatusListener implements JobStatusListener {
+
+ /** Triggered by the first status change to a terminal status. */
+ private final OneShotLatch terminalStateLatch = new OneShotLatch();
+
+ /**
+ * Blocks until the job has reached a terminal state, or until the timeout expires.
+ *
+ * @param timeoutMillis the maximum time to wait, in milliseconds
+ * @throws InterruptedException if the waiting thread is interrupted
+ * @throws TimeoutException if no terminal state was reached within the timeout
+ */
+ public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException {
+ terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+ // SUSPENDED is checked explicitly, in addition to the globally terminal states
+ if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) {
+ terminalStateLatch.trigger();
+ }
+ }
+}