Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/01 10:36:01 UTC

[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

    [ https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671441#comment-16671441 ] 

ASF GitHub Bot commented on FLINK-9635:
---------------------------------------

tillrohrmann closed pull request #6972: [FLINK-9635][scheduling] Avoid task spread-out in scheduling with loc…
URL: https://github.com/apache/flink/pull/6972

This PR was merged from a forked repository. Because GitHub hides the
original diff of such a PR once it is merged, the diff is reproduced
below for the sake of provenance.
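
For orientation, the main behavioral change in the diff below is in
PreviousAllocationSchedulingStrategy. What follows is a minimal,
self-contained sketch of that two-phase matching, not the Flink code
itself. It makes several simplifying assumptions: allocation ids are
plain strings, SlotChoiceSketch and pick are hypothetical names, and the
location-preference matching and the additional-requirements filter used
by the real strategy are left out. The candidates are passed as a
Supplier because a Java Stream can be traversed only once and the
fallback needs a second pass.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Stream;

/** Hypothetical, simplified stand-in for the two-phase slot matching. */
final class SlotChoiceSketch {

	/**
	 * @param freeSlots   supplies the candidate allocation ids; called once per phase
	 * @param preferred   allocation ids this task used in its previous execution
	 * @param usedByGraph allocation ids used previously by any task in the graph
	 */
	static Optional<String> pick(
			Supplier<Stream<String>> freeSlots,
			Collection<String> preferred,
			Set<String> usedByGraph) {

		// Phase 1: try to get back one of the task's own previous slots.
		Optional<String> previous = freeSlots.get().filter(preferred::contains).findFirst();
		if (previous.isPresent()) {
			return previous;
		}

		// Phase 2: fall back to a slot that no other task is expected to reclaim.
		// With an empty usedByGraph set this filter lets every candidate through.
		return freeSlots.get().filter(id -> !usedByGraph.contains(id)).findFirst();
	}

	public static void main(String[] args) {
		List<String> free = Arrays.asList("slot-A", "slot-B", "slot-C");
		Set<String> usedByGraph = new HashSet<>(Arrays.asList("slot-A", "slot-B", "slot-X"));

		// The task last ran in slot-X, which is no longer available.
		// slot-A and slot-B are skipped because other tasks used them before.
		System.out.println(pick(free::stream, Collections.singleton("slot-X"), usedByGraph)); // Optional[slot-C]

		// The task last ran in slot-B and gets it back.
		System.out.println(pick(free::stream, Collections.singleton("slot-B"), usedByGraph)); // Optional[slot-B]
	}
}
```

Without the second-phase filter, a task whose own slot is gone could
take a slot that another task wants to return to. That task would then
lose its local state and have to pick yet another slot, which is the
spread-out described in the issue title.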

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
index 7cb364d687f..ab682a02dd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
@@ -25,6 +25,7 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Set;
 
 /**
  * A slot profile describes the profile of a slot into which a task wants to be scheduled. The profile contains
@@ -47,16 +48,30 @@
 
 	/** This contains desired allocation ids of the slot. */
 	@Nonnull
-	private final Collection<AllocationID> priorAllocations;
+	private final Collection<AllocationID> preferredAllocations;
+
+	/** This contains all prior allocation ids from the whole execution graph. */
+	@Nonnull
+	private final Set<AllocationID> previousExecutionGraphAllocations;
+
+	public SlotProfile(
+		@Nonnull ResourceProfile resourceProfile,
+		@Nonnull Collection<TaskManagerLocation> preferredLocations,
+		@Nonnull Collection<AllocationID> preferredAllocations) {
+
+		this(resourceProfile, preferredLocations, preferredAllocations, Collections.emptySet());
+	}
 
 	public SlotProfile(
 		@Nonnull ResourceProfile resourceProfile,
 		@Nonnull Collection<TaskManagerLocation> preferredLocations,
-		@Nonnull Collection<AllocationID> priorAllocations) {
+		@Nonnull Collection<AllocationID> preferredAllocations,
+		@Nonnull Set<AllocationID> previousExecutionGraphAllocations) {
 
 		this.resourceProfile = resourceProfile;
 		this.preferredLocations = preferredLocations;
-		this.priorAllocations = priorAllocations;
+		this.preferredAllocations = preferredAllocations;
+		this.previousExecutionGraphAllocations = previousExecutionGraphAllocations;
 	}
 
 	/**
@@ -79,8 +94,18 @@ public ResourceProfile getResourceProfile() {
 	 * Returns the desired allocation ids for the slot.
 	 */
 	@Nonnull
-	public Collection<AllocationID> getPriorAllocations() {
-		return priorAllocations;
+	public Collection<AllocationID> getPreferredAllocations() {
+		return preferredAllocations;
+	}
+
+	/**
+	 * Returns a set of all previous allocation ids from the execution graph.
+	 *
+	 * This is optional and can be empty if unused.
+	 */
+	@Nonnull
+	public Set<AllocationID> getPreviousExecutionGraphAllocations() {
+		return previousExecutionGraphAllocations;
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..cbf51037b8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -58,6 +58,7 @@
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -65,6 +66,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -385,7 +387,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 		return scheduleForExecution(
 			resourceProvider,
 			allowQueued,
-			LocationPreferenceConstraint.ANY);
+			LocationPreferenceConstraint.ANY,
+			Collections.emptySet());
 	}
 
 	/**
@@ -397,18 +400,22 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 	 * @param queued Flag to indicate whether the scheduler may queue this task if it cannot
 	 *               immediately deploy it.
 	 * @param locationPreferenceConstraint constraint for the location preferences
+	 * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
+	 *                                                 Can be empty if the allocation ids are not required for scheduling.
 	 * @return Future which is completed once the Execution has been deployed
 	 */
 	public CompletableFuture<Void> scheduleForExecution(
 			SlotProvider slotProvider,
 			boolean queued,
-			LocationPreferenceConstraint locationPreferenceConstraint) {
+			LocationPreferenceConstraint locationPreferenceConstraint,
+			@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
 		final Time allocationTimeout = vertex.getExecutionGraph().getAllocationTimeout();
 		try {
 			final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(
 				slotProvider,
 				queued,
 				locationPreferenceConstraint,
+				allPreviousExecutionGraphAllocationIds,
 				allocationTimeout);
 
 			// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
@@ -441,6 +448,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 	 * @param slotProvider to obtain a new slot from
 	 * @param queued if the allocation can be queued
 	 * @param locationPreferenceConstraint constraint for the location preferences
+	 * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
+	 *                                                 Can be empty if the allocation ids are not required for scheduling.
 	 * @param allocationTimeout rpcTimeout for allocating a new slot
 	 * @return Future which is completed with this execution once the slot has been assigned
 	 * 			or with an exception if an error occurred.
@@ -450,6 +459,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 			SlotProvider slotProvider,
 			boolean queued,
 			LocationPreferenceConstraint locationPreferenceConstraint,
+			@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds,
 			Time allocationTimeout) throws IllegalExecutionStateException {
 
 		checkNotNull(slotProvider);
@@ -495,7 +505,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 							new SlotProfile(
 								ResourceProfile.UNKNOWN,
 								preferredLocations,
-								previousAllocationIDs),
+								previousAllocationIDs,
+								allPreviousExecutionGraphAllocationIds),
 							allocationTimeout));
 
 			// register call back to cancel slot request in case that the execution gets canceled
@@ -739,7 +750,8 @@ else if (numConsumers == 0) {
 							consumerVertex.scheduleForExecution(
 								executionGraph.getSlotProvider(),
 								executionGraph.isQueuedSchedulingAllowed(),
-								LocationPreferenceConstraint.ANY); // there must be at least one known location
+								LocationPreferenceConstraint.ANY, // there must be at least one known location
+								Collections.emptySet());
 						} catch (Throwable t) {
 							consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
 									"vertex " + consumerVertex, t));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0be1ff27420..3b55e009116 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -40,6 +40,7 @@
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
@@ -60,6 +61,7 @@
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -91,6 +93,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -902,14 +905,14 @@ public void scheduleForExecution() throws JobException {
 	private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) {
 
 		final ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<>(numVerticesTotal);
-
 		// simply take the vertices without inputs.
 		for (ExecutionJobVertex ejv : verticesInCreationOrder) {
 			if (ejv.getJobVertex().isInputVertex()) {
 				final CompletableFuture<Void> schedulingJobVertexFuture = ejv.scheduleAll(
 					slotProvider,
 					allowQueuedScheduling,
-					LocationPreferenceConstraint.ALL); // since it is an input vertex, the input based location preferences should be empty
+					LocationPreferenceConstraint.ALL,// since it is an input vertex, the input based location preferences should be empty
+					Collections.emptySet());
 
 				schedulingFutures.add(schedulingJobVertexFuture);
 			}
@@ -939,6 +942,9 @@ public void scheduleForExecution() throws JobException {
 		// collecting all the slots may resize and fail in that operation without slots getting lost
 		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
+		final Set<AllocationID> allPreviousAllocationIds =
+			Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());
+
 		// allocate the slots (obtain all their futures
 		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
 			// these calls are not blocking, they only return futures
@@ -946,7 +952,8 @@ public void scheduleForExecution() throws JobException {
 				slotProvider,
 				queued,
 				LocationPreferenceConstraint.ALL,
-				allocationTimeout);
+				allPreviousAllocationIds,
+				timeout);
 
 			allAllocationFutures.addAll(allocationFutures);
 		}
@@ -1676,6 +1683,35 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
 		}
 	}
 
+	/**
+	 * Computes and returns a set with the prior allocation ids from all execution vertices in the graph.
+	 */
+	private Set<AllocationID> computeAllPriorAllocationIds() {
+		HashSet<AllocationID> allPreviousAllocationIds = new HashSet<>(getNumberOfExecutionJobVertices());
+		for (ExecutionVertex executionVertex : getAllExecutionVertices()) {
+			AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation();
+			if (latestPriorAllocation != null) {
+				allPreviousAllocationIds.add(latestPriorAllocation);
+			}
+		}
+		return allPreviousAllocationIds;
+	}
+
+	/**
+	 * Returns the result of {@link #computeAllPriorAllocationIds()}, but only if the scheduling really requires it.
+	 * Otherwise this method simply returns an empty set.
+	 */
+	private Set<AllocationID> computeAllPriorAllocationIdsIfRequiredByScheduling() {
+		// This is a temporary optimization to avoid computing all previous allocations if not required
+		// This can go away when we progress with the implementation of the Scheduler.
+		if (slotProvider instanceof SlotPool.ProviderAndOwner
+			&& ((SlotPool.ProviderAndOwner) slotProvider).requiresPreviousAllocationsForScheduling()) {
+			return computeAllPriorAllocationIds();
+		} else {
+			return Collections.emptySet();
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Listeners & Observers
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6da1e0db892..2ab1d686410 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -34,6 +34,7 @@
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -54,6 +55,7 @@
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -64,6 +66,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -474,12 +477,15 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
 	 * @param slotProvider to allocate the slots from
 	 * @param queued if the allocations can be queued
 	 * @param locationPreferenceConstraint constraint for the location preferences
+	 * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
+	 *                                                 Can be empty if the allocation ids are not required for scheduling.
 	 * @return Future which is completed once all {@link Execution} could be deployed
 	 */
 	public CompletableFuture<Void> scheduleAll(
 			SlotProvider slotProvider,
 			boolean queued,
-			LocationPreferenceConstraint locationPreferenceConstraint) {
+			LocationPreferenceConstraint locationPreferenceConstraint,
+			@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
 
 		final ExecutionVertex[] vertices = this.taskVertices;
 
@@ -487,7 +493,11 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
 
 		// kick off the tasks
 		for (ExecutionVertex ev : vertices) {
-			scheduleFutures.add(ev.scheduleForExecution(slotProvider, queued, locationPreferenceConstraint));
+			scheduleFutures.add(ev.scheduleForExecution(
+				slotProvider,
+				queued,
+				locationPreferenceConstraint,
+				allPreviousExecutionGraphAllocationIds));
 		}
 
 		return FutureUtils.waitForAll(scheduleFutures);
@@ -503,12 +513,14 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
 	 * @param resourceProvider The resource provider from whom the slots are requested.
 	 * @param queued if the allocation can be queued
 	 * @param locationPreferenceConstraint constraint for the location preferences
+	 * @param allPreviousExecutionGraphAllocationIds the allocation ids of all previous executions in the execution job graph.
 	 * @param allocationTimeout timeout for allocating the individual slots
 	 */
 	public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
 			SlotProvider resourceProvider,
 			boolean queued,
 			LocationPreferenceConstraint locationPreferenceConstraint,
+			@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds,
 			Time allocationTimeout) {
 		final ExecutionVertex[] vertices = this.taskVertices;
 		final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];
@@ -522,6 +534,7 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
 				resourceProvider,
 				queued,
 				locationPreferenceConstraint,
+				allPreviousExecutionGraphAllocationIds,
 				allocationTimeout);
 			slots[i] = allocationFuture;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e4228011830..a0747296c53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -56,6 +56,7 @@
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -617,17 +618,21 @@ public Execution resetForNewExecution(final long timestamp, final long originati
 	 * @param slotProvider to allocate the slots from
 	 * @param queued if the allocation can be queued
 	 * @param locationPreferenceConstraint constraint for the location preferences
+	 * @param allPreviousExecutionGraphAllocationIds set with all previous allocation ids in the job graph.
+	 *                                                 Can be empty if the allocation ids are not required for scheduling.
 	 * @return Future which is completed once the execution is deployed. The future
 	 * can also completed exceptionally.
 	 */
 	public CompletableFuture<Void> scheduleForExecution(
 			SlotProvider slotProvider,
 			boolean queued,
-			LocationPreferenceConstraint locationPreferenceConstraint) {
+			LocationPreferenceConstraint locationPreferenceConstraint,
+			@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
 		return this.currentExecution.scheduleForExecution(
 			slotProvider,
 			queued,
-			locationPreferenceConstraint);
+			locationPreferenceConstraint,
+			allPreviousExecutionGraphAllocationIds);
 	}
 
 	@VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 0b00c0e039d..f3ba48e75ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph.failover;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -212,6 +213,15 @@ private void restart(long globalModVersionOfFailover) {
 							connectedExecutionVertexes, false, false);
 				}
 				*/
+
+				HashSet<AllocationID> previousAllocationsInRegion = new HashSet<>(connectedExecutionVertexes.size());
+				for (ExecutionVertex connectedExecutionVertex : connectedExecutionVertexes) {
+					AllocationID latestPriorAllocation = connectedExecutionVertex.getLatestPriorAllocation();
+					if (latestPriorAllocation != null) {
+						previousAllocationsInRegion.add(latestPriorAllocation);
+					}
+				}
+
 				//TODO, use restart strategy to schedule them.
 				//restart all connected ExecutionVertexes
 				for (ExecutionVertex ev : connectedExecutionVertexes) {
@@ -219,7 +229,8 @@ private void restart(long globalModVersionOfFailover) {
 						ev.scheduleForExecution(
 							executionGraph.getSlotProvider(),
 							executionGraph.isQueuedSchedulingAllowed(),
-							LocationPreferenceConstraint.ANY); // some inputs not belonging to the failover region might have failed concurrently
+							LocationPreferenceConstraint.ANY,
+							previousAllocationsInRegion); // some inputs not belonging to the failover region might have failed concurrently
 					}
 					catch (Throwable e) {
 						failover(globalModVersionOfFailover);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
index 95dd1f6f9e6..282fd2ccf4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
index 38781676842..8777edd2fe7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java
@@ -18,37 +18,14 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 /**
  * Interface for the context of a {@link LogicalSlot}. This context contains information
  * about the underlying allocated slot and how to communicate with the TaskManager on which
  * it was allocated.
  */
-public interface SlotContext {
-	/**
-	 * Gets the id under which the slot has been allocated on the TaskManager. This id uniquely identifies the
-	 * physical slot.
-	 *
-	 * @return The id under which the slot has been allocated on the TaskManager
-	 */
-	AllocationID getAllocationId();
-
-	/**
-	 * Gets the location info of the TaskManager that offers this slot.
-	 *
-	 * @return The location info of the TaskManager that offers this slot
-	 */
-	TaskManagerLocation getTaskManagerLocation();
-
-	/**
-	 * Gets the number of the slot.
-	 *
-	 * @return The number of the slot on the TaskManager.
-	 */
-	int getPhysicalSlotNumber();
+public interface SlotContext extends SlotInfo {
 
 	/**
 	 * Gets the actor gateway that can be used to send messages to the TaskManager.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
new file mode 100644
index 00000000000..fd33aacfd6c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotInfo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Interface that provides basic information in the context of a slot.
+ */
+public interface SlotInfo {
+
+	/**
+	 * Gets the id under which the slot has been allocated on the TaskManager. This id uniquely identifies the
+	 * physical slot.
+	 *
+	 * @return The id under which the slot has been allocated on the TaskManager
+	 */
+	AllocationID getAllocationId();
+
+	/**
+	 * Gets the location info of the TaskManager that offers this slot.
+	 *
+	 * @return The location info of the TaskManager that offers this slot
+	 */
+	TaskManagerLocation getTaskManagerLocation();
+
+	/**
+	 * Gets the number of the slot.
+	 *
+	 * @return The number of the slot on the TaskManager.
+	 */
+	int getPhysicalSlotNumber();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
index 75195cd9378..e4e583c09d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlot.java
@@ -92,6 +92,7 @@ public SlotID getSlotId() {
 	 * 
 	 * @return The ID under which the slot is allocated
 	 */
+	@Override
 	public AllocationID getAllocationId() {
 		return allocationId;
 	}
@@ -121,6 +122,7 @@ public ResourceProfile getResourceProfile() {
 	 *
 	 * @return The location info of the TaskManager that offers this slot
 	 */
+	@Override
 	public TaskManagerLocation getTaskManagerLocation() {
 		return taskManagerLocation;
 	}
@@ -132,6 +134,7 @@ public TaskManagerLocation getTaskManagerLocation() {
 	 *
 	 * @return The actor gateway that can be used to send messages to the TaskManager.
 	 */
+	@Override
 	public TaskManagerGateway getTaskManagerGateway() {
 		return taskManagerGateway;
 	}
@@ -142,6 +145,7 @@ public TaskManagerGateway getTaskManagerGateway() {
 	 *
 	 * @return Physical slot number of the allocated slot
 	 */
+	@Override
 	public int getPhysicalSlotNumber() {
 		return physicalSlotNumber;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
index 25e884c32dd..bc90be65ac5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
@@ -21,7 +21,7 @@
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import javax.annotation.Nonnull;
@@ -34,6 +34,7 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 /**
@@ -54,11 +55,26 @@
 	@Override
 	public <IN, OUT> OUT findMatchWithLocality(
 			@Nonnull SlotProfile slotProfile,
-			@Nonnull Stream<IN> candidates,
-			@Nonnull Function<IN, SlotContext> contextExtractor,
+			@Nonnull Supplier<Stream<IN>> candidates,
+			@Nonnull Function<IN, SlotInfo> contextExtractor,
 			@Nonnull Predicate<IN> additionalRequirementsFilter,
 			@Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
 
+		return doFindMatchWithLocality(
+			slotProfile,
+			candidates.get(),
+			contextExtractor,
+			additionalRequirementsFilter,
+			resultProducer);
+	}
+
+	@Nullable
+	protected  <IN, OUT> OUT doFindMatchWithLocality(
+		@Nonnull SlotProfile slotProfile,
+		@Nonnull Stream<IN> candidates,
+		@Nonnull Function<IN, SlotInfo> contextExtractor,
+		@Nonnull Predicate<IN> additionalRequirementsFilter,
+		@Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
 		Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
 
 		// if we have no location preferences, we can only filter by the additional requirements.
@@ -88,7 +104,7 @@
 		while (iterator.hasNext()) {
 			IN candidate = iterator.next();
 			if (additionalRequirementsFilter.test(candidate)) {
-				SlotContext slotContext = contextExtractor.apply(candidate);
+				SlotInfo slotContext = contextExtractor.apply(candidate);
 
 				// this gets candidate is local-weigh
 				Integer localWeigh = preferredResourceIDs.getOrDefault(slotContext.getTaskManagerLocation().getResourceID(), 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
index 9b1872ec363..d2193ad7022 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
@@ -21,15 +21,17 @@
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 /**
@@ -48,35 +50,68 @@ private PreviousAllocationSchedulingStrategy() {}
 	@Override
 	public <IN, OUT> OUT findMatchWithLocality(
 			@Nonnull SlotProfile slotProfile,
-			@Nonnull Stream<IN> candidates,
-			@Nonnull Function<IN, SlotContext> contextExtractor,
+			@Nonnull Supplier<Stream<IN>> candidates,
+			@Nonnull Function<IN, SlotInfo> contextExtractor,
 			@Nonnull Predicate<IN> additionalRequirementsFilter,
 			@Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
 
-		Collection<AllocationID> priorAllocations = slotProfile.getPriorAllocations();
+		Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
 
 		if (priorAllocations.isEmpty()) {
-			return super.findMatchWithLocality(slotProfile, candidates, contextExtractor, additionalRequirementsFilter, resultProducer);
+			return super.findMatchWithLocality(
+				slotProfile,
+				candidates,
+				contextExtractor,
+				additionalRequirementsFilter,
+				resultProducer);
 		} else {
-			return findPreviousAllocation(candidates, contextExtractor, additionalRequirementsFilter, resultProducer, priorAllocations);
+			return findPreviousAllocation(
+				slotProfile,
+				candidates,
+				contextExtractor,
+				additionalRequirementsFilter,
+				resultProducer,
+				priorAllocations);
 		}
 	}
 
 	@Nullable
 	private <IN, OUT> OUT findPreviousAllocation(
-			@Nonnull Stream<IN> candidates,
-			@Nonnull Function<IN, SlotContext> contextExtractor,
+			@Nonnull SlotProfile slotProfile,
+			@Nonnull Supplier<Stream<IN>> candidates,
+			@Nonnull Function<IN, SlotInfo> contextExtractor,
 			@Nonnull Predicate<IN> additionalRequirementsFilter,
 			@Nonnull BiFunction<IN, Locality, OUT> resultProducer,
-			Collection<AllocationID> priorAllocations) {
+			@Nonnull Collection<AllocationID> priorAllocations) {
+
 		Predicate<IN> filterByAllocation =
-			(candidate) -> priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId());
+			(IN candidate) -> priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId());
 
-		return candidates
+		OUT previousAllocationCandidate = candidates
+			.get()
 			.filter(filterByAllocation.and(additionalRequirementsFilter))
 			.findFirst()
-			.map((result) -> resultProducer.apply(result, Locality.LOCAL)) // TODO introduce special locality?
+			.map((IN result) -> resultProducer.apply(result, Locality.LOCAL)) // TODO introduce special locality?
 			.orElse(null);
+
+		if (previousAllocationCandidate != null) {
+			return previousAllocationCandidate;
+		}
+
+		Set<AllocationID> blackListedAllocationIDs = slotProfile.getPreviousExecutionGraphAllocations();
+		Stream<IN> candidateStream = candidates.get();
+		if (!blackListedAllocationIDs.isEmpty()) {
+			candidateStream = candidateStream.filter(
+				(IN candidate) -> !blackListedAllocationIDs.contains(
+					contextExtractor.apply(candidate).getAllocationId()));
+		}
+
+		return doFindMatchWithLocality(
+			slotProfile,
+			candidateStream,
+			contextExtractor,
+			additionalRequirementsFilter,
+			resultProducer);
 	}
 
 	public static PreviousAllocationSchedulingStrategy getInstance() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
index fb27a214eff..89b2d05cada 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
@@ -21,6 +21,7 @@
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -28,6 +29,7 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 /**
@@ -53,8 +55,8 @@
 	@Nullable
 	<IN, OUT> OUT findMatchWithLocality(
 		@Nonnull SlotProfile slotProfile,
-		@Nonnull Stream<IN> candidates,
-		@Nonnull Function<IN, SlotContext> contextExtractor,
+		@Nonnull Supplier<Stream<IN>> candidates,
+		@Nonnull Function<IN, SlotInfo> contextExtractor,
 		@Nonnull Predicate<IN> additionalRequirementsFilter,
 		@Nonnull BiFunction<IN, Locality, OUT> resultProducer);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 13f0462455c..fdf0fa9e485 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -54,6 +54,7 @@
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -170,7 +171,9 @@ public SlotPool(
 		this.pendingRequests = new DualKeyMap<>(16);
 		this.waitingForResourceManager = new HashMap<>(16);
 
-		this.providerAndOwner = new ProviderAndOwner(getSelfGateway(SlotPoolGateway.class));
+		this.providerAndOwner = new ProviderAndOwner(
+			getSelfGateway(SlotPoolGateway.class),
+			schedulingStrategy instanceof PreviousAllocationSchedulingStrategy);
 
 		this.slotSharingManagers = new HashMap<>(4);
 
@@ -325,76 +328,93 @@ public void disconnectResourceManager() {
 
 		log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
 
-		final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
+		if (task.getSlotSharingGroupId() == null) {
+			return allocateSingleSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout);
+		} else {
+			return allocateSharedSlot(slotRequestId, task, slotProfile, allowQueuedScheduling, allocationTimeout);
+		}
+	}
 
-		if (slotSharingGroupId != null) {
-			// allocate slot with slot sharing
-			final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
-				slotSharingGroupId,
-				id -> new SlotSharingManager(
-					id,
-					this,
-					providerAndOwner));
-
-			final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
-
-			try {
-				if (task.getCoLocationConstraint() != null) {
-					multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
-						task.getCoLocationConstraint(),
-						multiTaskSlotManager,
-						slotProfile,
-						allowQueuedScheduling,
-						allocationTimeout);
+	private CompletableFuture<LogicalSlot> allocateSingleSlot(
+		SlotRequestId slotRequestId,
+		SlotProfile slotProfile,
+		boolean allowQueuedScheduling,
+		Time allocationTimeout) {
+		// request an allocated slot to assign a single logical slot to
+		CompletableFuture<SlotAndLocality> slotAndLocalityFuture = requestAllocatedSlot(
+			slotRequestId,
+			slotProfile,
+			allowQueuedScheduling,
+			allocationTimeout);
+
+		return slotAndLocalityFuture.thenApply(
+			(SlotAndLocality slotAndLocality) -> {
+				final AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
+
+				final SingleLogicalSlot singleTaskSlot = new SingleLogicalSlot(
+					slotRequestId,
+					allocatedSlot,
+					null,
+					slotAndLocality.getLocality(),
+					providerAndOwner);
+
+				if (allocatedSlot.tryAssignPayload(singleTaskSlot)) {
+					return singleTaskSlot;
 				} else {
-					multiTaskSlotLocality = allocateMultiTaskSlot(
-						task.getJobVertexId(),
-						multiTaskSlotManager,
-						slotProfile,
-						allowQueuedScheduling,
-						allocationTimeout);
+					final FlinkException flinkException =
+						new FlinkException("Could not assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.');
+					releaseSingleSlot(slotRequestId, flinkException);
+					throw new CompletionException(flinkException);
 				}
-			} catch (NoResourceAvailableException noResourceException) {
-				return FutureUtils.completedExceptionally(noResourceException);
-			}
+			});
+	}
 
-			// sanity check
-			Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(task.getJobVertexId()));
+	private CompletableFuture<LogicalSlot> allocateSharedSlot(
+		SlotRequestId slotRequestId,
+		ScheduledUnit task,
+		SlotProfile slotProfile,
+		boolean allowQueuedScheduling,
+		Time allocationTimeout) {
 
-			final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
-				slotRequestId,
-				task.getJobVertexId(),
-				multiTaskSlotLocality.getLocality());
+		// allocate slot with slot sharing
+		final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
+			task.getSlotSharingGroupId(),
+			id -> new SlotSharingManager(
+				id,
+				this,
+				providerAndOwner));
 
-			return leaf.getLogicalSlotFuture();
-		} else {
-			// request an allocated slot to assign a single logical slot to
-			CompletableFuture<SlotAndLocality> slotAndLocalityFuture = requestAllocatedSlot(
-				slotRequestId,
-				slotProfile,
-				allowQueuedScheduling,
-				allocationTimeout);
+		final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
 
-			return slotAndLocalityFuture.thenApply(
-				(SlotAndLocality slotAndLocality) -> {
-					final AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
+		try {
+			if (task.getCoLocationConstraint() != null) {
+				multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
+					task.getCoLocationConstraint(),
+					multiTaskSlotManager,
+					slotProfile,
+					allowQueuedScheduling,
+					allocationTimeout);
+			} else {
+				multiTaskSlotLocality = allocateMultiTaskSlot(
+					task.getJobVertexId(),
+					multiTaskSlotManager,
+					slotProfile,
+					allowQueuedScheduling,
+					allocationTimeout);
+			}
+		} catch (NoResourceAvailableException noResourceException) {
+			return FutureUtils.completedExceptionally(noResourceException);
+		}
 
-					final SingleLogicalSlot singleTaskSlot = new SingleLogicalSlot(
-						slotRequestId,
-						allocatedSlot,
-						null,
-						slotAndLocality.getLocality(),
-						providerAndOwner);
+		// sanity check
+		Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(task.getJobVertexId()));
 
-					if (allocatedSlot.tryAssignPayload(singleTaskSlot)) {
-						return singleTaskSlot;
-					} else {
-						final FlinkException flinkException = new FlinkException("Could not assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.');
-						releaseSlot(slotRequestId, null, flinkException);
-						throw new CompletionException(flinkException);
-					}
-				});
-		}
+		final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
+			slotRequestId,
+			task.getJobVertexId(),
+			multiTaskSlotLocality.getLocality());
+
+		return leaf.getLogicalSlotFuture();
 	}
 
 	/**
@@ -438,12 +458,13 @@ public void disconnectResourceManager() {
 			slotProfile = new SlotProfile(
 				slotProfile.getResourceProfile(),
 				Collections.singleton(coLocationConstraint.getLocation()),
-				slotProfile.getPriorAllocations());
+				slotProfile.getPreferredAllocations());
 		}
 
 		// get a new multi task slot
 		final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot(
-			coLocationConstraint.getGroupId(), multiTaskSlotManager,
+			coLocationConstraint.getGroupId(),
+			multiTaskSlotManager,
 			slotProfile,
 			allowQueuedScheduling,
 			allocationTimeout);
@@ -548,9 +569,8 @@ public void disconnectResourceManager() {
 		if (multiTaskSlotLocality != null) {
 			// prefer slot sharing group slots over unused slots
 			if (polledSlotAndLocality != null) {
-				releaseSlot(
+				releaseSingleSlot(
 					allocatedSlotRequestId,
-					null,
 					new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
 			}
 			return multiTaskSlotLocality;
@@ -587,9 +607,8 @@ public void disconnectResourceManager() {
 								}
 							}
 						} else {
-							releaseSlot(
+							releaseSingleSlot(
 								allocatedSlotRequestId,
-								null,
 								new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.'));
 						}
 					});
@@ -741,41 +760,56 @@ private void stashRequestWaitingForResourceManager(final PendingRequest pendingR
 	// ------------------------------------------------------------------------
 
 	@Override
-	public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
+	public CompletableFuture<Acknowledge> releaseSlot(
+		SlotRequestId slotRequestId,
+		@Nullable SlotSharingGroupId slotSharingGroupId,
+		Throwable cause) {
+
 		log.debug("Releasing slot [{}] because: {}", slotRequestId, cause != null ? cause.getMessage() : "null");
 
 		if (slotSharingGroupId != null) {
-			final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
+			releaseSharedSlot(slotRequestId, slotSharingGroupId, cause);
+		} else {
+			releaseSingleSlot(slotRequestId, cause);
+		}
 
-			if (multiTaskSlotManager != null) {
-				final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(slotRequestId);
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
 
-				if (taskSlot != null) {
-					taskSlot.release(cause);
-				} else {
-					log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
-				}
+	private void releaseSharedSlot(
+		SlotRequestId slotRequestId,
+		@Nonnull SlotSharingGroupId slotSharingGroupId, Throwable cause) {
+
+		final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
+
+		if (multiTaskSlotManager != null) {
+			final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(slotRequestId);
+
+			if (taskSlot != null) {
+				taskSlot.release(cause);
 			} else {
-				log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId);
+				log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
 			}
 		} else {
-			final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
+			log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId);
+		}
+	}
 
-			if (pendingRequest != null) {
-				failPendingRequest(pendingRequest, new FlinkException("Pending slot request with " + slotRequestId + " has been released."));
-			} else {
-				final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);
+	private void releaseSingleSlot(SlotRequestId slotRequestId, Throwable cause) {
+		final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
 
-				if (allocatedSlot != null) {
-					allocatedSlot.releasePayload(cause);
-					tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
-				} else {
-					log.debug("There is no allocated slot [{}]. Ignoring the release slot request.", slotRequestId);
-				}
+		if (pendingRequest != null) {
+			failPendingRequest(pendingRequest, new FlinkException("Pending slot request with " + slotRequestId + " has been released."));
+		} else {
+			final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);
+
+			if (allocatedSlot != null) {
+				allocatedSlot.releasePayload(cause);
+				tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
+			} else {
+				log.debug("There is no allocated slot [{}]. Ignoring the release slot request.", slotRequestId);
 			}
 		}
-
-		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	/**
@@ -875,17 +909,13 @@ private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
 		validateRunsInMainThread();
 
 		List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers = offers.stream().map(
-			offer -> {
-				CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(
-						taskManagerLocation,
-						taskManagerGateway,
-						offer)
-					.thenApply(
-						(acceptedSlot) -> acceptedSlot ? Optional.of(offer) : Optional.empty()
-					);
-
-				return acceptedSlotOffer;
-			}
+			offer -> offerSlot(
+					taskManagerLocation,
+					taskManagerGateway,
+					offer)
+				.<Optional<SlotOffer>>thenApply(
+					(acceptedSlot) -> acceptedSlot ? Optional.of(offer) : Optional.empty()
+				)
 		).collect(Collectors.toList());
 
 		CompletableFuture<Collection<Optional<SlotOffer>>> optionalSlotOffers = FutureUtils.combineAll(acceptedSlotOffers);
@@ -1359,11 +1389,7 @@ int size() {
 
 		@VisibleForTesting
 		Set<AllocatedSlot> getSlotsForTaskManager(ResourceID resourceId) {
-			if (allocatedSlotsByTaskManager.containsKey(resourceId)) {
-				return allocatedSlotsByTaskManager.get(resourceId);
-			} else {
-				return Collections.emptySet();
-			}
+			return allocatedSlotsByTaskManager.getOrDefault(resourceId, Collections.emptySet());
 		}
 	}
 
@@ -1404,18 +1430,12 @@ void add(final AllocatedSlot slot, final long timestamp) {
 				final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
 				final String host = slot.getTaskManagerLocation().getFQDNHostname();
 
-				Set<AllocatedSlot> slotsForTaskManager = availableSlotsByTaskManager.get(resourceID);
-				if (slotsForTaskManager == null) {
-					slotsForTaskManager = new HashSet<>();
-					availableSlotsByTaskManager.put(resourceID, slotsForTaskManager);
-				}
+				Set<AllocatedSlot> slotsForTaskManager =
+					availableSlotsByTaskManager.computeIfAbsent(resourceID, k -> new HashSet<>());
 				slotsForTaskManager.add(slot);
 
-				Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
-				if (slotsForHost == null) {
-					slotsForHost = new HashSet<>();
-					availableSlotsByHost.put(host, slotsForHost);
-				}
+				Set<AllocatedSlot> slotsForHost =
+					availableSlotsByHost.computeIfAbsent(host, k -> new HashSet<>());
 				slotsForHost.add(slot);
 			}
 			else {
@@ -1456,7 +1476,7 @@ SlotAndLocality poll(SchedulingStrategy schedulingStrategy, SlotProfile slotProf
 
 			SlotAndLocality matchingSlotAndLocality = schedulingStrategy.findMatchWithLocality(
 				slotProfile,
-				slotAndTimestamps.stream(),
+				slotAndTimestamps::stream,
 				SlotAndTimestamp::slot,
 				(SlotAndTimestamp slot) -> slot.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile()),
 				(SlotAndTimestamp slotAndTimestamp, Locality locality) -> {
@@ -1563,12 +1583,18 @@ void clear() {
 	 * An implementation of the {@link SlotOwner} and {@link SlotProvider} interfaces
 	 * that delegates methods as RPC calls to the SlotPool's RPC gateway.
 	 */
-	private static class ProviderAndOwner implements SlotOwner, SlotProvider {
+	public static class ProviderAndOwner implements SlotOwner, SlotProvider {
 
 		private final SlotPoolGateway gateway;
+		private final boolean requiresPreviousAllocationsForScheduling;
 
-		ProviderAndOwner(SlotPoolGateway gateway) {
+		ProviderAndOwner(SlotPoolGateway gateway, boolean requiresPreviousAllocationsForScheduling) {
 			this.gateway = gateway;
+			this.requiresPreviousAllocationsForScheduling = requiresPreviousAllocationsForScheduling;
+		}
+
+		public boolean requiresPreviousAllocationsForScheduling() {
+			return requiresPreviousAllocationsForScheduling;
 		}
 
 		@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index afcd24f1064..80c2dd2d708 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -197,7 +197,7 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy
 			Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
 			return matcher.findMatchWithLocality(
 				slotProfile,
-				resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+				() ->resolvedRootSlotsValues.stream().flatMap(Collection::stream),
 				(MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
 				(MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
 				MultiTaskSlotLocality::of);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
index 6a826aa23e1..ee491499b77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
@@ -124,18 +124,59 @@ public void matchPreviousAllocationOverridesPreferredLocation() {
 	}
 
 	@Test
-	public void matchPreviousLocationNotAvailable() {
+	public void matchPreviousLocationNotAvailableButByLocality() {
 
 		SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml4), Collections.singletonList(aidX));
 		SlotContext match = runMatching(slotProfile);
 
-		Assert.assertEquals(null, match);
+		Assert.assertEquals(ssc4, match);
+	}
+
+	@Test
+	public void matchPreviousLocationNotAvailableAndAllOthersBlacklisted() {
+		HashSet<AllocationID> blacklisted = new HashSet<>(4);
+		blacklisted.add(aid1);
+		blacklisted.add(aid2);
+		blacklisted.add(aid3);
+		blacklisted.add(aid4);
+		SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml4), Collections.singletonList(aidX), blacklisted);
+		SlotContext match = runMatching(slotProfile);
+
+		// there should be no valid option left and we expect null as return
+		Assert.assertNull(match);
+	}
+
+	@Test
+	public void matchPreviousLocationNotAvailableAndSomeOthersBlacklisted() {
+		HashSet<AllocationID> blacklisted = new HashSet<>(3);
+		blacklisted.add(aid1);
+		blacklisted.add(aid3);
+		blacklisted.add(aid4);
+		SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml4), Collections.singletonList(aidX), blacklisted);
+		SlotContext match = runMatching(slotProfile);
+
+		// we expect that the candidate that is not blacklisted is returned
+		Assert.assertEquals(ssc2, match);
+	}
+
+	@Test
+	public void matchPreviousLocationAvailableButAlsoBlacklisted() {
+		HashSet<AllocationID> blacklisted = new HashSet<>(4);
+		blacklisted.add(aid1);
+		blacklisted.add(aid2);
+		blacklisted.add(aid3);
+		blacklisted.add(aid4);
+		SlotProfile slotProfile = new SlotProfile(resourceProfile, Collections.singletonList(tml3), Collections.singletonList(aid3), blacklisted);
+		SlotContext match = runMatching(slotProfile);
+
+		// available previous allocation should override blacklisting
+		Assert.assertEquals(ssc3, match);
 	}
 
 	private SlotContext runMatching(SlotProfile slotProfile) {
 		return schedulingStrategy.findMatchWithLocality(
 			slotProfile,
-			candidates.stream(),
+			candidates::stream,
 			(candidate) -> candidate,
 			(candidate) -> true,
 			(candidate, locality) -> candidate);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 56fd7e12369..74724629eea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -107,6 +107,7 @@ public void testSlotReleaseOnFailedResourceAssignment() throws Exception {
 			slotProvider,
 			false,
 			LocationPreferenceConstraint.ALL,
+			Collections.emptySet(),
 			TestingUtils.infiniteTime());
 
 		assertFalse(allocationFuture.isDone());
@@ -156,6 +157,7 @@ public void testSlotReleaseOnExecutionCancellationInScheduled() throws Exception
 			slotProvider,
 			false,
 			LocationPreferenceConstraint.ALL,
+			Collections.emptySet(),
 			TestingUtils.infiniteTime());
 
 		assertTrue(allocationFuture.isDone());
@@ -205,6 +207,7 @@ public void testSlotReleaseOnExecutionCancellationInRunning() throws Exception {
 			slotProvider,
 			false,
 			LocationPreferenceConstraint.ALL,
+			Collections.emptySet(),
 			TestingUtils.infiniteTime());
 
 		assertTrue(allocationFuture.isDone());
@@ -254,6 +257,7 @@ public void testSlotAllocationCancellationWhenExecutionCancelled() throws Except
 			slotProvider,
 			false,
 			LocationPreferenceConstraint.ALL,
+			Collections.emptySet(),
 			TestingUtils.infiniteTime());
 
 		assertThat(allocationFuture.isDone(), is(false));
@@ -357,7 +361,7 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
 
 		ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
 
-		executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
+		executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY, Collections.emptySet()).get();
 
 		Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
 
@@ -417,7 +421,7 @@ public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
 		assertThat(execution.getTaskRestore(), is(notNullValue()));
 
 		// schedule the execution vertex and wait for its deployment
-		executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
+		executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY, Collections.emptySet()).get();
 
 		assertThat(execution.getTaskRestore(), is(nullValue()));
 	}
@@ -479,7 +483,8 @@ public void testEagerSchedulingFailureReturnsSlot() throws Exception {
 			final CompletableFuture<Void> schedulingFuture = execution.scheduleForExecution(
 				slotProvider,
 				false,
-				LocationPreferenceConstraint.ANY);
+				LocationPreferenceConstraint.ANY,
+				Collections.emptySet());
 
 			try {
 				schedulingFuture.get();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index cd613f0f50a..41894767b6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -40,6 +40,7 @@
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Collections;
 
 import scala.concurrent.ExecutionContext;
 
@@ -454,7 +455,7 @@ public void testScheduleOrDeployAfterCancel() {
 			// it can occur as the result of races
 			{
 				Scheduler scheduler = mock(Scheduler.class);
-				vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
+				vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
 
 				assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			}
@@ -493,7 +494,7 @@ public void testActionsWhileCancelling() {
 				setVertexState(vertex, ExecutionState.CANCELING);
 
 				Scheduler scheduler = mock(Scheduler.class);
-				vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
+				vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
 			}
 			catch (Exception e) {
 				fail("should not throw an exception");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 51d1827e668..6bfcb7ff5d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -36,6 +36,7 @@
 
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
@@ -71,7 +72,7 @@ public void testSlotReleasedWhenScheduledImmediately() {
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
+			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
 
 			// will have failed
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
@@ -103,7 +104,7 @@ public void testSlotReleasedWhenScheduledQueued() {
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, true, LocationPreferenceConstraint.ALL);
+			vertex.scheduleForExecution(scheduler, true, LocationPreferenceConstraint.ALL, Collections.emptySet());
 
 			// future has not yet a slot
 			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
@@ -138,7 +139,7 @@ public void testScheduleToDeploying() {
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
+			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, Collections.emptySet());
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 		}
 		catch (Exception e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index c411393990c..a53debfd0b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -49,6 +49,7 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -157,7 +158,7 @@ public void testMultiRegionsFailover() throws Exception {
 
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
 
-		ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL);
+		ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL, Collections.emptySet());
 		ev21.getCurrentExecutionAttempt().fail(new Exception("New fail"));
 		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
@@ -170,7 +171,7 @@ public void testMultiRegionsFailover() throws Exception {
 
 		ev11.getCurrentExecutionAttempt().markFinished();
 		ev21.getCurrentExecutionAttempt().markFinished();
-		ev22.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL);
+		ev22.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL, Collections.emptySet());
 		ev22.getCurrentExecutionAttempt().markFinished();
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
index 0344d71879b..fb28c2c75a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
@@ -41,7 +41,6 @@
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
@@ -50,6 +49,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
@@ -72,13 +72,26 @@ public void testDisablingLocalRecovery() throws Exception {
 
 	/**
 	 * Tests that if local recovery is enabled we won't spread
-	 * out tasks when recovering.
+	 * out tasks when recovering for global failover.
 	 */
 	@Test
-	@Ignore("The test should not pass until FLINK-9635 has been fixed")
-	public void testLocalRecovery() throws Exception {
+	public void testLocalRecoveryFull() throws Exception {
+		testLocalRecoveryInternal("full");
+	}
+
+	/**
+	 * Tests that if local recovery is enabled we won't spread
+	 * out tasks when recovering for regional failover.
+	 */
+	@Test
+	public void testLocalRecoveryRegion() throws Exception {
+		testLocalRecoveryInternal("region");
+	}
+
+	private void testLocalRecoveryInternal(String failoverStrategyValue) throws Exception {
 		final Configuration configuration = new Configuration();
 		configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
+		configuration.setString(EXECUTION_FAILOVER_STRATEGY.key(), failoverStrategyValue);
 
 		executeSchedulingTest(configuration);
 	}


 



> Local recovery scheduling can cause spread out of tasks
> -------------------------------------------------------
>
>                 Key: FLINK-9635
>                 URL: https://issues.apache.org/jira/browse/FLINK-9635
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0, 1.6.2
>            Reporter: Till Rohrmann
>            Assignee: Stefan Richter
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed such that it tries to reschedule each task to its previous location. In order not to occupy slots that have the state of other tasks cached, the strategy requests a new slot if the old slot identified by the previous allocation ID is no longer present. This also applies to newly allocated slots, because there is no distinction between new and already used slots. This behaviour can cause every task to be deployed to its own slot, for example if the {{SlotPool}} has released all slots in the meantime. As a consequence, a job may no longer be executable after a failure because it needs more slots than before.
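
The two-phase matching that the diff above adds to PreviousAllocationSchedulingStrategy can be sketched in isolation as follows. This is a simplified illustration, not the Flink implementation: the class PreviousAllocationSketch, the Slot type, and the findSlot method are hypothetical names, allocation IDs are plain strings, and the locality-based matching that the real second phase delegates to is reduced to "take the first remaining candidate". The candidates are passed as a Supplier because a Java stream can only be consumed once and the sketch needs two passes.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical, simplified model of the matching logic; not Flink code.
class PreviousAllocationSketch {

    /** Stand-in for a slot; only the allocation id matters for this sketch. */
    static final class Slot {
        final String allocationId;

        Slot(String allocationId) {
            this.allocationId = allocationId;
        }
    }

    static Optional<Slot> findSlot(
            Supplier<Stream<Slot>> candidates,
            Collection<String> preferredAllocations,
            Set<String> previousExecutionGraphAllocations) {

        // Phase 1: a slot from the task's own previous allocation always wins,
        // even if its id also appears in the blacklist.
        Optional<Slot> previous = candidates.get()
            .filter(slot -> preferredAllocations.contains(slot.allocationId))
            .findFirst();
        if (previous.isPresent()) {
            return previous;
        }

        // Phase 2: fall back to any slot that no other task of the previous
        // execution graph used, so their locally cached state stays available.
        // (Assumption: the real code applies locality preferences here.)
        return candidates.get()
            .filter(slot -> !previousExecutionGraphAllocations.contains(slot.allocationId))
            .findFirst();
    }

    public static void main(String[] args) {
        List<Slot> slots = Arrays.asList(new Slot("a1"), new Slot("a2"), new Slot("a3"));
        Set<String> blacklist = new HashSet<>(Arrays.asList("a1", "a3"));

        // Previous allocation a3 is still available: prints a3.
        System.out.println(findSlot(slots::stream, Collections.singletonList("a3"), blacklist)
            .map(slot -> slot.allocationId).orElse("none"));

        // Previous allocation aX is gone: prints a2, the only non-blacklisted slot.
        System.out.println(findSlot(slots::stream, Collections.singletonList("aX"), blacklist)
            .map(slot -> slot.allocationId).orElse("none"));
    }
}
```

The cases in main mirror two of the new SlotProfileTest cases: an available previous allocation overrides blacklisting, and otherwise only a candidate that is not blacklisted is returned.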


