You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/23 22:05:58 UTC

[01/11] flink git commit: [FLINK-9911][JM] Use SlotPoolGateway to call failAllocation

Repository: flink
Updated Branches:
  refs/heads/master 57b3cde86 -> f1c83a158


[FLINK-9911][JM] Use SlotPoolGateway to call failAllocation

Since the SlotPool is an actor, we must use the SlotPoolGateway to interact with
the SlotPool. Otherwise, we risk an inconsistent state because multiple threads
could modify the component concurrently.

This closes #6386.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1c83a15
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1c83a15
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1c83a15

Branch: refs/heads/master
Commit: f1c83a15852b50b004b5916d7da3212b68a89594
Parents: 0180d06
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 21:57:59 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/runtime/jobmaster/JobMaster.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1c83a15/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1660f95..c47f4fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -983,7 +983,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	@Override
 	public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
-		slotPool.failAllocation(allocationID, cause);
+		slotPoolGateway.failAllocation(allocationID, cause);
 	}
 
 	//----------------------------------------------------------------------------------------------


[11/11] flink git commit: [hotfix] Fix checkstyle violations in ExecutionJobVertex

Posted by tr...@apache.org.
[hotfix] Fix checkstyle violations in ExecutionJobVertex


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc5d29ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc5d29ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc5d29ea

Branch: refs/heads/master
Commit: cc5d29ea3c45848f79f20c21bce0b3e773da21eb
Parents: 88572dd
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:46:37 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../executiongraph/ExecutionJobVertex.java      | 44 ++++++++++----------
 1 file changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc5d29ea/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 0691cc7..6da1e0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -317,7 +317,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	public ExecutionGraph getGraph() {
 		return graph;
 	}
-	
+
 	public JobVertex getJobVertex() {
 		return jobVertex;
 	}
@@ -344,33 +344,33 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	public JobID getJobId() {
 		return graph.getJobID();
 	}
-	
+
 	@Override
 	public JobVertexID getJobVertexId() {
 		return jobVertex.getID();
 	}
-	
+
 	@Override
 	public ExecutionVertex[] getTaskVertices() {
 		return taskVertices;
 	}
-	
+
 	public IntermediateResult[] getProducedDataSets() {
 		return producedDataSets;
 	}
-	
+
 	public InputSplitAssigner getSplitAssigner() {
 		return splitAssigner;
 	}
-	
+
 	public SlotSharingGroup getSlotSharingGroup() {
 		return slotSharingGroup;
 	}
-	
+
 	public CoLocationGroup getCoLocationGroup() {
 		return coLocationGroup;
 	}
-	
+
 	public List<IntermediateResult> getInputs() {
 		return inputs;
 	}
@@ -423,28 +423,28 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 
 	//---------------------------------------------------------------------------------------------
-	
+
 	public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
-		
+
 		List<JobEdge> inputs = jobVertex.getInputs();
-		
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), inputs.size()));
 		}
-		
+
 		for (int num = 0; num < inputs.size(); num++) {
 			JobEdge edge = inputs.get(num);
-			
+
 			if (LOG.isDebugEnabled()) {
 				if (edge.getSource() == null) {
-					LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.", 
+					LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
 							num, jobVertex.getID(), jobVertex.getName(), edge.getSourceId()));
 				} else {
 					LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
 							num, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
 				}
 			}
-			
+
 			// fetch the intermediate result via ID. if it does not exist, then it either has not been created, or the order
 			// in which this method is called for the job vertices is not a topological order
 			IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
@@ -452,18 +452,18 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 				throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
 						+ edge.getSourceId());
 			}
-			
+
 			this.inputs.add(ires);
-			
+
 			int consumerIndex = ires.registerConsumer();
-			
+
 			for (int i = 0; i < parallelism; i++) {
 				ExecutionVertex ev = taskVertices[i];
 				ev.connectSource(num, ires, edge, consumerIndex);
 			}
 		}
 	}
-	
+
 	//---------------------------------------------------------------------------------------------
 	//  Actions
 	//---------------------------------------------------------------------------------------------
@@ -480,7 +480,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 			SlotProvider slotProvider,
 			boolean queued,
 			LocationPreferenceConstraint locationPreferenceConstraint) {
-		
+
 		final ExecutionVertex[] vertices = this.taskVertices;
 
 		final ArrayList<CompletableFuture<Void>> scheduleFutures = new ArrayList<>(vertices.length);
@@ -497,9 +497,9 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 * Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns
 	 * pairs of the slots and execution attempts, to ease correlation between vertices and execution
 	 * attempts.
-	 * 
+	 *
 	 * <p>If this method throws an exception, it makes sure to release all so far requested slots.
-	 * 
+	 *
 	 * @param resourceProvider The resource provider from whom the slots are requested.
 	 * @param queued if the allocation can be queued
 	 * @param locationPreferenceConstraint constraint for the location preferences


[09/11] flink git commit: [hotfix] Fix checkstyle violations in ExecutionVertex

Posted by tr...@apache.org.
[hotfix] Fix checkstyle violations in ExecutionVertex


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88572dd3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88572dd3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88572dd3

Branch: refs/heads/master
Commit: 88572dd3821cce33066e226a3d33bc8b073e2109
Parents: 3422ee8
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:43:44 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java | 80 ++++++++++----------
 1 file changed, 39 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88572dd3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 8b57a7a..e385318 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -33,9 +33,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -46,6 +44,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
@@ -96,12 +96,12 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	private final Time timeout;
 
-	/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
+	/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */
 	private final String taskNameWithSubtask;
 
 	private volatile CoLocationConstraint locationConstraint;
 
-	/** The current or latest execution attempt of this vertex's task */
+	/** The current or latest execution attempt of this vertex's task. */
 	private volatile Execution currentExecution;	// this field must never be null
 
 	// --------------------------------------------------------------------------------------------
@@ -117,17 +117,18 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			Time timeout) {
 
 		this(
-				jobVertex,
-				subTaskIndex,
-				producedDataSets,
-				timeout,
-				1L,
-				System.currentTimeMillis(),
-				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
+			jobVertex,
+			subTaskIndex,
+			producedDataSets,
+			timeout,
+			1L,
+			System.currentTimeMillis(),
+			JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
 	}
 
 	/**
-	 * 
+	 * Creates an ExecutionVertex.
+	 *
 	 * @param timeout
 	 *            The RPC timeout to use for deploy / cancel calls
 	 * @param initialGlobalModVersion
@@ -311,7 +312,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	/**
 	 * Gets the location where the latest completed/canceled/failed execution of the vertex's
 	 * task happened.
-	 * 
+	 *
 	 * @return The latest prior execution location, or null, if there is none, yet.
 	 */
 	public TaskManagerLocation getLatestPriorLocation() {
@@ -444,36 +445,36 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	/**
 	 * Gets the overall preferred execution location for this vertex's current execution.
 	 * The preference is determined as follows:
-	 * 
+	 *
 	 * <ol>
 	 *     <li>If the task execution has state to load (from a checkpoint), then the location preference
 	 *         is the location of the previous execution (if there is a previous execution attempt).
 	 *     <li>If the task execution has no state or no previous location, then the location preference
 	 *         is based on the task's inputs.
 	 * </ol>
-	 * 
-	 * These rules should result in the following behavior:
-	 * 
+	 *
+	 * <p>These rules should result in the following behavior:
+	 *
 	 * <ul>
 	 *     <li>Stateless tasks are always scheduled based on co-location with inputs.
 	 *     <li>Stateful tasks are on their initial attempt executed based on co-location with inputs.
 	 *     <li>Repeated executions of stateful tasks try to co-locate the execution with its state.
 	 * </ul>
-	 * 
-	 * @return The preferred execution locations for the execution attempt.
-	 * 
+	 *
 	 * @see #getPreferredLocationsBasedOnState()
-	 * @see #getPreferredLocationsBasedOnInputs() 
+	 * @see #getPreferredLocationsBasedOnInputs()
+	 *
+	 * @return The preferred execution locations for the execution attempt.
 	 */
 	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() {
 		Collection<CompletableFuture<TaskManagerLocation>> basedOnState = getPreferredLocationsBasedOnState();
 		return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
 	}
-	
+
 	/**
 	 * Gets the preferred location to execute the current task execution attempt, based on the state
 	 * that the execution attempt will resume.
-	 * 
+	 *
 	 * @return A size-one collection with the location preference, or null, if there is no
 	 *         location preference based on the state.
 	 */
@@ -542,27 +543,25 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	/**
 	 * Archives the current Execution and creates a new Execution for this vertex.
-	 * 
+	 *
 	 * <p>This method atomically checks if the ExecutionGraph is still of an expected
 	 * global mod. version and replaces the execution if that is the case. If the ExecutionGraph
 	 * has increased its global mod. version in the meantime, this operation fails.
-	 * 
+	 *
 	 * <p>This mechanism can be used to prevent conflicts between various concurrent recovery and
 	 * reconfiguration actions in a similar way as "optimistic concurrency control".
-	 * 
+	 *
 	 * @param timestamp
 	 *             The creation timestamp for the new Execution
 	 * @param originatingGlobalModVersion
-	 *             The 
-	 * 
-	 * @return Returns the new created Execution. 
-	 * 
+	 *
+	 * @return Returns the new created Execution.
+	 *
 	 * @throws GlobalModVersionMismatch Thrown, if the execution graph has a new global mod
 	 *                                  version than the one passed to this message.
 	 */
 	public Execution resetForNewExecution(final long timestamp, final long originatingGlobalModVersion)
-			throws GlobalModVersionMismatch
-	{
+			throws GlobalModVersionMismatch {
 		LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
 
 		synchronized (priorExecutions) {
@@ -642,12 +641,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	}
 
 	/**
-	 *  
+	 * Cancels this ExecutionVertex.
+	 *
 	 * @return A future that completes once the execution has reached its final state.
 	 */
 	public CompletableFuture<?> cancel() {
 		// to avoid any case of mixup in the presence of concurrent calls,
-		// we copy a reference to the stack to make sure both calls go to the same Execution 
+		// we copy a reference to the stack to make sure both calls go to the same Execution
 		final Execution exec = this.currentExecution;
 		exec.cancel();
 		return exec.getReleaseFuture();
@@ -742,7 +742,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Simply forward this notification
+	 * Simply forward this notification.
 	 */
 	void notifyStateTransition(Execution execution, ExecutionState newState, Throwable error) {
 		// only forward this notification if the execution is still the current execution
@@ -754,7 +754,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	/**
 	 * Creates a task deployment descriptor to deploy a subtask to the given target slot.
-	 *
 	 * TODO: This should actually be in the EXECUTION
 	 */
 	TaskDeploymentDescriptor createDeploymentDescriptor(
@@ -762,13 +761,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			LogicalSlot targetSlot,
 			@Nullable JobManagerTaskRestore taskRestore,
 			int attemptNumber) throws ExecutionGraphException {
-		
+
 		// Produced intermediate results
 		List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<>(resultPartitions.size());
-		
+
 		// Consumed intermediate results
 		List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<>(inputEdges.length);
-		
+
 		boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment();
 
 		for (IntermediateResultPartition partition : resultPartitions.values()) {
@@ -791,8 +790,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, maxParallelism, lazyScheduling));
 			}
 		}
-		
-		
+
 		for (ExecutionEdge[] edges : inputEdges) {
 			InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor.fromEdges(
 				edges,


[06/11] flink git commit: [FLINK-9908][scheduling] Do not cancel individual scheduling future

Posted by tr...@apache.org.
[FLINK-9908][scheduling] Do not cancel individual scheduling future

Since the individual scheduling futures contain logic to release the slot if it cannot
be assigned to the Execution, we must not cancel them. Otherwise, we risk that
slots are not returned to the SlotPool, leaving it in an inconsistent state.

This closes #6383.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c897471d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c897471d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c897471d

Branch: refs/heads/master
Commit: c897471ddbe25ed85b9f9ec6b15fdbd11cf0cec5
Parents: 19d39ec
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:05:05 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |   8 --
 .../ExecutionGraphSchedulingTest.java           | 111 ++++++++++++++++++-
 .../executiongraph/TestingSlotProvider.java     |  82 ++++++++++++++
 .../jobmanager/slots/TestingSlotOwner.java      |   2 +-
 4 files changed, 193 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c897471d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 22c11ef..acb1e16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -989,14 +989,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 					throw new CompletionException(resultThrowable);
 				});
 
-		currentSchedulingFuture.whenComplete(
-			(Void ignored, Throwable throwable) -> {
-				if (throwable instanceof CancellationException) {
-					// cancel the individual allocation futures
-					allAllocationsFuture.cancel(false);
-				}
-			});
-
 		return currentSchedulingFuture;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c897471d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index e43137b..6092f52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
@@ -39,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
@@ -46,6 +48,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
@@ -62,14 +65,19 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.net.InetAddress;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -504,6 +512,89 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.FAILED));
 	}
 
+	/**
+	 * Tests that all slots are being returned to the {@link SlotOwner} if the
+	 * {@link ExecutionGraph} is being cancelled. See FLINK-9908
+	 */
+	@Test
+	public void testCancellationOfIncompleteScheduling() throws Exception {
+		final int parallelism = 10;
+
+		final JobVertex jobVertex = new JobVertex("Test job vertex");
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+		jobVertex.setParallelism(parallelism);
+
+		final JobGraph jobGraph = new JobGraph(jobVertex);
+		jobGraph.setAllowQueuedScheduling(true);
+		jobGraph.setScheduleMode(ScheduleMode.EAGER);
+
+		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+		final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+
+		final ConcurrentMap<SlotRequestId, Integer> slotRequestIds = new ConcurrentHashMap<>(parallelism);
+		final CountDownLatch requestedSlotsLatch = new CountDownLatch(parallelism);
+
+		final TestingSlotProvider slotProvider = new TestingSlotProvider(
+			(SlotRequestId slotRequestId) -> {
+				slotRequestIds.put(slotRequestId, 1);
+				requestedSlotsLatch.countDown();
+				return new CompletableFuture<>();
+			});
+
+
+		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
+
+		executionGraph.scheduleForExecution();
+
+		// wait until we have requested all slots
+		requestedSlotsLatch.await();
+
+		final Set<SlotRequestId> slotRequestIdsToReturn = ConcurrentHashMap.newKeySet(slotRequestIds.size());
+		slotRequestIdsToReturn.addAll(slotRequestIds.keySet());
+		final CountDownLatch countDownLatch = new CountDownLatch(slotRequestIds.size());
+
+		slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> {
+			slotRequestIdsToReturn.remove(logicalSlot.getSlotRequestId());
+			countDownLatch.countDown();
+		});
+		slotProvider.setSlotCanceller(slotRequestId -> {
+			slotRequestIdsToReturn.remove(slotRequestId);
+			countDownLatch.countDown();
+		});
+
+		final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch();
+
+		// start completing the slot requests asynchronously
+		executor.execute(
+			() -> {
+				slotRequestsBeingFulfilled.trigger();
+
+				for (SlotRequestId slotRequestId : slotRequestIds.keySet()) {
+					final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot(slotOwner, taskManagerGateway, slotRequestId);
+					slotProvider.complete(slotRequestId, singleLogicalSlot);
+				}
+			});
+
+		// make sure that we complete cancellations of deployed tasks
+		taskManagerGateway.setCancelConsumer(
+			(ExecutionAttemptID executionAttemptId) -> {
+				final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttemptId);
+
+				// if the execution was cancelled in state SCHEDULING, then it might already have been removed
+				if (execution != null) {
+					execution.cancelingComplete();
+				}
+			}
+		);
+
+		slotRequestsBeingFulfilled.await();
+
+		executionGraph.cancel();
+
+		countDownLatch.await();
+		assertThat(slotRequestIdsToReturn, is(empty()));
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -548,11 +639,29 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		return new SimpleSlot(slot, slotOwner, 0);
 	}
 
+	@Nonnull
+	private SingleLogicalSlot createSingleLogicalSlot(TestingSlotOwner slotOwner, SimpleAckingTaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) {
+		TaskManagerLocation location = new TaskManagerLocation(
+			ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
+
+		SimpleSlotContext slotContext = new SimpleSlotContext(
+			new AllocationID(),
+			location,
+			0,
+			taskManagerGateway);
+
+		return new SingleLogicalSlot(
+			slotRequestId,
+			slotContext,
+			null,
+			Locality.LOCAL,
+			slotOwner);
+	}
+
 	private static TaskManagerGateway createTaskManager() {
 		TaskManagerGateway tm = mock(TaskManagerGateway.class);
 		when(tm.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
 				.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
-
 		return tm;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c897471d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java
new file mode 100644
index 0000000..ed8fe13
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * {@link SlotProvider} implementation for testing purposes.
+ */
+final class TestingSlotProvider implements SlotProvider {
+
+	private final ConcurrentMap<SlotRequestId, CompletableFuture<LogicalSlot>> slotFutures;
+
+	private final Function<SlotRequestId, CompletableFuture<LogicalSlot>> slotFutureCreator;
+
+	private volatile Consumer<SlotRequestId> slotCanceller = ignored -> {};
+
+	TestingSlotProvider(Function<SlotRequestId, CompletableFuture<LogicalSlot>> slotFutureCreator) {
+		this.slotFutureCreator = slotFutureCreator;
+		this.slotFutures = new ConcurrentHashMap<>(4);
+	}
+
+	public void setSlotCanceller(Consumer<SlotRequestId> slotCanceller) {
+		this.slotCanceller = slotCanceller;
+	}
+
+	@Override
+	public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, boolean allowQueued, SlotProfile slotProfile, Time timeout) {
+		Preconditions.checkState(!slotFutures.containsKey(slotRequestId));
+		final CompletableFuture<LogicalSlot> slotFuture = slotFutureCreator.apply(slotRequestId);
+
+		slotFutures.put(slotRequestId, slotFuture);
+
+		return slotFuture;
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
+		final CompletableFuture<LogicalSlot> slotFuture = slotFutures.remove(slotRequestId);
+		slotFuture.cancel(false);
+
+		slotCanceller.accept(slotRequestId);
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	public void complete(SlotRequestId slotRequestId, LogicalSlot logicalSlot) {
+		slotFutures.get(slotRequestId).complete(logicalSlot);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c897471d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
index 727c0b5..b922204 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
@@ -37,7 +37,7 @@ public class TestingSlotOwner implements SlotOwner {
 
 	@Override
 	public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
-		final Consumer<LogicalSlot> currentReturnAllocatedSlotConsumer = this.returnAllocatedSlotConsumer;
+		final Consumer<LogicalSlot> currentReturnAllocatedSlotConsumer = returnAllocatedSlotConsumer;
 
 		if (currentReturnAllocatedSlotConsumer != null) {
 			currentReturnAllocatedSlotConsumer.accept(logicalSlot);


[02/11] flink git commit: [hotfix] Fix checkstyle violations in Execution

Posted by tr...@apache.org.
[hotfix] Fix checkstyle violations in Execution


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1391a09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1391a09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1391a09

Branch: refs/heads/master
Commit: b1391a09d52f039c5061c1ead52d536c6c0e64da
Parents: cc5d29e
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:48:53 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 27 ++++++++++----------
 1 file changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b1391a09/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 57aa0d5..f8419d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -179,7 +179,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	/**
 	 * Creates a new Execution attempt.
-	 * 
+	 *
 	 * @param executor
 	 *             The executor used to dispatch callbacks from futures and asynchronous RPC calls.
 	 * @param vertex
@@ -250,7 +250,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	/**
 	 * Gets the global modification version of the execution graph when this execution was created.
-	 * 
+	 *
 	 * <p>This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
 	 * to resolve conflicts between concurrent modification by global and local failover actions.
 	 */
@@ -391,7 +391,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs
 	 *       to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
 	 *       error sets the vertex state to failed and triggers the recovery logic.
-	 * 
+	 *
 	 * @param slotProvider The slot provider to use to allocate slot for this execution attempt.
 	 * @param queued Flag to indicate whether the scheduler may queue this task if it cannot
 	 *               immediately deploy it.
@@ -773,11 +773,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer()
 							.getCurrentAssignedResource().getTaskManagerLocation();
 					final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
-					
+
 					final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
 
 					final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
-					
 
 					final ResultPartitionLocation partitionLocation;
 
@@ -986,7 +985,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	void cancelingComplete() {
 		cancelingComplete(null, null);
 	}
-	
+
 	void cancelingComplete(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
 
 		// the taskmanagers can themselves cancel tasks without an external trigger, if they find that the
@@ -1148,7 +1147,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			else if (currentState == CANCELING || currentState == FAILED) {
 				if (LOG.isDebugEnabled()) {
 					// this log statement is guarded because the 'getVertexWithAttempt()' method
-					// performs string concatenations 
+					// performs string concatenations
 					LOG.debug("Concurrent canceling/failing of {} while deployment was in progress.", getVertexWithAttempt());
 				}
 				sendCancelRpcCall();
@@ -1175,7 +1174,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	/**
 	 * This method sends a CancelTask message to the instance of the assigned slot.
 	 *
-	 * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
+	 * <p>The sending is tried up to NUM_CANCEL_CALL_TRIES times.
 	 */
 	private void sendCancelRpcCall() {
 		final LogicalSlot slot = assignedResource;
@@ -1238,9 +1237,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	/**
 	 * Releases the assigned resource and completes the release future
-	 * once the assigned resource has been successfully released
+	 * once the assigned resource has been successfully released.
 	 *
-	 * @param cause for the resource release, null if none 	 
+	 * @param cause for the resource release, null if none
 	 */
 	private void releaseAssignedResource(@Nullable Throwable cause) {
 		final LogicalSlot slot = assignedResource;
@@ -1357,7 +1356,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	// ------------------------------------------------------------------------
 	//  Accumulators
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Update accumulators (discarded when the Execution has already been terminated).
 	 * @param userAccumulators the user accumulators
@@ -1369,7 +1368,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			}
 		}
 	}
-	
+
 	public Map<String, Accumulator<?, ?>> getUserAccumulators() {
 		return userAccumulators;
 	}
@@ -1389,7 +1388,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	public int getParallelSubtaskIndex() {
 		return getVertex().getParallelSubtaskIndex();
 	}
-		
+
 	@Override
 	public IOMetrics getIOMetrics() {
 		return ioMetrics;
@@ -1409,7 +1408,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	// ------------------------------------------------------------------------
 	//  Standard utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		final LogicalSlot slot = assignedResource;


[04/11] flink git commit: [FLINK-9838][logging] Don't log slot request failures on the ResourceManager

Posted by tr...@apache.org.
[FLINK-9838][logging] Don't log slot request failures on the ResourceManager

This closes #6373.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e616a83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e616a83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e616a83

Branch: refs/heads/master
Commit: 4e616a8362744c15a71e0e57ad68fbb52266c837
Parents: 57b3cde
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 19 13:07:44 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java    | 4 ++--
 .../apache/flink/runtime/resourcemanager/ResourceManager.java    | 1 -
 2 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e616a83/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 27440a3..829c82e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -1011,7 +1011,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			failPendingRequest(pendingRequest, cause);
 		}
 		else if (availableSlots.tryRemove(allocationID)) {
-			log.debug("Failed available slot [{}] with ", allocationID, cause);
+			log.debug("Failed available slot with allocation id {}.", allocationID, cause);
 		}
 		else {
 			AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
@@ -1021,7 +1021,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				allocatedSlot.releasePayload(cause);
 			}
 			else {
-				log.trace("Outdated request to fail slot [{}] with ", allocationID, cause);
+				log.trace("Outdated request to fail slot with allocation id {}.", allocationID, cause);
 			}
 		}
 		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase

http://git-wip-us.apache.org/repos/asf/flink/blob/4e616a83/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 453ec8b..a992632 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1005,7 +1005,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		@Override
 		public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
 			validateRunsInMainThread();
-			log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);
 
 			JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
 			if (jobManagerRegistration != null) {


[08/11] flink git commit: [FLINK-9909][core] ConjunctFuture does not cancel input futures

Posted by tr...@apache.org.
[FLINK-9909][core] ConjunctFuture does not cancel input futures

If a ConjunctFuture is cancelled, it does not cancel its input futures
automatically. If the user needs this behaviour, they have to
implement it explicitly. The reason for this change is that an implicit
cancellation can have unwanted side effects, because the producers of
the cancelled input futures will then not be executed.

This closes #6384.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9afda733
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9afda733
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9afda733

Branch: refs/heads/master
Commit: 9afda733a90a72be75ced9567452c6a7a5e3dc8c
Parents: c897471
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:17:11 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 40 --------------------
 .../runtime/concurrent/FutureUtilsTest.java     | 34 -----------------
 2 files changed, 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9afda733/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 1cffaab..3a7e800 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -508,22 +508,6 @@ public class FutureUtils {
 		 * @return The number of Futures in the conjunction that are already complete
 		 */
 		public abstract int getNumFuturesCompleted();
-
-		/**
-		 * Gets the individual futures which make up the {@link ConjunctFuture}.
-		 *
-		 * @return Collection of futures which make up the {@link ConjunctFuture}
-		 */
-		protected abstract Collection<? extends CompletableFuture<?>> getConjunctFutures();
-
-		@Override
-		public boolean cancel(boolean mayInterruptIfRunning) {
-			for (CompletableFuture<?> completableFuture : getConjunctFutures()) {
-				completableFuture.cancel(mayInterruptIfRunning);
-			}
-
-			return super.cancel(mayInterruptIfRunning);
-		}
 	}
 
 	/**
@@ -531,8 +515,6 @@ public class FutureUtils {
 	 */
 	private static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T>> {
 
-		private final Collection<? extends CompletableFuture<? extends T>> resultFutures;
-
 		/** The total number of futures in the conjunction. */
 		private final int numTotal;
 
@@ -564,7 +546,6 @@ public class FutureUtils {
 
 		@SuppressWarnings("unchecked")
 		ResultConjunctFuture(Collection<? extends CompletableFuture<? extends T>> resultFutures) {
-			this.resultFutures = checkNotNull(resultFutures);
 			this.numTotal = resultFutures.size();
 			results = (T[]) new Object[numTotal];
 
@@ -587,11 +568,6 @@ public class FutureUtils {
 		public int getNumFuturesCompleted() {
 			return numCompleted.get();
 		}
-
-		@Override
-		protected Collection<? extends CompletableFuture<?>> getConjunctFutures() {
-			return resultFutures;
-		}
 	}
 
 	/**
@@ -600,8 +576,6 @@ public class FutureUtils {
 	 */
 	private static final class WaitingConjunctFuture extends ConjunctFuture<Void> {
 
-		private final Collection<? extends CompletableFuture<?>> futures;
-
 		/** Number of completed futures. */
 		private final AtomicInteger numCompleted = new AtomicInteger(0);
 
@@ -620,7 +594,6 @@ public class FutureUtils {
 		}
 
 		private WaitingConjunctFuture(Collection<? extends CompletableFuture<?>> futures) {
-			this.futures = checkNotNull(futures);
 			this.numTotal = futures.size();
 
 			if (futures.isEmpty()) {
@@ -641,11 +614,6 @@ public class FutureUtils {
 		public int getNumFuturesCompleted() {
 			return numCompleted.get();
 		}
-
-		@Override
-		protected Collection<? extends CompletableFuture<?>> getConjunctFutures() {
-			return futures;
-		}
 	}
 
 	/**
@@ -673,14 +641,11 @@ public class FutureUtils {
 
 		private final int numFuturesTotal;
 
-		private final Collection<? extends CompletableFuture<?>> futuresToComplete;
-
 		private int futuresCompleted;
 
 		private Throwable globalThrowable;
 
 		private CompletionConjunctFuture(Collection<? extends CompletableFuture<?>> futuresToComplete) {
-			this.futuresToComplete = checkNotNull(futuresToComplete);
 			numFuturesTotal = futuresToComplete.size();
 
 			futuresCompleted = 0;
@@ -725,11 +690,6 @@ public class FutureUtils {
 				return futuresCompleted;
 			}
 		}
-
-		@Override
-		protected Collection<? extends CompletableFuture<?>> getConjunctFutures() {
-			return futuresToComplete;
-		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9afda733/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index 07bc4c1..1639c91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -42,7 +41,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.hamcrest.CoreMatchers.containsString;
@@ -548,38 +546,6 @@ public class FutureUtilsTest extends TestLogger {
 	}
 
 	@Test
-	public void testCancelWaitingConjunctFuture() {
-		cancelConjunctFuture(inputFutures -> FutureUtils.waitForAll(inputFutures));
-	}
-
-	@Test
-	public void testCancelResultConjunctFuture() {
-		cancelConjunctFuture(inputFutures -> FutureUtils.combineAll(inputFutures));
-	}
-
-	@Test
-	public void testCancelCompleteConjunctFuture() {
-		cancelConjunctFuture(inputFutures -> FutureUtils.completeAll(inputFutures));
-	}
-
-	private void cancelConjunctFuture(Function<Collection<? extends CompletableFuture<?>>, FutureUtils.ConjunctFuture<?>> conjunctFutureFactory) {
-		final int numInputFutures = 10;
-		final Collection<CompletableFuture<Void>> inputFutures = new ArrayList<>(numInputFutures);
-
-		for (int i = 0; i < numInputFutures; i++) {
-			inputFutures.add(new CompletableFuture<>());
-		}
-
-		final FutureUtils.ConjunctFuture<?> conjunctFuture = conjunctFutureFactory.apply(inputFutures);
-
-		conjunctFuture.cancel(false);
-
-		for (CompletableFuture<Void> inputFuture : inputFutures) {
-			assertThat(inputFuture.isCancelled(), is(true));
-		}
-	}
-
-	@Test
 	public void testSupplyAsyncFailure() throws Exception {
 		final String exceptionMessage = "Test exception";
 		final FlinkException testException = new FlinkException(exceptionMessage);


[05/11] flink git commit: [hotfix] Improve logging of SlotPool and SlotSharingManager

Posted by tr...@apache.org.
[hotfix] Improve logging of SlotPool and SlotSharingManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19d39ec7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19d39ec7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19d39ec7

Branch: refs/heads/master
Commit: 19d39ec7ad2ed69fe81cea72299466bd7d6965e5
Parents: 4e616a8
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 19 13:41:03 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../clusterframework/types/AllocationID.java    |  5 +++
 .../flink/runtime/jobmaster/SlotRequestId.java  |  5 +++
 .../runtime/jobmaster/slotpool/SlotPool.java    | 36 ++++++++++----------
 .../jobmaster/slotpool/SlotSharingManager.java  | 21 ++++++++++--
 4 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
index e722e9f..7004eff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java
@@ -52,4 +52,9 @@ public class AllocationID extends AbstractID {
 	public AllocationID(long lowerPart, long upperPart) {
 		super(lowerPart, upperPart);
 	}
+
+	@Override
+	public String toString() {
+		return "AllocationID{" + super.toString() + '}';
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
index 203139c..5ac200d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotRequestId.java
@@ -40,4 +40,9 @@ public final class SlotRequestId extends AbstractID {
 	}
 
 	public SlotRequestId() {}
+
+	@Override
+	public String toString() {
+		return "SlotRequestId{" + super.toString() + '}';
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 829c82e..13f0462 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -323,7 +323,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			boolean allowQueuedScheduling,
 			Time allocationTimeout) {
 
-		log.debug("Allocating slot with request {} for task execution {}", slotRequestId, task.getTaskToExecute());
+		log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
 
 		final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
 
@@ -686,7 +686,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		checkNotNull(resourceManagerGateway);
 		checkNotNull(pendingRequest);
 
-		log.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
+		log.info("Requesting new slot [{}] and profile {} from resource manager.", pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile());
 
 		final AllocationID allocationId = new AllocationID();
 
@@ -723,7 +723,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 					"No pooled slot available and request to ResourceManager for new slot failed", failure));
 		} else {
 			if (log.isDebugEnabled()) {
-				log.debug("Unregistered slot request {} failed.", slotRequestID, failure);
+				log.debug("Unregistered slot request [{}] failed.", slotRequestID, failure);
 			}
 		}
 	}
@@ -731,7 +731,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
 
 		log.info("Cannot serve slot request, no ResourceManager connected. " +
-				"Adding as pending request {}",  pendingRequest.getSlotRequestId());
+				"Adding as pending request [{}]",  pendingRequest.getSlotRequestId());
 
 		waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
 	}
@@ -742,7 +742,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 	@Override
 	public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
-		log.debug("Releasing slot with slot request id {} because of {}.", slotRequestId, cause != null ? cause.getMessage() : "null");
+		log.debug("Releasing slot [{}] because: {}", slotRequestId, cause != null ? cause.getMessage() : "null");
 
 		if (slotSharingGroupId != null) {
 			final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
@@ -753,7 +753,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				if (taskSlot != null) {
 					taskSlot.release(cause);
 				} else {
-					log.debug("Could not find slot {} in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
+					log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
 				}
 			} else {
 				log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId);
@@ -770,7 +770,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 					allocatedSlot.releasePayload(cause);
 					tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
 				} else {
-					log.debug("There is no allocated slot with slot request id {}. Ignoring the release slot request.", slotRequestId);
+					log.debug("There is no allocated slot [{}]. Ignoring the release slot request.", slotRequestId);
 				}
 			}
 		}
@@ -801,11 +801,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	}
 
 	private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
-		Preconditions.checkNotNull(pendingRequest);
-		Preconditions.checkNotNull(e);
+		checkNotNull(pendingRequest);
+		checkNotNull(e);
 
 		if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
-			log.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
+			log.info("Failing pending slot request [{}]: {}", pendingRequest.getSlotRequestId(), e.getMessage());
 			pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
 		}
 	}
@@ -833,7 +833,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
 
 		if (pendingRequest != null) {
-			log.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+			log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",
 				pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
 
 			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
@@ -970,7 +970,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				allocatedSlots.remove(pendingRequest.getSlotRequestId());
 				tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
 			} else {
-				log.debug("Fulfilled slot request {} with allocated slot {}.", pendingRequest.getSlotRequestId(), allocationID);
+				log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);
 			}
 		}
 		else {
@@ -1011,7 +1011,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			failPendingRequest(pendingRequest, cause);
 		}
 		else if (availableSlots.tryRemove(allocationID)) {
-			log.debug("Failed available slot with allocation id {}.", allocationID, cause);
+			log.debug("Failed available slot [{}].", allocationID, cause);
 		}
 		else {
 			AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
@@ -1021,7 +1021,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 				allocatedSlot.releasePayload(cause);
 			}
 			else {
-				log.trace("Outdated request to fail slot with allocation id {}.", allocationID, cause);
+				log.trace("Outdated request to fail slot [{}].", allocationID, cause);
 			}
 		}
 		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
@@ -1068,7 +1068,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 	@VisibleForTesting
 	protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
-		log.info("Pending slot request {} timed out.", slotRequestId);
+		log.info("Pending slot request [{}] timed out.", slotRequestId);
 		removePendingRequest(slotRequestId);
 	}
 
@@ -1109,7 +1109,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 			final AllocationID allocationID = expiredSlot.getAllocationId();
 			if (availableSlots.tryRemove(allocationID)) {
 
-				log.info("Releasing idle slot {}.", allocationID);
+				log.info("Releasing idle slot [{}].", allocationID);
 				final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
 					allocationID,
 					cause,
@@ -1119,12 +1119,12 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 					(Acknowledge ignored, Throwable throwable) -> {
 						if (throwable != null) {
 							if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
-								log.debug("Releasing slot {} of registered TaskExecutor {} failed. " +
+								log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
 									"Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
 									throwable);
 								tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
 							} else {
-								log.debug("Releasing slot {} failed and owning TaskExecutor {} is no " +
+								log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
 									"longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
 							}
 						}

http://git-wip-us.apache.org/repos/asf/flink/blob/19d39ec7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index eaa5787..afcd24f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -32,6 +32,9 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
@@ -77,6 +80,8 @@ import java.util.function.Function;
  */
 public class SlotSharingManager {
 
+	private static final Logger LOG = LoggerFactory.getLogger(SlotSharingManager.class);
+
 	/** Lock for the internal data structures. */
 	private final Object lock = new Object();
 
@@ -143,6 +148,8 @@ public class SlotSharingManager {
 			slotContextFuture,
 			allocatedSlotRequestId);
 
+		LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);
+
 		allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
 
 		synchronized (lock) {
@@ -158,6 +165,8 @@ public class SlotSharingManager {
 						final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
 
 						if (resolvedRootNode != null) {
+							LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
+
 							final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
 								slotContext.getTaskManagerLocation(),
 								taskManagerLocation -> new HashSet<>(4));
@@ -384,6 +393,8 @@ public class SlotSharingManager {
 		MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) {
 			Preconditions.checkState(!super.contains(groupId));
 
+			LOG.debug("Create nested multi task slot [{}] in parent multi task slot [{}] for group [{}].", slotRequestId, getSlotRequestId(), groupId);
+
 			final MultiTaskSlot inner = new MultiTaskSlot(
 				slotRequestId,
 				groupId,
@@ -412,6 +423,8 @@ public class SlotSharingManager {
 				Locality locality) {
 			Preconditions.checkState(!super.contains(groupId));
 
+			LOG.debug("Create single task slot [{}] in multi task slot [{}] for group {}.", slotRequestId, getSlotRequestId(), groupId);
+
 			final SingleTaskSlot leaf = new SingleTaskSlot(
 				slotRequestId,
 				groupId,
@@ -557,13 +570,15 @@ public class SlotSharingManager {
 			Preconditions.checkNotNull(locality);
 			singleLogicalSlotFuture = parent.getSlotContextFuture()
 				.thenApply(
-					(SlotContext slotContext) ->
-						new SingleLogicalSlot(
+					(SlotContext slotContext) -> {
+						LOG.trace("Fulfill single task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
+						return new SingleLogicalSlot(
 							slotRequestId,
 							slotContext,
 							slotSharingGroupId,
 							locality,
-							slotOwner));
+							slotOwner);
+					});
 		}
 
 		CompletableFuture<LogicalSlot> getLogicalSlotFuture() {


[10/11] flink git commit: [FLINK-9910][scheduling] Execution#scheduleForExecution does not cancel slot future

Posted by tr...@apache.org.
[FLINK-9910][scheduling] Execution#scheduleForExecution does not cancel slot future

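In order to properly give back an allocated slot to the SlotPool, one must not complete
the result future of Execution#allocateAndAssignSlotForExecution from the outside (e.g.
exceptionally). This commit changes Execution#scheduleForExecution accordingly: when
immediate deployment is required but not yet possible, it now completes the derived
deployment future exceptionally instead of the slot allocation future.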

This closes #6385.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0180d068
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0180d068
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0180d068

Branch: refs/heads/master
Commit: 0180d068565d99db1db998944686064ceddf398f
Parents: b1391a0
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 21:38:42 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../concurrent/FutureConsumerWithException.java | 43 +++++++++++
 .../flink/runtime/executiongraph/Execution.java | 24 +++----
 .../ExecutionGraphSchedulingTest.java           |  2 +-
 .../runtime/executiongraph/ExecutionTest.java   | 75 ++++++++++++++++++++
 4 files changed, 129 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0180d068/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java
new file mode 100644
index 0000000..c49d7dc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureConsumerWithException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+
+/**
+ * A checked extension of the {@link Consumer} interface which rethrows
+ * exceptions wrapped in a {@link CompletionException}.
+ *
+ * @param <T> type of the first argument
+ * @param <E> type of the thrown exception
+ */
+public interface FutureConsumerWithException<T, E extends Throwable> extends Consumer<T> {
+
+	void acceptWithException(T value) throws E;
+
+	@Override
+	default void accept(T value) {
+		try {
+			acceptWithException(value);
+		} catch (Throwable t) {
+			throw new CompletionException(t);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0180d068/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index f8419d3..801f35a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -54,6 +54,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.concurrent.FutureConsumerWithException;
 
 import org.slf4j.Logger;
 
@@ -413,24 +414,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
 			// that we directly deploy the tasks if the slot allocation future is completed. This is
 			// necessary for immediate deployment.
-			final CompletableFuture<Void> deploymentFuture = allocationFuture.handle(
-				(Execution ignored, Throwable throwable) -> {
-					if (throwable != null) {
-						markFailed(ExceptionUtils.stripCompletionException(throwable));
-					} else {
-						try {
-							deploy();
-						} catch (Throwable t) {
-							markFailed(ExceptionUtils.stripCompletionException(t));
-						}
+			final CompletableFuture<Void> deploymentFuture = allocationFuture.thenAccept(
+				(FutureConsumerWithException<Execution, Exception>) value -> deploy());
+
+			deploymentFuture.whenComplete(
+				(Void ignored, Throwable failure) -> {
+					if (failure != null) {
+						markFailed(ExceptionUtils.stripCompletionException(failure));
 					}
-					return null;
-				}
-			);
+				});
 
 			// if tasks have to scheduled immediately check that the task has been deployed
 			if (!queued && !deploymentFuture.isDone()) {
-				allocationFuture.completeExceptionally(new IllegalArgumentException("The slot allocation future has not been completed yet."));
+				deploymentFuture.completeExceptionally(new IllegalArgumentException("The slot allocation future has not been completed yet."));
 			}
 
 			return deploymentFuture;

http://git-wip-us.apache.org/repos/asf/flink/blob/0180d068/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 6092f52..6680c9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -640,7 +640,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 	}
 
 	@Nonnull
-	private SingleLogicalSlot createSingleLogicalSlot(TestingSlotOwner slotOwner, SimpleAckingTaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) {
+	static SingleLogicalSlot createSingleLogicalSlot(SlotOwner slotOwner, TaskManagerGateway taskManagerGateway, SlotRequestId slotRequestId) {
 		TaskManagerLocation location = new TaskManagerLocation(
 			ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0180d068/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index d3e88e1..56fd7e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -50,6 +51,8 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
@@ -419,6 +422,78 @@ public class ExecutionTest extends TestLogger {
 		assertThat(execution.getTaskRestore(), is(nullValue()));
 	}
 
+	@Test
+	public void testEagerSchedulingFailureReturnsSlot() throws Exception {
+		final JobVertex jobVertex = createNoOpJobVertex();
+		final JobVertexID jobVertexId = jobVertex.getID();
+
+		final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
+
+		final CompletableFuture<SlotRequestId> slotRequestIdFuture = new CompletableFuture<>();
+		final CompletableFuture<SlotRequestId> returnedSlotFuture = new CompletableFuture<>();
+
+		final TestingSlotProvider slotProvider = new TestingSlotProvider(
+			(SlotRequestId slotRequestId) -> {
+				slotRequestIdFuture.complete(slotRequestId);
+				return new CompletableFuture<>();
+			});
+
+		slotProvider.setSlotCanceller(returnedSlotFuture::complete);
+		slotOwner.getReturnedSlotFuture().thenAccept(
+			(LogicalSlot logicalSlot) -> returnedSlotFuture.complete(logicalSlot.getSlotRequestId()));
+
+		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			jobVertex);
+
+		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+		ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+		final Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+		taskManagerGateway.setCancelConsumer(
+			executionAttemptID -> {
+				if (execution.getAttemptId().equals(executionAttemptID)) {
+					execution.cancelingComplete();
+				}
+			}
+		);
+
+		final ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+		try {
+			slotRequestIdFuture.thenAcceptAsync(
+				(SlotRequestId slotRequestId) -> {
+					final SingleLogicalSlot singleLogicalSlot = ExecutionGraphSchedulingTest.createSingleLogicalSlot(
+						slotOwner,
+						taskManagerGateway,
+						slotRequestId);
+					slotProvider.complete(slotRequestId, singleLogicalSlot);
+				},
+				executorService);
+
+			final CompletableFuture<Void> schedulingFuture = execution.scheduleForExecution(
+				slotProvider,
+				false,
+				LocationPreferenceConstraint.ANY);
+
+			try {
+				schedulingFuture.get();
+				// cancel the execution in case we could schedule the execution
+				execution.cancel();
+			} catch (ExecutionException ignored) {
+			}
+
+			assertThat(returnedSlotFuture.get(), is(equalTo(slotRequestIdFuture.get())));
+		} finally {
+			executorService.shutdownNow();
+		}
+	}
+
 	@Nonnull
 	private JobVertex createNoOpJobVertex() {
 		final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());


[03/11] flink git commit: [hotfix] Fix checkstyle violations in FutureUtils

Posted by tr...@apache.org.
[hotfix] Fix checkstyle violations in FutureUtils


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b47ded30
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b47ded30
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b47ded30

Branch: refs/heads/master
Commit: b47ded30e333a84481d1d6174bb7f203c44ed77e
Parents: 9afda73
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:20:53 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/concurrent/FutureUtils.java  | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b47ded30/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 3a7e800..d4a65de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -786,7 +785,7 @@ public class FutureUtils {
 		private final CompletableFuture<?> future;
 
 		private Timeout(CompletableFuture<?> future) {
-			this.future = Preconditions.checkNotNull(future);
+			this.future = checkNotNull(future);
 		}
 
 		@Override
@@ -800,8 +799,9 @@ public class FutureUtils {
 	 *
 	 * <p>This class creates a singleton scheduler used to run the provided actions.
 	 */
-	private static final class Delayer {
-		static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
+	private enum Delayer {
+		;
+		static final ScheduledThreadPoolExecutor DELAYER = new ScheduledThreadPoolExecutor(
 			1,
 			new ExecutorThreadFactory("FlinkCompletableFutureDelayScheduler"));
 
@@ -814,10 +814,10 @@ public class FutureUtils {
 		 * @return Future of the scheduled action
 		 */
 		private static ScheduledFuture<?> delay(Runnable runnable, long delay, TimeUnit timeUnit) {
-			Preconditions.checkNotNull(runnable);
-			Preconditions.checkNotNull(timeUnit);
+			checkNotNull(runnable);
+			checkNotNull(timeUnit);
 
-			return delayer.schedule(runnable, delay, timeUnit);
+			return DELAYER.schedule(runnable, delay, timeUnit);
 		}
 	}
 }


[07/11] flink git commit: [hotfix] Replace check state condition in Execution#tryAssignResource with if check

Posted by tr...@apache.org.
[hotfix] Replace check state condition in Execution#tryAssignResource with if check

Instead of risking an IllegalStateException, it is better to check whether the
taskManagerLocationFuture has already been completed. If it has, we also reject
the assignment of the LogicalSlot to the Execution. That way, we no longer risk
failing to release the slot in case of an exception in
Execution#allocateAndAssignSlotForExecution.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3422ee8b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3422ee8b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3422ee8b

Branch: refs/heads/master
Commit: 3422ee8b43b4512a5654b670c915c3642dce96e5
Parents: b47ded3
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:34:33 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3422ee8b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 853732f..57aa0d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -280,15 +280,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		// only allow to set the assigned resource in state SCHEDULED or CREATED
 		// note: we also accept resource assignment when being in state CREATED for testing purposes
 		if (state == SCHEDULED || state == CREATED) {
-			if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot) && logicalSlot.tryAssignPayload(this)) {
-				// check for concurrent modification (e.g. cancelling call)
-				if (state == SCHEDULED || state == CREATED) {
-					checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
-					taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
-					assignedAllocationID = logicalSlot.getAllocationId();
-					return true;
+			if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot)) {
+				if (logicalSlot.tryAssignPayload(this)) {
+					// check for concurrent modification (e.g. cancelling call)
+					if ((state == SCHEDULED || state == CREATED) && !taskManagerLocationFuture.isDone()) {
+						taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
+						assignedAllocationID = logicalSlot.getAllocationId();
+						return true;
+					} else {
+						// free assigned resource and return false
+						ASSIGNED_SLOT_UPDATER.set(this, null);
+						return false;
+					}
 				} else {
-					// free assigned resource and return false
 					ASSIGNED_SLOT_UPDATER.set(this, null);
 					return false;
 				}