You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 08:07:55 UTC
[3/3] flink git commit: [FLINK-8085] Thin out LogicalSlot interface
[FLINK-8085] Thin out LogicalSlot interface
Remove the isCanceled and isReleased methods and decouple the logical slot from the Execution by
introducing a Payload interface which is set for a LogicalSlot. The Payload interface
is implemented by the Execution and allows failing the payload and obtaining
a termination future.
Introduce a proper Execution#releaseFuture which is completed once the Execution's
assigned resource has been released.
This closes #5087.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7bca9e46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7bca9e46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7bca9e46
Branch: refs/heads/master
Commit: 7bca9e4613ff30ab6a9c11e673a785f3f5c86e69
Parents: bb9c64b
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 15 14:20:27 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 13 22:37:50 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 83 +++++++++------
.../runtime/executiongraph/ExecutionGraph.java | 31 ++++--
.../runtime/executiongraph/ExecutionVertex.java | 4 +-
.../failover/RestartIndividualStrategy.java | 2 +-
.../apache/flink/runtime/instance/Instance.java | 33 +++---
.../flink/runtime/instance/LogicalSlot.java | 54 +++++++---
.../flink/runtime/instance/SharedSlot.java | 2 +-
.../flink/runtime/instance/SimpleSlot.java | 101 ++++++++++++-------
.../org/apache/flink/runtime/instance/Slot.java | 5 +-
.../apache/flink/runtime/instance/SlotPool.java | 8 +-
.../instance/SlotSharingGroupAssignment.java | 8 +-
.../scheduler/CoLocationConstraint.java | 2 +-
.../runtime/jobmanager/scheduler/Scheduler.java | 10 +-
.../runtime/jobmanager/slots/SlotOwner.java | 10 +-
.../ExecutionGraphRestartTest.java | 12 +--
.../ExecutionGraphSchedulingTest.java | 24 ++++-
.../ExecutionGraphSuspendTest.java | 2 +-
.../runtime/executiongraph/ExecutionTest.java | 75 ++++++++++++--
.../ExecutionVertexCancelTest.java | 2 +-
.../ExecutionVertexSchedulingTest.java | 4 +-
.../executiongraph/GlobalModVersionTest.java | 2 +-
.../IndividualRestartsConcurrencyTest.java | 2 +-
.../PipelinedRegionFailoverConcurrencyTest.java | 2 +-
.../utils/SimpleSlotProvider.java | 4 +-
.../flink/runtime/instance/InstanceTest.java | 16 +--
.../flink/runtime/instance/SharedSlotsTest.java | 50 ++++-----
.../flink/runtime/instance/SimpleSlotTest.java | 66 ++++--------
.../flink/runtime/instance/SlotPoolTest.java | 6 +-
.../runtime/instance/TestingLogicalSlot.java | 30 +++---
.../flink/runtime/instance/TestingPayload.java | 44 ++++++++
.../scheduler/CoLocationConstraintTest.java | 2 +-
.../scheduler/SchedulerIsolatedTasksTest.java | 9 +-
.../jobmanager/slots/TestingSlotOwner.java | 47 +++++++++
.../runtime/testingUtils/TestingUtils.scala | 5 +-
34 files changed, 497 insertions(+), 260 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 00a452d..12a6749 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -93,7 +93,7 @@ import static org.apache.flink.util.Preconditions.checkState;
* occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
* actions if it is not. Many actions are also idempotent (like canceling).
*/
-public class Execution implements AccessExecution, Archiveable<ArchivedExecution> {
+public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {
private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
@@ -135,7 +135,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
/** A future that completes once the Execution reaches a terminal ExecutionState */
- private final CompletableFuture<ExecutionState> terminationFuture;
+ private final CompletableFuture<ExecutionState> terminalStateFuture;
+
+ private final CompletableFuture<?> releaseFuture;
private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture;
@@ -197,7 +199,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
markTimestamp(ExecutionState.CREATED, startTimestamp);
this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
- this.terminationFuture = new CompletableFuture<>();
+ this.terminalStateFuture = new CompletableFuture<>();
+ this.releaseFuture = new CompletableFuture<>();
this.taskManagerLocationFuture = new CompletableFuture<>();
this.assignedResource = null;
@@ -329,10 +332,21 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* Gets a future that completes once the task execution reaches a terminal state.
* The future will be completed with specific state that the execution reached.
*
- * @return A future for the execution's termination
+ * @return A future which is completed once the execution reaches a terminal state
+ */
+ @Override
+ public CompletableFuture<ExecutionState> getTerminalStateFuture() {
+ return terminalStateFuture;
+ }
+
+ /**
+ * Gets the release future which is completed once the execution reaches a terminal
+ * state and the assigned resource has been released.
+ *
+ * @return A future which is completed once the assigned resource has been released
*/
- public CompletableFuture<ExecutionState> getTerminationFuture() {
- return terminationFuture;
+ public CompletableFuture<?> getReleaseFuture() {
+ return releaseFuture;
}
// --------------------------------------------------------------------------------------------
@@ -493,7 +507,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
try {
// good, we are allowed to deploy
- if (!slot.setExecution(this)) {
+ if (!slot.tryAssignPayload(this)) {
throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
}
@@ -608,15 +622,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
try {
vertex.getExecutionGraph().deregisterExecution(this);
- final LogicalSlot slot = assignedResource;
-
- if (slot != null) {
- slot.releaseSlot();
- }
+ releaseAssignedResource();
}
finally {
vertex.executionCanceled(this);
- terminationFuture.complete(CANCELED);
}
return;
}
@@ -757,6 +766,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
*
* @param t The exception that caused the task to fail.
*/
+ @Override
public void fail(Throwable t) {
processFail(t, false);
}
@@ -880,17 +890,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
updateAccumulatorsAndMetrics(userAccumulators, metrics);
- final LogicalSlot slot = assignedResource;
-
- if (slot != null) {
- slot.releaseSlot();
- }
+ releaseAssignedResource();
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
vertex.executionFinished(this);
- terminationFuture.complete(FINISHED);
}
return;
}
@@ -938,17 +943,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
if (transitionState(current, CANCELED)) {
try {
- final LogicalSlot slot = assignedResource;
-
- if (slot != null) {
- slot.releaseSlot();
- }
+ releaseAssignedResource();
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
vertex.executionCanceled(this);
- terminationFuture.complete(CANCELED);
}
return;
}
@@ -1035,15 +1035,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
updateAccumulatorsAndMetrics(userAccumulators, metrics);
try {
- final LogicalSlot slot = assignedResource;
- if (slot != null) {
- slot.releaseSlot();
- }
+ releaseAssignedResource();
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
vertex.executionFailed(this, t);
- terminationFuture.complete(FAILED);
}
if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
@@ -1177,6 +1173,28 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
}
+ /**
+ * Releases the assigned resource and completes the release future
+ * once the assigned resource has been successfully released
+ */
+ private void releaseAssignedResource() {
+ final LogicalSlot slot = assignedResource;
+
+ if (slot != null) {
+ slot.releaseSlot().whenComplete(
+ (Object ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ releaseFuture.completeExceptionally(throwable);
+ } else {
+ releaseFuture.complete(null);
+ }
+ });
+ } else {
+ // no assigned resource --> we can directly complete the release future
+ releaseFuture.complete(null);
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Miscellaneous
// --------------------------------------------------------------------------------------------
@@ -1240,6 +1258,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState, error);
}
+ if (targetState.isTerminal()) {
+ // complete the terminal state future
+ terminalStateFuture.complete(targetState);
+ }
+
// make sure that the state transition completes normally.
// potential errors (in listeners may not affect the main logic)
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 002f9a0..c4ff6fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -965,11 +965,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// we build a future that is complete once all vertices have reached a terminal state
final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
- allTerminal.thenAccept(
- (Void value) -> {
- // cancellations may currently be overridden by failures which trigger
- // restarts, so we need to pass a proper restart global version here
- allVerticesInTerminalState(globalVersionForRestart);
+ allTerminal.whenComplete(
+ (Void value, Throwable throwable) -> {
+ if (throwable != null) {
+ transitionState(
+ JobStatus.CANCELLING,
+ JobStatus.FAILED,
+ new FlinkException(
+ "Could not cancel job " + getJobName() + " because not all execution job vertices could be cancelled.",
+ throwable));
+ } else {
+ // cancellations may currently be overridden by failures which trigger
+ // restarts, so we need to pass a proper restart global version here
+ allVerticesInTerminalState(globalVersionForRestart);
+ }
}
);
@@ -1125,7 +1134,17 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
- allTerminal.thenAccept((Void value) -> allVerticesInTerminalState(globalVersionForRestart));
+ allTerminal.whenComplete(
+ (Void ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ transitionState(
+ JobStatus.FAILING,
+ JobStatus.FAILED,
+ new FlinkException("Could not cancel all execution job vertices properly.", throwable));
+ } else {
+ allVerticesInTerminalState(globalVersionForRestart);
+ }
+ });
return;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index c2c986f..27f2d5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -631,12 +631,12 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
*
* @return A future that completes once the execution has reached its final state.
*/
- public CompletableFuture<ExecutionState> cancel() {
+ public CompletableFuture<?> cancel() {
// to avoid any case of mixup in the presence of concurrent calls,
// we copy a reference to the stack to make sure both calls go to the same Execution
final Execution exec = this.currentExecution;
exec.cancel();
- return exec.getTerminationFuture();
+ return exec.getReleaseFuture();
}
public void stop() {
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
index 80f1d2f..f133d34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
@@ -106,7 +106,7 @@ public class RestartIndividualStrategy extends FailoverStrategy {
// Note: currently all tasks passed here are already in their terminal state,
// so we could actually avoid the future. We use it anyways because it is cheap and
// it helps to support better testing
- final CompletableFuture<ExecutionState> terminationFuture = taskExecution.getTerminationFuture();
+ final CompletableFuture<ExecutionState> terminationFuture = taskExecution.getTerminalStateFuture();
final ExecutionVertex vertexToRecover = taskExecution.getVertex();
final long globalModVersion = taskExecution.getGlobalModVersion();
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index b5c6f23..d099f6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -18,26 +18,27 @@
package org.apache.flink.runtime.instance;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
@@ -164,7 +165,7 @@ public class Instance implements SlotOwner {
* the instance lock
*/
for (Slot slot : slots) {
- slot.releaseSlot();
+ slot.releaseInstanceSlot();
}
}
@@ -285,10 +286,10 @@ public class Instance implements SlotOwner {
* "released", this method will do nothing.</p>
*
* @param slot The slot to return.
- * @return True, if the slot was returned, false if not.
+ * @return Future which is completed with true, if the slot was returned, false if not.
*/
@Override
- public boolean returnAllocatedSlot(Slot slot) {
+ public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
checkNotNull(slot);
checkArgument(!slot.isAlive(), "slot is still alive");
checkArgument(slot.getOwner() == this, "slot belongs to the wrong TaskManager.");
@@ -297,7 +298,7 @@ public class Instance implements SlotOwner {
LOG.debug("Return allocated slot {}.", slot);
synchronized (instanceLock) {
if (isDead) {
- return false;
+ return CompletableFuture.completedFuture(false);
}
if (this.allocatedSlots.remove(slot)) {
@@ -307,7 +308,7 @@ public class Instance implements SlotOwner {
this.slotAvailabilityListener.newSlotAvailable(this);
}
- return true;
+ return CompletableFuture.completedFuture(true);
}
else {
throw new IllegalArgumentException("Slot was not allocated from this TaskManager.");
@@ -315,7 +316,7 @@ public class Instance implements SlotOwner {
}
}
else {
- return false;
+ return CompletableFuture.completedFuture(false);
}
}
@@ -327,7 +328,7 @@ public class Instance implements SlotOwner {
}
for (Slot slot : copy) {
- slot.releaseSlot();
+ slot.releaseInstanceSlot();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
index 3ebe107..e663265 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
@@ -19,10 +19,13 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
/**
* A logical slot represents a resource on a TaskManager into
* which a single task can be deployed.
@@ -51,31 +54,29 @@ public interface LogicalSlot {
boolean isAlive();
/**
- * True if the slot is canceled.
- *
- * @return True if the slot is canceled, otherwise false
- */
- boolean isCanceled();
-
- /**
- * True if the slot is released.
+ * Tries to assign a payload to this slot. This can only happen
+ * exactly once.
*
- * @return True if the slot is released, otherwise false
+ * @param payload to be assigned to this slot.
+ * @return true if the payload could be set, otherwise false
*/
- boolean isReleased();
+ boolean tryAssignPayload(Payload payload);
/**
- * Sets the execution for this slot.
+ * Returns the set payload or null if none.
*
- * @param execution to set for this slot
- * @return true if the slot could be set, otherwise false
+ * @return Payload of this slot or null if none
*/
- boolean setExecution(Execution execution);
+ @Nullable
+ Payload getPayload();
/**
* Releases this slot.
+ *
+ * @return Future which is completed once the slot has been released,
+ * in case of a failure it is completed exceptionally
*/
- void releaseSlot();
+ CompletableFuture<?> releaseSlot();
/**
* Gets the slot number on the TaskManager.
@@ -90,4 +91,25 @@ public interface LogicalSlot {
* @return allocation id of this slot
*/
AllocationID getAllocationId();
+
+ /**
+ * Payload for a logical slot.
+ */
+ interface Payload {
+
+ /**
+ * Fail the payload with the given cause.
+ *
+ * @param cause of the failure
+ */
+ void fail(Throwable cause);
+
+ /**
+ * Gets the terminal state future which is completed once the payload
+ * has reached a terminal state.
+ *
+ * @return Terminal state future
+ */
+ CompletableFuture<?> getTerminalStateFuture();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index 106a8ef..2ce4fc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -188,7 +188,7 @@ public class SharedSlot extends Slot {
}
@Override
- public void releaseSlot() {
+ public void releaseInstanceSlot() {
assignmentGroup.releaseSharedSlot(this);
if (!(isReleased() && subSlots.isEmpty())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 9591028..0c9e11c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -18,18 +18,21 @@
package org.apache.flink.runtime.instance;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
@@ -40,14 +43,16 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
*/
public class SimpleSlot extends Slot implements LogicalSlot {
- /** The updater used to atomically swap in the execution */
- private static final AtomicReferenceFieldUpdater<SimpleSlot, Execution> VERTEX_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(SimpleSlot.class, Execution.class, "executedTask");
+ /** The updater used to atomically swap in the payload */
+ private static final AtomicReferenceFieldUpdater<SimpleSlot, Payload> PAYLOAD_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(SimpleSlot.class, Payload.class, "payload");
// ------------------------------------------------------------------------
- /** Task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
- private volatile Execution executedTask;
+ private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
+
+ /** Payload assigned to this slot. Volatile to force a memory barrier and allow for correct double-checking */
+ private volatile Payload payload;
/** The locality attached to the slot, defining whether the slot was allocated at the desired location. */
private volatile Locality locality = Locality.UNCONSTRAINED;
@@ -149,26 +154,14 @@ public class SimpleSlot extends Slot implements LogicalSlot {
}
/**
- * Gets the task execution attempt currently executed in this slot. This may return null, if no
- * task execution attempt has been placed into this slot.
- *
- * @return The slot's task execution attempt, or null, if no task is executed in this slot, yet.
- */
- public Execution getExecutedVertex() {
- return executedTask;
- }
-
- /**
* Atomically sets the executed vertex, if no vertex has been assigned to this slot so far.
*
- * @param executedVertex The vertex to assign to this slot.
+ * @param payload The payload to assign to this slot.
* @return True, if the vertex was assigned, false, otherwise.
*/
@Override
- public boolean setExecution(Execution executedVertex) {
- if (executedVertex == null) {
- throw new NullPointerException();
- }
+ public boolean tryAssignPayload(Payload payload) {
+ Preconditions.checkNotNull(payload);
// check that we can actually run in this slot
if (isCanceled()) {
@@ -176,19 +169,25 @@ public class SimpleSlot extends Slot implements LogicalSlot {
}
// atomically assign the vertex
- if (!VERTEX_UPDATER.compareAndSet(this, null, executedVertex)) {
+ if (!PAYLOAD_UPDATER.compareAndSet(this, null, payload)) {
return false;
}
// we need to do a double check that we were not cancelled in the meantime
if (isCanceled()) {
- this.executedTask = null;
+ this.payload = null;
return false;
}
return true;
}
+ @Nullable
+ @Override
+ public Payload getPayload() {
+ return payload;
+ }
+
/**
* Gets the locality information attached to this slot.
* @return The locality attached to the slot.
@@ -210,27 +209,51 @@ public class SimpleSlot extends Slot implements LogicalSlot {
// ------------------------------------------------------------------------
@Override
- public void releaseSlot() {
+ public void releaseInstanceSlot() {
+ releaseSlot();
+ }
+
+ @Override
+ public CompletableFuture<?> releaseSlot() {
if (!isCanceled()) {
+ final CompletableFuture<?> terminationFuture;
- // kill all tasks currently running in this slot
- Execution exec = this.executedTask;
- if (exec != null && !exec.isFinished()) {
- exec.fail(new Exception("TaskManager was lost/killed: " + getTaskManagerLocation()));
- }
+ if (payload != null) {
+ // trigger the failure of the slot payload
+ payload.fail(new FlinkException("TaskManager was lost/killed: " + getTaskManagerLocation()));
- // release directly (if we are directly allocated),
- // otherwise release through the parent shared slot
- if (getParent() == null) {
- // we have to give back the slot to the owning instance
- if (markCancelled()) {
- getOwner().returnAllocatedSlot(this);
- }
+ // wait for the termination of the payload before releasing the slot
+ terminationFuture = payload.getTerminalStateFuture();
} else {
- // we have to ask our parent to dispose us
- getParent().releaseChild(this);
+ terminationFuture = CompletableFuture.completedFuture(null);
}
+
+ terminationFuture.whenComplete(
+ (Object ignored, Throwable throwable) -> {
+ // release directly (if we are directly allocated),
+ // otherwise release through the parent shared slot
+ if (getParent() == null) {
+ // we have to give back the slot to the owning instance
+ if (markCancelled()) {
+ getOwner().returnAllocatedSlot(this).whenComplete(
+ (value, t) -> {
+ if (t != null) {
+ releaseFuture.completeExceptionally(t);
+ } else {
+ releaseFuture.complete(null);
+ }
+ });
+ }
+ } else {
+ // we have to ask our parent to dispose us
+ getParent().releaseChild(this);
+
+ releaseFuture.complete(null);
+ }
+ });
}
+
+ return releaseFuture;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index d6d8f12..804682b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.instance;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -26,9 +27,9 @@ import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
-import org.apache.flink.api.common.JobID;
import javax.annotation.Nullable;
+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -343,7 +344,7 @@ public abstract class Slot {
* If this slot is a simple slot, it will be returned to its instance. If it is a shared slot,
* it will release all of its sub-slots and release itself.
*/
- public abstract void releaseSlot();
+ public abstract void releaseInstanceSlot();
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 66af865..771d690 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -643,7 +643,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
if (slot != null) {
// release the slot.
// since it is not in 'allocatedSlots' any more, it will be dropped o return'
- slot.releaseSlot();
+ slot.releaseInstanceSlot();
}
else {
LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
@@ -683,7 +683,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
final Set<Slot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
for (Slot slot : allocatedSlotsForResource) {
- slot.releaseSlot();
+ slot.releaseInstanceSlot();
}
}
@@ -1081,9 +1081,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
}
@Override
- public boolean returnAllocatedSlot(Slot slot) {
+ public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
gateway.returnAllocatedSlot(slot);
- return true;
+ return CompletableFuture.completedFuture(true);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 4371290..45b4a96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -215,7 +215,7 @@ public class SlotSharingGroupAssignment {
// note that this does implicitly release the slot we have just added
// as well, because we release its last child slot. That is expected
// and desired.
- constraintGroupSlot.releaseSlot();
+ constraintGroupSlot.releaseInstanceSlot();
}
}
else {
@@ -507,7 +507,7 @@ public class SlotSharingGroupAssignment {
}
/**
- * Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseSlot()}.
+ * Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseInstanceSlot()}.
*
* @param sharedSlot The slot to be released.
*/
@@ -520,7 +520,7 @@ public class SlotSharingGroupAssignment {
// by simply releasing all children, we should eventually release this slot.
Set<Slot> children = sharedSlot.getSubSlots();
while (children.size() > 0) {
- children.iterator().next().releaseSlot();
+ children.iterator().next().releaseInstanceSlot();
}
}
else {
@@ -551,7 +551,7 @@ public class SlotSharingGroupAssignment {
if (parent == null) {
// root slot, return to the instance.
sharedSlot.getOwner().returnAllocatedSlot(sharedSlot);
-
+
// also, make sure we remove this slot from everywhere
allSlots.remove(sharedSlot);
removeSlotFromAllEntries(availableSlotsPerJid, sharedSlot);
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index c41f7bf..ffc1a7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -141,7 +141,7 @@ public class CoLocationConstraint {
"Cannot assign different location to a constraint whose location is locked.");
}
if (this.sharedSlot.isAlive()) {
- this.sharedSlot.releaseSlot();
+ this.sharedSlot.releaseInstanceSlot();
}
this.sharedSlot = newSlot;
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 2715146..8857be7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -274,7 +274,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
// if there is no slot from the group, or the new slot is local,
// then we use the new slot
if (slotFromGroup != null) {
- slotFromGroup.releaseSlot();
+ slotFromGroup.releaseInstanceSlot();
}
toUse = newSlot;
}
@@ -282,7 +282,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
// both are available and usable. neither is local. in that case, we may
// as well use the slot from the sharing group, to minimize the number of
// instances that the job occupies
- newSlot.releaseSlot();
+ newSlot.releaseInstanceSlot();
toUse = slotFromGroup;
}
@@ -299,10 +299,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
}
catch (Throwable t) {
if (slotFromGroup != null) {
- slotFromGroup.releaseSlot();
+ slotFromGroup.releaseInstanceSlot();
}
if (newSlot != null) {
- newSlot.releaseSlot();
+ newSlot.releaseInstanceSlot();
}
ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group");
@@ -444,7 +444,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
}
else {
// could not add and allocate the sub-slot, so release shared slot
- sharedSlot.releaseSlot();
+ sharedSlot.releaseInstanceSlot();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
index ad9c784..cb4488d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
@@ -20,10 +20,18 @@ package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.runtime.instance.Slot;
+import java.util.concurrent.CompletableFuture;
+
/**
* Interface for components that hold slots and to which slots get released / recycled.
*/
public interface SlotOwner {
- boolean returnAllocatedSlot(Slot slot);
+ /**
+ * Returns the given slot to this slot owner.
+ *
+ * @param slot the slot to return to its owner
+ * @return future which is completed with {@code true} if the slot could be returned, otherwise with {@code false}
+ */
+ CompletableFuture<Boolean> returnAllocatedSlot(Slot slot);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 8770b06..80df852 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -31,10 +31,10 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
@@ -64,9 +64,6 @@ import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Test;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
import java.io.IOException;
import java.net.InetAddress;
import java.util.Iterator;
@@ -79,6 +76,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
@@ -90,10 +90,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import static org.mockito.Mockito.spy;
-
public class ExecutionGraphRestartTest extends TestLogger {
private final static int NUM_TASKS = 31;
@@ -473,7 +471,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
}
- assertEquals(JobStatus.CANCELED, eg.getState());
+ assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());
Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 71ca3a5..2e6558a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -52,12 +53,15 @@ import org.junit.Test;
import org.mockito.verification.Timeout;
import java.net.InetAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
@@ -278,7 +282,10 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
// Create the slots, futures, and the slot provider
final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
- final SlotOwner slotOwner = mock(SlotOwner.class);
+ final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(parallelism);
+ final TestingSlotOwner slotOwner = new TestingSlotOwner();
+ slotOwner.setReturnAllocatedSlotConsumer(
+ (Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
@@ -324,7 +331,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
eg.getTerminationFuture().get(2000, TimeUnit.MILLISECONDS);
// wait until all slots are back
- verify(slotOwner, new Timeout(2000, times(6))).returnAllocatedSlot(any(Slot.class));
+ for (int i = 0; i < parallelism; i++) {
+ returnedSlots.poll(2000L, TimeUnit.MILLISECONDS);
+ }
// no deployment calls must have happened
verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
@@ -354,7 +363,10 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final JobID jobId = new JobID();
final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
- final SlotOwner slotOwner = mock(SlotOwner.class);
+ final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(2);
+ final TestingSlotOwner slotOwner = new TestingSlotOwner();
+ slotOwner.setReturnAllocatedSlotConsumer(
+ (Slot slot) -> returnedSlots.offer(slot.getAllocatedSlot().getSlotAllocationId()));
final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
final SimpleSlot[] slots = new SimpleSlot[parallelism];
@@ -388,14 +400,16 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
eg.getTerminationFuture().get(2000, TimeUnit.MILLISECONDS);
// wait until all slots are back
- verify(slotOwner, new Timeout(2000, times(2))).returnAllocatedSlot(any(Slot.class));
+ for (int i = 0; i < parallelism - 1; i++) {
+ returnedSlots.poll(2000, TimeUnit.MILLISECONDS);
+ }
// verify that no deployments have happened
verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
for (CompletableFuture<LogicalSlot> future : slotFutures) {
if (future.isDone()) {
- assertTrue(future.get().isCanceled());
+ assertFalse(future.get().isAlive());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index f0adc32..65a52bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -219,7 +219,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
validateCancelRpcCalls(gateway, parallelism);
ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
- assertEquals(JobStatus.CANCELED, eg.getState());
+ assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());
// suspend
eg.suspend(new Exception("suspend"));
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 43a6432..c6fb836 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
@@ -77,7 +78,7 @@ public class ExecutionTest extends TestLogger {
final Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
- final TestingSlotOwner slotOwner = new TestingSlotOwner();
+ final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
final SimpleSlot slot = new SimpleSlot(
new JobID(),
@@ -117,7 +118,7 @@ public class ExecutionTest extends TestLogger {
final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
jobVertex.setInvokableClass(NoOpInvokable.class);
- final TestingSlotOwner slotOwner = new TestingSlotOwner();
+ final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
final SimpleSlot slot = new SimpleSlot(
new JobID(),
@@ -167,7 +168,7 @@ public class ExecutionTest extends TestLogger {
final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
jobVertex.setInvokableClass(NoOpInvokable.class);
- final TestingSlotOwner slotOwner = new TestingSlotOwner();
+ final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
final SimpleSlot slot = new SimpleSlot(
new JobID(),
@@ -269,9 +270,71 @@ public class ExecutionTest extends TestLogger {
}
/**
+ * Checks that the {@link Execution} termination future is only completed after the
+ * assigned slot has been released.
+ *
+ * <p>NOTE: Without the fix of this commit, this test fails only spuriously, so it may have
+ * to be executed multiple times before the failure shows up.
+ */
+ @Test
+ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
+ final JobVertexID jobVertexId = new JobVertexID();
+ final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+
+ final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
+
+ final SimpleSlot slot = new SimpleSlot(
+ new JobID(),
+ slotOwner,
+ new LocalTaskManagerLocation(),
+ 0,
+ new SimpleAckingTaskManagerGateway());
+
+ final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
+ slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
+
+ ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+ new JobID(),
+ slotProvider,
+ new NoRestartStrategy(),
+ jobVertex);
+
+ ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+ ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+ assertTrue(executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY));
+
+ Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
+
+ CompletableFuture<Slot> returnedSlotFuture = slotOwner.getReturnedSlotFuture();
+ CompletableFuture<?> terminationFuture = executionVertex.cancel();
+
+ // complete the cancellation in a separate thread so that it can interleave with the
+ // registration of the termination future callback below
+ CompletableFuture.runAsync(
+ () -> currentExecutionAttempt.cancelingComplete(),
+ TestingUtils.defaultExecutor());
+
+ // yield the processor to increase the probability of hitting the problematic interleaving
+ Thread.yield();
+
+ CompletableFuture<Boolean> restartFuture = terminationFuture.thenApply(
+ ignored -> {
+ assertTrue(returnedSlotFuture.isDone());
+ return true;
+ });
+
+ // the callback fails with an AssertionError if the termination future completed before
+ // the slot was returned; get() rethrows that failure wrapped in an ExecutionException
+ restartFuture.get();
+ }
+
+ /**
* Slot owner which records the first returned slot.
*/
- public static final class TestingSlotOwner implements SlotOwner {
+ private static final class SingleSlotTestingSlotOwner implements SlotOwner {
final CompletableFuture<Slot> returnedSlot = new CompletableFuture<>();
@@ -280,8 +343,8 @@ public class ExecutionTest extends TestLogger {
}
@Override
- public boolean returnAllocatedSlot(Slot slot) {
- return returnedSlot.complete(slot);
+ public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
+ return CompletableFuture.completedFuture(returnedSlot.complete(slot));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index cb31d15..1b8daca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -423,7 +423,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
exec.markFailed(new Exception("test"));
assertTrue(exec.getState() == ExecutionState.FAILED || exec.getState() == ExecutionState.CANCELED);
- assertTrue(exec.getAssignedResource().isCanceled());
+ assertFalse(exec.getAssignedResource().isAlive());
assertEquals(vertices.length - 1, exec.getVertex().getExecutionGraph().getRegisteredExecutions().size());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 27f7f51..2941739 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -59,7 +59,7 @@ public class ExecutionVertexSchedulingTest {
final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
- slot.releaseSlot();
+ slot.releaseInstanceSlot();
assertTrue(slot.isReleased());
Scheduler scheduler = mock(Scheduler.class);
@@ -91,7 +91,7 @@ public class ExecutionVertexSchedulingTest {
final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
- slot.releaseSlot();
+ slot.releaseInstanceSlot();
assertTrue(slot.isReleased());
final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index 534c33d..bfad327 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -88,7 +88,7 @@ public class GlobalModVersionTest extends TestLogger {
exec.cancelingComplete();
}
- assertEquals(JobStatus.CANCELED, graph.getState());
+ assertEquals(JobStatus.CANCELED, graph.getTerminationFuture().get());
// no failure notification at all
verify(mockStrategy, times(0)).onTaskFailure(any(Execution.class), any(Throwable.class));
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index c977503..85a6c2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -139,7 +139,7 @@ public class IndividualRestartsConcurrencyTest extends TestLogger {
// now report that cancelling is complete for the other vertex
vertex2.getCurrentExecutionAttempt().cancelingComplete();
- assertEquals(JobStatus.CANCELED, graph.getState());
+ assertEquals(JobStatus.CANCELED, graph.getTerminationFuture().get());
assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index 124a5b7..656c372 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -113,7 +113,7 @@ public class PipelinedRegionFailoverConcurrencyTest extends TestLogger {
// now report that cancelling is complete for the other vertex
vertex2.getCurrentExecutionAttempt().cancelingComplete();
- assertEquals(JobStatus.CANCELED, graph.getState());
+ assertEquals(JobStatus.CANCELED, graph.getTerminationFuture().get());
assertTrue(vertex1.getCurrentExecutionAttempt().getState().isTerminal());
assertTrue(vertex2.getCurrentExecutionAttempt().getState().isTerminal());
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 3d28983..14e0e66 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -96,11 +96,11 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
}
@Override
- public boolean returnAllocatedSlot(Slot slot) {
+ public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
synchronized (slots) {
slots.add(slot.getAllocatedSlot());
}
- return true;
+ return CompletableFuture.completedFuture(true);
}
public int getNumberOfAvailableSlots() {
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 2bea039..5b85f72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -79,18 +79,18 @@ public class InstanceTest {
}
// release the slots. this returns them to the instance
- slot1.releaseSlot();
- slot2.releaseSlot();
- slot3.releaseSlot();
- slot4.releaseSlot();
+ slot1.releaseInstanceSlot();
+ slot2.releaseInstanceSlot();
+ slot3.releaseInstanceSlot();
+ slot4.releaseInstanceSlot();
assertEquals(4, instance.getNumberOfAvailableSlots());
assertEquals(0, instance.getNumberOfAllocatedSlots());
- assertFalse(instance.returnAllocatedSlot(slot1));
- assertFalse(instance.returnAllocatedSlot(slot2));
- assertFalse(instance.returnAllocatedSlot(slot3));
- assertFalse(instance.returnAllocatedSlot(slot4));
+ assertFalse(instance.returnAllocatedSlot(slot1).get());
+ assertFalse(instance.returnAllocatedSlot(slot2).get());
+ assertFalse(instance.returnAllocatedSlot(slot3).get());
+ assertFalse(instance.returnAllocatedSlot(slot4).get());
assertEquals(4, instance.getNumberOfAvailableSlots());
assertEquals(0, instance.getNumberOfAllocatedSlots());
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index 4a6bf75..1e2b6af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -80,7 +80,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(0, slot.getRootSlotNumber());
// release the slot immediately.
- slot.releaseSlot();
+ slot.releaseInstanceSlot();
assertTrue(slot.isCanceled());
assertTrue(slot.isReleased());
@@ -129,7 +129,7 @@ public class SharedSlotsTest extends TestLogger {
SimpleSlot sub1 = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.LOCAL, vid1);
assertNotNull(sub1);
- assertNull(sub1.getExecutedVertex());
+ assertNull(sub1.getPayload());
assertEquals(Locality.LOCAL, sub1.getLocality());
assertEquals(1, sub1.getNumberLeaves());
assertEquals(vid1, sub1.getGroupID());
@@ -148,7 +148,7 @@ public class SharedSlotsTest extends TestLogger {
SimpleSlot sub2 = assignment.getSlotForTask(vid2, NO_LOCATION);
assertNotNull(sub2);
- assertNull(sub2.getExecutedVertex());
+ assertNull(sub2.getPayload());
assertEquals(Locality.UNCONSTRAINED, sub2.getLocality());
assertEquals(1, sub2.getNumberLeaves());
assertEquals(vid2, sub2.getGroupID());
@@ -167,7 +167,7 @@ public class SharedSlotsTest extends TestLogger {
SimpleSlot sub3 = assignment.getSlotForTask(vid3, Collections.singleton(instance.getTaskManagerLocation()));
assertNotNull(sub3);
- assertNull(sub3.getExecutedVertex());
+ assertNull(sub3.getPayload());
assertEquals(Locality.LOCAL, sub3.getLocality());
assertEquals(1, sub3.getNumberLeaves());
assertEquals(vid3, sub3.getGroupID());
@@ -187,7 +187,7 @@ public class SharedSlotsTest extends TestLogger {
Collections.singleton(SchedulerTestUtils.getRandomInstance(1).getTaskManagerLocation()));
assertNotNull(sub4);
- assertNull(sub4.getExecutedVertex());
+ assertNull(sub4.getPayload());
assertEquals(Locality.NON_LOCAL, sub4.getLocality());
assertEquals(1, sub4.getNumberLeaves());
assertEquals(vid4, sub4.getGroupID());
@@ -204,7 +204,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid4));
// release from the root.
- sharedSlot.releaseSlot();
+ sharedSlot.releaseInstanceSlot();
assertTrue(sharedSlot.isReleased());
assertTrue(sub1.isReleased());
@@ -264,7 +264,7 @@ public class SharedSlotsTest extends TestLogger {
// release from the leaves.
- sub2.releaseSlot();
+ sub2.releaseInstanceSlot();
assertTrue(sharedSlot.isAlive());
assertTrue(sub1.isAlive());
@@ -279,7 +279,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(2, sharedSlot.getNumberLeaves());
- sub1.releaseSlot();
+ sub1.releaseInstanceSlot();
assertTrue(sharedSlot.isAlive());
assertTrue(sub1.isReleased());
@@ -293,7 +293,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, sharedSlot.getNumberLeaves());
- sub3.releaseSlot();
+ sub3.releaseInstanceSlot();
assertTrue(sharedSlot.isReleased());
assertTrue(sub1.isReleased());
@@ -348,7 +348,7 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(1, assignment.getNumberOfSlots());
- sub2.releaseSlot();
+ sub2.releaseInstanceSlot();
assertEquals(1, sharedSlot.getNumberLeaves());
assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
@@ -366,8 +366,8 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
assertEquals(1, assignment.getNumberOfSlots());
- sub3.releaseSlot();
- sub1.releaseSlot();
+ sub3.releaseInstanceSlot();
+ sub1.releaseInstanceSlot();
assertTrue(sharedSlot.isReleased());
assertEquals(0, sharedSlot.getNumberLeaves());
@@ -443,7 +443,7 @@ public class SharedSlotsTest extends TestLogger {
assertFalse(constraint.isAssigned());
// we do not immediately lock the location
- headSlot.releaseSlot();
+ headSlot.releaseInstanceSlot();
assertEquals(1, sharedSlot.getNumberLeaves());
assertNotNull(constraint.getSharedSlot());
@@ -468,8 +468,8 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(4, sharedSlot.getNumberLeaves());
// we release our co-location constraint tasks
- headSlot.releaseSlot();
- tailSlot.releaseSlot();
+ headSlot.releaseInstanceSlot();
+ tailSlot.releaseInstanceSlot();
assertEquals(2, sharedSlot.getNumberLeaves());
assertTrue(headSlot.isReleased());
@@ -501,10 +501,10 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(constraint.getGroupId(), constraint.getSharedSlot().getGroupID());
// release all
- sourceSlot.releaseSlot();
- headSlot.releaseSlot();
- tailSlot.releaseSlot();
- sinkSlot.releaseSlot();
+ sourceSlot.releaseInstanceSlot();
+ headSlot.releaseInstanceSlot();
+ tailSlot.releaseInstanceSlot();
+ sinkSlot.releaseInstanceSlot();
assertTrue(sharedSlot.isReleased());
assertTrue(sourceSlot.isReleased());
@@ -577,10 +577,10 @@ public class SharedSlotsTest extends TestLogger {
assertEquals(4, sharedSlot.getNumberLeaves());
// release all
- sourceSlot.releaseSlot();
- headSlot.releaseSlot();
- tailSlot.releaseSlot();
- sinkSlot.releaseSlot();
+ sourceSlot.releaseInstanceSlot();
+ headSlot.releaseInstanceSlot();
+ tailSlot.releaseInstanceSlot();
+ sinkSlot.releaseInstanceSlot();
assertTrue(sharedSlot.isReleased());
assertTrue(sourceSlot.isReleased());
@@ -618,7 +618,7 @@ public class SharedSlotsTest extends TestLogger {
SharedSlot sharedSlot = instance.allocateSharedSlot(jobId, assignment);
SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid);
- sub.releaseSlot();
+ sub.releaseInstanceSlot();
assertTrue(sub.isReleased());
assertTrue(sharedSlot.isReleased());
@@ -654,7 +654,7 @@ public class SharedSlotsTest extends TestLogger {
assertNull(sub.getGroupID());
assertEquals(constraint.getSharedSlot(), sub.getParent());
- sub.releaseSlot();
+ sub.releaseInstanceSlot();
assertTrue(sub.isReleased());
assertTrue(sharedSlot.isReleased());
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index db71210..42cbbbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -18,22 +18,21 @@
package org.apache.flink.runtime.instance;
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-import java.net.InetAddress;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.api.common.JobID;
-
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
-import org.mockito.Matchers;
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class SimpleSlotTest extends TestLogger {
@@ -45,7 +44,7 @@ public class SimpleSlotTest extends TestLogger {
SimpleSlot slot = getSlot();
assertTrue(slot.isAlive());
- slot.releaseSlot();
+ slot.releaseInstanceSlot();
assertFalse(slot.isAlive());
assertTrue(slot.isCanceled());
assertTrue(slot.isReleased());
@@ -76,19 +75,19 @@ public class SimpleSlotTest extends TestLogger {
@Test
public void testSetExecutionVertex() {
try {
- Execution ev = mock(Execution.class);
- Execution ev_2 = mock(Execution.class);
+ TestingPayload payload1 = new TestingPayload();
+ TestingPayload payload2 = new TestingPayload();
// assign to alive slot
{
SimpleSlot slot = getSlot();
- assertTrue(slot.setExecution(ev));
- assertEquals(ev, slot.getExecutedVertex());
+ assertTrue(slot.tryAssignPayload(payload1));
+ assertEquals(payload1, slot.getPayload());
// try to add another one
- assertFalse(slot.setExecution(ev_2));
- assertEquals(ev, slot.getExecutedVertex());
+ assertFalse(slot.tryAssignPayload(payload2));
+ assertEquals(payload1, slot.getPayload());
}
// assign to canceled slot
@@ -96,8 +95,8 @@ public class SimpleSlotTest extends TestLogger {
SimpleSlot slot = getSlot();
assertTrue(slot.markCancelled());
- assertFalse(slot.setExecution(ev));
- assertNull(slot.getExecutedVertex());
+ assertFalse(slot.tryAssignPayload(payload1));
+ assertNull(slot.getPayload());
}
// assign to released marked slot
@@ -106,17 +105,17 @@ public class SimpleSlotTest extends TestLogger {
assertTrue(slot.markCancelled());
assertTrue(slot.markReleased());
- assertFalse(slot.setExecution(ev));
- assertNull(slot.getExecutedVertex());
+ assertFalse(slot.tryAssignPayload(payload1));
+ assertNull(slot.getPayload());
}
// assign to released
{
SimpleSlot slot = getSlot();
- slot.releaseSlot();
+ slot.releaseInstanceSlot();
- assertFalse(slot.setExecution(ev));
- assertNull(slot.getExecutedVertex());
+ assertFalse(slot.tryAssignPayload(payload1));
+ assertNull(slot.getPayload());
}
}
catch (Exception e) {
@@ -125,27 +124,6 @@ public class SimpleSlotTest extends TestLogger {
}
}
- @Test
- public void testReleaseCancelsVertex() {
- try {
- Execution ev = mock(Execution.class);
-
- SimpleSlot slot = getSlot();
- assertTrue(slot.setExecution(ev));
- assertEquals(ev, slot.getExecutedVertex());
-
- slot.releaseSlot();
- slot.releaseSlot();
- slot.releaseSlot();
-
- verify(ev, times(1)).fail(Matchers.any(Throwable.class));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
public static SimpleSlot getSlot() throws Exception {
ResourceID resourceID = ResourceID.generate();
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 271bc2a..450d377 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -154,7 +154,7 @@ public class SlotPoolTest extends TestLogger {
assertTrue(future2.isDone());
assertNotEquals(slot1, slot2);
- assertTrue(slot1.isReleased());
+ assertFalse(slot1.isAlive());
assertTrue(slot2.isAlive());
assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
@@ -198,7 +198,7 @@ public class SlotPoolTest extends TestLogger {
assertTrue(future2.isDone());
assertNotEquals(slot1, slot2);
- assertTrue(slot1.isReleased());
+ assertFalse(slot1.isAlive());
assertTrue(slot2.isAlive());
assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
@@ -294,7 +294,7 @@ public class SlotPoolTest extends TestLogger {
// wait until the slot has been returned
slotReturnFuture.get();
- assertTrue(slot1.isReleased());
+ assertFalse(slot1.isAlive());
// slot released and not usable, second allocation still not fulfilled
Thread.sleep(10);
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
index 36a47b7..925933d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
@@ -19,13 +19,14 @@
package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
@@ -38,12 +39,12 @@ public class TestingLogicalSlot implements LogicalSlot {
private final TaskManagerGateway taskManagerGateway;
- private final CompletableFuture<?> releaseFuture;
-
- private final AtomicReference<Execution> executionReference;
+ private final AtomicReference<Payload> payloadReference;
private final int slotNumber;
+ private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
+
private final AllocationID allocationId;
public TestingLogicalSlot() {
@@ -61,8 +62,7 @@ public class TestingLogicalSlot implements LogicalSlot {
AllocationID allocationId) {
this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
- this.releaseFuture = new CompletableFuture<>();
- this.executionReference = new AtomicReference<>();
+ this.payloadReference = new AtomicReference<>();
this.slotNumber = slotNumber;
this.allocationId = Preconditions.checkNotNull(allocationId);
}
@@ -83,23 +83,21 @@ public class TestingLogicalSlot implements LogicalSlot {
}
@Override
- public boolean isCanceled() {
- return releaseFuture.isDone();
+ public boolean tryAssignPayload(Payload payload) {
+ return payloadReference.compareAndSet(null, payload);
}
+ @Nullable
@Override
- public boolean isReleased() {
- return releaseFuture.isDone();
+ public Payload getPayload() {
+ return payloadReference.get();
}
@Override
- public boolean setExecution(Execution execution) {
- return executionReference.compareAndSet(null, execution);
- }
-
- @Override
- public void releaseSlot() {
+ public CompletableFuture<?> releaseSlot() {
releaseFuture.complete(null);
+
+ return releaseFuture;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
new file mode 100644
index 0000000..3369882
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingPayload.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Simple payload implementation for testing purposes.
+ */
+public class TestingPayload implements LogicalSlot.Payload {
+
+	/** Completed with {@code null} as soon as {@link #fail(Throwable)} is invoked. */
+	private final CompletableFuture<?> terminalStateFuture = new CompletableFuture<>();
+
+	public TestingPayload() {}
+
+	/** Ignores the given cause and simply marks this payload as terminated. */
+	@Override
+	public void fail(Throwable cause) {
+		terminalStateFuture.complete(null);
+	}
+
+	/** Returns the future which is completed once this payload has been failed. */
+	@Override
+	public CompletableFuture<?> getTerminalStateFuture() {
+		return terminalStateFuture;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
index 1344aef..3f267ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
@@ -143,7 +143,7 @@ public class CoLocationConstraintTest {
assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation());
// release the slot
- slot2_1.releaseSlot();
+ slot2_1.releaseInstanceSlot();
// we should still have a location
assertTrue(constraint.isAssigned());
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 371cca7..2ece70f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -49,7 +50,7 @@ import static org.junit.Assert.fail;
/**
* Tests for the {@link Scheduler} when scheduling individual tasks.
*/
-public class SchedulerIsolatedTasksTest {
+public class SchedulerIsolatedTasksTest extends TestLogger {
@Test
public void testAddAndRemoveInstance() {
@@ -297,10 +298,10 @@ public class SchedulerIsolatedTasksTest {
i2.markDead();
for (LogicalSlot slot : slots) {
- if (Objects.equals(slot.getTaskManagerLocation().getResourceID(), i2.getTaskManagerID())) {
- assertTrue(slot.isCanceled());
+ if (slot.getTaskManagerLocation().getResourceID().equals(i2.getTaskManagerID())) {
+ assertFalse(slot.isAlive());
} else {
- assertFalse(slot.isCanceled());
+ assertTrue(slot.isAlive());
}
slot.releaseSlot();
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
new file mode 100644
index 0000000..7c124ef
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * {@link SlotOwner} implementation for testing purposes.
+ */
+public class TestingSlotOwner implements SlotOwner {
+
+	/** Callback for returned slots, or {@code null}; volatile for cross-thread visibility. */
+	private volatile Consumer<Slot> returnAllocatedSlotConsumer;
+
+	public void setReturnAllocatedSlotConsumer(Consumer<Slot> returnAllocatedSlotConsumer) {
+		this.returnAllocatedSlotConsumer = returnAllocatedSlotConsumer;
+	}
+
+	/** Passes the slot to the callback (read once, if set) and always acknowledges the return. */
+	@Override
+	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
+		final Consumer<Slot> callback = returnAllocatedSlotConsumer;
+		if (callback != null) {
+			callback.accept(slot);
+		}
+		return CompletableFuture.completedFuture(Boolean.TRUE);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7bca9e46/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 2de6f9e..2d8d02d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.testingUtils
-import java.net.InetAddress
import java.util
import java.util.concurrent._
import java.util.{Collections, UUID}
@@ -39,9 +38,7 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
import org.apache.flink.runtime.jobmaster.JobMaster
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager}
-import org.apache.flink.runtime.metrics.{MetricRegistryImpl, MetricRegistryConfiguration}
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
-import org.apache.flink.runtime.taskexecutor.{TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl}
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testutils.TestingResourceManager
import org.apache.flink.runtime.util.LeaderRetrievalUtils