You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/27 13:10:43 UTC

[3/4] flink git commit: [FLINK-9634] Disable local recovery scheduling if local recovery is disabled

[FLINK-9634] Disable local recovery scheduling if local recovery is disabled

Introduce a SchedulingStrategy which is used by the SlotPool to schedule tasks.
The default implementation is LocationPreferenceSchedulingStrategy which tries
to schedule tasks to their preferred locations. In order to support local recovery
the PreviousAllocationSchedulingStrategy schedules tasks to their previous
allocation.

The scheduling strategy is selected based on the configuration option
state.backend.local-recovery. If set to true, then PreviousAllocationSchedulingStrategy
is selected. Otherwise LocationPreferenceSchedulingStrategy is selected.

This closes #6208.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5919d9c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5919d9c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5919d9c

Branch: refs/heads/master
Commit: d5919d9c1d4054dbfa182e482e082940b4e2f24d
Parents: ca7e87d
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jun 22 16:34:10 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jun 27 15:03:40 2018 +0200

----------------------------------------------------------------------
 .../clusterframework/types/SlotProfile.java     | 189 ++-----------------
 .../slotpool/DefaultSlotPoolFactory.java        |  18 ++
 .../LocationPreferenceSchedulingStrategy.java   | 119 ++++++++++++
 .../PreviousAllocationSchedulingStrategy.java   |  85 +++++++++
 .../jobmaster/slotpool/SchedulingStrategy.java  |  60 ++++++
 .../runtime/jobmaster/slotpool/SlotPool.java    |  19 +-
 .../jobmaster/slotpool/SlotSharingManager.java  |   3 +-
 .../clusterframework/types/SlotProfileTest.java |  11 +-
 .../jobmanager/scheduler/SchedulerTestBase.java |  11 +-
 .../jobmaster/slotpool/AvailableSlotsTest.java  |   4 +-
 .../jobmaster/slotpool/SlotPoolRpcTest.java     |   2 +
 .../slotpool/SlotPoolSchedulingTestBase.java    |   3 +-
 .../jobmaster/slotpool/SlotPoolTest.java        |  18 +-
 .../slotpool/SlotSharingManagerTest.java        |  20 +-
 .../flink/test/runtime/SchedulingITCase.java    | 167 ++++++++++++++++
 15 files changed, 529 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
index bd0a2db..7cb364d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java
@@ -18,26 +18,13 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
 
 /**
  * A slot profile describes the profile of a slot into which a task wants to be scheduled. The profile contains
@@ -97,174 +84,38 @@ public class SlotProfile {
 	}
 
 	/**
-	 * Returns the matcher for this profile that helps to find slots that fit the profile.
-	 */
-	public ProfileToSlotContextMatcher matcher() {
-		if (priorAllocations.isEmpty()) {
-			return new LocalityAwareRequirementsToSlotMatcher(preferredLocations);
-		} else {
-			return new PreviousAllocationProfileToSlotContextMatcher(priorAllocations);
-		}
-	}
-
-	/**
-	 * Classes that implement this interface provide a method to match objects to somehow represent slot candidates
-	 * against the {@link SlotProfile} that produced the matcher object. A matching candidate is transformed into a
-	 * desired result. If the matcher does not find a matching candidate, it returns null.
-	 */
-	public interface ProfileToSlotContextMatcher {
-
-		/**
-		 * This method takes the candidate slots, extracts slot contexts from them, filters them by the profile
-		 * requirements and potentially by additional requirements, and produces a result from a match.
-		 *
-		 * @param candidates                   stream of candidates to match against.
-		 * @param contextExtractor             function to extract the {@link SlotContext} from the candidates.
-		 * @param additionalRequirementsFilter predicate to specify additional requirements for each candidate.
-		 * @param resultProducer               function to produce a result from a matching candidate input.
-		 * @param <IN>                         type of the objects against we match the profile.
-		 * @param <OUT>                        type of the produced output from a matching object.
-		 * @return the result produced by resultProducer if a matching candidate was found or null otherwise.
-		 */
-		@Nullable
-		<IN, OUT> OUT findMatchWithLocality(
-			@Nonnull Stream<IN> candidates,
-			@Nonnull Function<IN, SlotContext> contextExtractor,
-			@Nonnull Predicate<IN> additionalRequirementsFilter,
-			@Nonnull BiFunction<IN, Locality, OUT> resultProducer);
-	}
-
-	/**
-	 * This matcher implementation is the presence of prior allocations. Prior allocations are supposed to overrule
-	 * other locality requirements, such as preferred locations. Prior allocations also require strict matching and
-	 * this matcher returns null if it cannot find a candidate for the same prior allocation. The background is that
-	 * this will force the scheduler tor request a new slot that is guaranteed to be not the prior location of any
-	 * other subtask, so that subtasks do not steal another subtasks prior allocation in case that the own prior
-	 * allocation is no longer available (e.g. machine failure). This is important to enable local recovery for all
-	 * tasks that can still return to their prior allocation.
+	 * Returns a slot profile that has no requirements.
 	 */
-	@VisibleForTesting
-	public static class PreviousAllocationProfileToSlotContextMatcher implements ProfileToSlotContextMatcher {
-
-		/** Set of prior allocations. */
-		private final HashSet<AllocationID> priorAllocations;
-
-		@VisibleForTesting
-		PreviousAllocationProfileToSlotContextMatcher(@Nonnull Collection<AllocationID> priorAllocations) {
-			this.priorAllocations = new HashSet<>(priorAllocations);
-			Preconditions.checkState(
-				this.priorAllocations.size() > 0,
-				"This matcher should only be used if there are prior allocations!");
-		}
-
-		public <I, O> O findMatchWithLocality(
-			@Nonnull Stream<I> candidates,
-			@Nonnull Function<I, SlotContext> contextExtractor,
-			@Nonnull Predicate<I> additionalRequirementsFilter,
-			@Nonnull BiFunction<I, Locality, O> resultProducer) {
-
-			Predicate<I> filterByAllocation =
-				(candidate) -> priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId());
-
-			return candidates
-				.filter(filterByAllocation.and(additionalRequirementsFilter))
-				.findFirst()
-				.map((result) -> resultProducer.apply(result, Locality.LOCAL)) // TODO introduce special locality?
-				.orElse(null);
-		}
+	public static SlotProfile noRequirements() {
+		return NO_REQUIREMENTS;
 	}
 
 	/**
-	 * This matcher is used whenever no prior allocation was specified in the {@link SlotProfile}. This implementation
-	 * tries to achieve best possible locality if a preferred location is specified in the profile.
+	 * Returns a slot profile for the given resource profile, without any locality requirements.
 	 */
-	@VisibleForTesting
-	public static class LocalityAwareRequirementsToSlotMatcher implements ProfileToSlotContextMatcher {
-
-		private final Collection<TaskManagerLocation> locationPreferences;
-
-		/**
-		 * Calculates the candidate's locality score.
-		 */
-		private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION
-			= (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
-
-		@VisibleForTesting
-		public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection<TaskManagerLocation> locationPreferences) {
-			this.locationPreferences = new ArrayList<>(locationPreferences);
-		}
-
-		@Override
-		public <IN, OUT> OUT findMatchWithLocality(
-			@Nonnull Stream<IN> candidates,
-			@Nonnull Function<IN, SlotContext> contextExtractor,
-			@Nonnull Predicate<IN> additionalRequirementsFilter,
-			@Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
-
-			// if we have no location preferences, we can only filter by the additional requirements.
-			if (locationPreferences.isEmpty()) {
-				return candidates
-					.filter(additionalRequirementsFilter)
-					.findFirst()
-					.map((result) -> resultProducer.apply(result, Locality.UNCONSTRAINED))
-					.orElse(null);
-			}
-
-			// we build up two indexes, one for resource id and one for host names of the preferred locations.
-			final Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size());
-			final Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size());
-
-			for (TaskManagerLocation locationPreference : locationPreferences) {
-				preferredResourceIDs.merge(locationPreference.getResourceID(), 1, Integer::sum);
-				preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum);
-			}
-
-			Iterator<IN> iterator = candidates.iterator();
-
-			IN bestCandidate = null;
-			int bestCandidateScore = Integer.MIN_VALUE;
-			Locality bestCandidateLocality = null;
-
-			while (iterator.hasNext()) {
-				IN candidate = iterator.next();
-				if (additionalRequirementsFilter.test(candidate)) {
-					SlotContext slotContext = contextExtractor.apply(candidate);
-
-					// this gets candidate is local-weigh
-					Integer localWeigh = preferredResourceIDs.getOrDefault(slotContext.getTaskManagerLocation().getResourceID(), 0);
-
-					// this gets candidate is host-local-weigh
-					Integer hostLocalWeigh = preferredFQHostNames.getOrDefault(slotContext.getTaskManagerLocation().getFQDNHostname(), 0);
-
-					int candidateScore = LOCALITY_EVALUATION_FUNCTION.apply(localWeigh, hostLocalWeigh);
-					if (candidateScore > bestCandidateScore) {
-						bestCandidateScore = candidateScore;
-						bestCandidate = candidate;
-						bestCandidateLocality = localWeigh > 0 ? Locality.LOCAL : hostLocalWeigh > 0 ? Locality.HOST_LOCAL : Locality.NON_LOCAL;
-					}
-				}
-			}
-
-			// at the end of the iteration, we return the candidate with best possible locality or null.
-			if (bestCandidate != null) {
-				return resultProducer.apply(bestCandidate, bestCandidateLocality);
-			} else {
-				return null;
-			}
-		}
+	public static SlotProfile noLocality(ResourceProfile resourceProfile) {
+		return new SlotProfile(resourceProfile, Collections.emptyList(), Collections.emptyList());
 	}
 
 	/**
-	 * Returns a slot profile that has no requirements.
+	 * Returns a slot profile for the given resource profile and the preferred locations.
+	 *
+	 * @param resourceProfile specifying the slot requirements
+	 * @param preferredLocations specifying the preferred locations
+	 * @return Slot profile with the given resource profile and preferred locations
 	 */
-	public static SlotProfile noRequirements() {
-		return NO_REQUIREMENTS;
+	public static SlotProfile preferredLocality(ResourceProfile resourceProfile, Collection<TaskManagerLocation> preferredLocations) {
+		return new SlotProfile(resourceProfile, preferredLocations, Collections.emptyList());
 	}
 
 	/**
-	 * Returns a slot profile for the given resource profile, without any locality requirements.
+	 * Returns a slot profile for the given resource profile and the prior allocations.
+	 *
+	 * @param resourceProfile specifying the slot requirements
+	 * @param priorAllocations specifying the prior allocations
+	 * @return Slot profile with the given resource profile and prior allocations
 	 */
-	public static SlotProfile noLocality(ResourceProfile resourceProfile) {
-		return new SlotProfile(resourceProfile, Collections.emptyList(), Collections.emptyList());
+	public static SlotProfile priorAllocation(ResourceProfile resourceProfile, Collection<AllocationID> priorAllocations) {
+		return new SlotProfile(resourceProfile, Collections.emptyList(), priorAllocations);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java
index 2d082c2..ed14aab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -38,6 +39,9 @@ public class DefaultSlotPoolFactory implements SlotPoolFactory {
 	private final RpcService rpcService;
 
 	@Nonnull
+	private final SchedulingStrategy schedulingStrategy;
+
+	@Nonnull
 	private final Clock clock;
 
 	@Nonnull
@@ -48,10 +52,12 @@ public class DefaultSlotPoolFactory implements SlotPoolFactory {
 
 	public DefaultSlotPoolFactory(
 			@Nonnull RpcService rpcService,
+			@Nonnull SchedulingStrategy schedulingStrategy,
 			@Nonnull Clock clock,
 			@Nonnull Time rpcTimeout,
 			@Nonnull Time slotIdleTimeout) {
 		this.rpcService = rpcService;
+		this.schedulingStrategy = schedulingStrategy;
 		this.clock = clock;
 		this.rpcTimeout = rpcTimeout;
 		this.slotIdleTimeout = slotIdleTimeout;
@@ -63,6 +69,7 @@ public class DefaultSlotPoolFactory implements SlotPoolFactory {
 		return new SlotPool(
 			rpcService,
 			jobId,
+			schedulingStrategy,
 			clock,
 			rpcTimeout,
 			slotIdleTimeout);
@@ -75,10 +82,21 @@ public class DefaultSlotPoolFactory implements SlotPoolFactory {
 		final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
 		final Time slotIdleTimeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
 
+		final SchedulingStrategy schedulingStrategy = selectSchedulingStrategy(configuration);
+
 		return new DefaultSlotPoolFactory(
 			rpcService,
+			schedulingStrategy,
 			SystemClock.getInstance(),
 			rpcTimeout,
 			slotIdleTimeout);
 	}
+
+	private static SchedulingStrategy selectSchedulingStrategy(Configuration configuration) {
+		if (configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY)) {
+			return PreviousAllocationSchedulingStrategy.getInstance();
+		} else {
+			return LocationPreferenceSchedulingStrategy.getInstance();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
new file mode 100644
index 0000000..25e884c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSchedulingStrategy.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * Default {@link SchedulingStrategy} which tries to match a slot with its location preferences.
+ */
+public class LocationPreferenceSchedulingStrategy implements SchedulingStrategy {
+
+	private static final LocationPreferenceSchedulingStrategy INSTANCE = new LocationPreferenceSchedulingStrategy();
+
+	/**
+	 * Calculates the candidate's locality score.
+	 */
+	private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION = (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
+
+	LocationPreferenceSchedulingStrategy() {}
+
+	@Nullable
+	@Override
+	public <IN, OUT> OUT findMatchWithLocality(
+			@Nonnull SlotProfile slotProfile,
+			@Nonnull Stream<IN> candidates,
+			@Nonnull Function<IN, SlotContext> contextExtractor,
+			@Nonnull Predicate<IN> additionalRequirementsFilter,
+			@Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
+
+		Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
+
+		// if we have no location preferences, we can only filter by the additional requirements.
+		if (locationPreferences.isEmpty()) {
+			return candidates
+				.filter(additionalRequirementsFilter)
+				.findFirst()
+				.map((result) -> resultProducer.apply(result, Locality.UNCONSTRAINED))
+				.orElse(null);
+		}
+
+		// we build up two indexes, one for resource id and one for host names of the preferred locations.
+		final Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size());
+		final Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size());
+
+		for (TaskManagerLocation locationPreference : locationPreferences) {
+			preferredResourceIDs.merge(locationPreference.getResourceID(), 1, Integer::sum);
+			preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum);
+		}
+
+		Iterator<IN> iterator = candidates.iterator();
+
+		IN bestCandidate = null;
+		int bestCandidateScore = Integer.MIN_VALUE;
+		Locality bestCandidateLocality = null;
+
+		while (iterator.hasNext()) {
+			IN candidate = iterator.next();
+			if (additionalRequirementsFilter.test(candidate)) {
+				SlotContext slotContext = contextExtractor.apply(candidate);
+
+				// number of preferred locations that share the candidate's resource id (0 if none)
+				Integer localWeigh = preferredResourceIDs.getOrDefault(slotContext.getTaskManagerLocation().getResourceID(), 0);
+
+				// number of preferred locations that share the candidate's FQDN host name (0 if none)
+				Integer hostLocalWeigh = preferredFQHostNames.getOrDefault(slotContext.getTaskManagerLocation().getFQDNHostname(), 0);
+
+				int candidateScore = LOCALITY_EVALUATION_FUNCTION.apply(localWeigh, hostLocalWeigh);
+				if (candidateScore > bestCandidateScore) {
+					bestCandidateScore = candidateScore;
+					bestCandidate = candidate;
+					bestCandidateLocality = localWeigh > 0 ? Locality.LOCAL : hostLocalWeigh > 0 ? Locality.HOST_LOCAL : Locality.NON_LOCAL;
+				}
+			}
+		}
+
+		// at the end of the iteration, we return the candidate with best possible locality or null.
+		if (bestCandidate != null) {
+			return resultProducer.apply(bestCandidate, bestCandidateLocality);
+		} else {
+			return null;
+		}
+	}
+
+	public static LocationPreferenceSchedulingStrategy getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
new file mode 100644
index 0000000..9b1872e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSchedulingStrategy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * {@link SchedulingStrategy} which tries to match a slot with its previous {@link AllocationID}.
+ * If the previous allocation cannot be found, then it returns {@code null}. If the slot has not
+ * been scheduled before (no assigned allocation id), it will fall back to
+ * {@link LocationPreferenceSchedulingStrategy}.
+ */
+public class PreviousAllocationSchedulingStrategy extends LocationPreferenceSchedulingStrategy {
+
+	private static final PreviousAllocationSchedulingStrategy INSTANCE = new PreviousAllocationSchedulingStrategy();
+
+	private PreviousAllocationSchedulingStrategy() {}
+
+	@Nullable
+	@Override
+	public <IN, OUT> OUT findMatchWithLocality(
+			@Nonnull SlotProfile slotProfile,
+			@Nonnull Stream<IN> candidates,
+			@Nonnull Function<IN, SlotContext> contextExtractor,
+			@Nonnull Predicate<IN> additionalRequirementsFilter,
+			@Nonnull BiFunction<IN, Locality, OUT> resultProducer) {
+
+		Collection<AllocationID> priorAllocations = slotProfile.getPriorAllocations();
+
+		if (priorAllocations.isEmpty()) {
+			return super.findMatchWithLocality(slotProfile, candidates, contextExtractor, additionalRequirementsFilter, resultProducer);
+		} else {
+			return findPreviousAllocation(candidates, contextExtractor, additionalRequirementsFilter, resultProducer, priorAllocations);
+		}
+	}
+
+	@Nullable
+	private <IN, OUT> OUT findPreviousAllocation(
+			@Nonnull Stream<IN> candidates,
+			@Nonnull Function<IN, SlotContext> contextExtractor,
+			@Nonnull Predicate<IN> additionalRequirementsFilter,
+			@Nonnull BiFunction<IN, Locality, OUT> resultProducer,
+			Collection<AllocationID> priorAllocations) {
+		Predicate<IN> filterByAllocation =
+			(candidate) -> priorAllocations.contains(contextExtractor.apply(candidate).getAllocationId());
+
+		return candidates
+			.filter(filterByAllocation.and(additionalRequirementsFilter))
+			.findFirst()
+			.map((result) -> resultProducer.apply(result, Locality.LOCAL)) // TODO introduce special locality?
+			.orElse(null);
+	}
+
+	public static PreviousAllocationSchedulingStrategy getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
new file mode 100644
index 0000000..fb27a21
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulingStrategy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * Classes that implement this interface provide a method to match objects that represent slot candidates
+ * against a given {@link SlotProfile}. A matching candidate is transformed into a
+ * desired result. If the strategy does not find a matching candidate, it returns null.
+ */
+public interface SchedulingStrategy {
+
+	/**
+	 * This method takes the candidate slots, extracts slot contexts from them, filters them by the profile
+	 * requirements and potentially by additional requirements, and produces a result from a match.
+	 *
+	 * @param slotProfile slotProfile for which to find a matching slot
+	 * @param candidates stream of candidates to match against.
+	 * @param contextExtractor function to extract the {@link SlotContext} from the candidates.
+	 * @param additionalRequirementsFilter predicate to specify additional requirements for each candidate.
+	 * @param resultProducer function to produce a result from a matching candidate input.
+	 * @param <IN> type of the objects against which we match the profile.
+	 * @param <OUT> type of the produced output from a matching object.
+	 * @return the result produced by resultProducer if a matching candidate was found or null otherwise.
+	 */
+	@Nullable
+	<IN, OUT> OUT findMatchWithLocality(
+		@Nonnull SlotProfile slotProfile,
+		@Nonnull Stream<IN> candidates,
+		@Nonnull Function<IN, SlotContext> contextExtractor,
+		@Nonnull Predicate<IN> additionalRequirementsFilter,
+		@Nonnull BiFunction<IN, Locality, OUT> resultProducer);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 6ab21c2..81b3e24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -97,6 +97,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 	private final JobID jobId;
 
+	private final SchedulingStrategy schedulingStrategy;
+
 	private final ProviderAndOwner providerAndOwner;
 
 	/** All registered TaskManagers, slots will be accepted and used only if the resource is registered. */
@@ -136,10 +138,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	// ------------------------------------------------------------------------
 
 	@VisibleForTesting
-	protected SlotPool(RpcService rpcService, JobID jobId) {
+	protected SlotPool(RpcService rpcService, JobID jobId, SchedulingStrategy schedulingStrategy) {
 		this(
 			rpcService,
 			jobId,
+			schedulingStrategy,
 			SystemClock.getInstance(),
 			AkkaUtils.getDefaultTimeout(),
 			Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue()));
@@ -148,6 +151,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 	public SlotPool(
 			RpcService rpcService,
 			JobID jobId,
+			SchedulingStrategy schedulingStrategy,
 			Clock clock,
 			Time rpcTimeout,
 			Time idleSlotTimeout) {
@@ -155,6 +159,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		super(rpcService);
 
 		this.jobId = checkNotNull(jobId);
+		this.schedulingStrategy = checkNotNull(schedulingStrategy);
 		this.clock = checkNotNull(clock);
 		this.rpcTimeout = checkNotNull(rpcTimeout);
 		this.idleSlotTimeout = checkNotNull(idleSlotTimeout);
@@ -511,7 +516,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		// check first whether we have a resolved root slot which we can use
 		SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = slotSharingManager.getResolvedRootSlot(
 			groupId,
-			slotProfile.matcher());
+			schedulingStrategy,
+			slotProfile);
 
 		if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
 			return multiTaskSlotLocality;
@@ -806,7 +812,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 
 	@Nullable
 	private SlotAndLocality pollAndAllocateSlot(SlotRequestId slotRequestId, SlotProfile slotProfile) {
-		SlotAndLocality slotFromPool = availableSlots.poll(slotProfile);
+		SlotAndLocality slotFromPool = availableSlots.poll(schedulingStrategy, slotProfile);
 
 		if (slotFromPool != null) {
 			allocatedSlots.add(slotRequestId, slotFromPool.getSlot());
@@ -1441,16 +1447,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
 		 *
 		 * @return Slot which matches the resource profile, null if we can't find a match
 		 */
-		SlotAndLocality poll(SlotProfile slotProfile) {
+		SlotAndLocality poll(SchedulingStrategy schedulingStrategy, SlotProfile slotProfile) {
 			// fast path if no slots are available
 			if (availableSlots.isEmpty()) {
 				return null;
 			}
-
-			SlotProfile.ProfileToSlotContextMatcher matcher = slotProfile.matcher();
 			Collection<SlotAndTimestamp> slotAndTimestamps = availableSlots.values();
 
-			SlotAndLocality matchingSlotAndLocality = matcher.findMatchWithLocality(
+			SlotAndLocality matchingSlotAndLocality = schedulingStrategy.findMatchWithLocality(
+				slotProfile,
 				slotAndTimestamps.stream(),
 				SlotAndTimestamp::slot,
 				(SlotAndTimestamp slot) -> slot.slot().getResourceProfile().isMatching(slotProfile.getResourceProfile()),

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index e3338fc..eaa5787 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -183,10 +183,11 @@ public class SlotSharingManager {
 	 * 		or null if there was no root slot which did not contain the given groupId
 	 */
 	@Nullable
-	MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SlotProfile.ProfileToSlotContextMatcher matcher) {
+	MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy matcher, SlotProfile slotProfile) {
 		synchronized (lock) {
 			Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
 			return matcher.findMatchWithLocality(
+				slotProfile,
 				resolvedRootSlotsValues.stream().flatMap(Collection::stream),
 				(MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
 				(MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
index 9f5f449..6a826aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/SlotProfileTest.java
@@ -22,7 +22,10 @@ import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGate
 import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.SchedulingStrategy;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -33,7 +36,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-public class SlotProfileTest {
+public class SlotProfileTest extends TestLogger {
 
 	private final ResourceProfile resourceProfile = new ResourceProfile(2, 1024);
 
@@ -58,6 +61,8 @@ public class SlotProfileTest {
 
 	private final Set<SlotContext> candidates = Collections.unmodifiableSet(createCandidates());
 
+	private final SchedulingStrategy schedulingStrategy = PreviousAllocationSchedulingStrategy.getInstance();
+
 	private Set<SlotContext> createCandidates() {
 		Set<SlotContext> candidates = new HashSet<>(4);
 		candidates.add(ssc1);
@@ -128,8 +133,8 @@ public class SlotProfileTest {
 	}
 
 	private SlotContext runMatching(SlotProfile slotProfile) {
-		SlotProfile.ProfileToSlotContextMatcher matcher = slotProfile.matcher();
-		return matcher.findMatchWithLocality(
+		return schedulingStrategy.findMatchWithLocality(
+			slotProfile,
 			candidates.stream(),
 			(candidate) -> candidate,
 			(candidate) -> true,

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 940934f..9a7fdd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -32,6 +32,8 @@ import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSchedulingStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.SchedulingStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
@@ -104,7 +106,10 @@ public class SchedulerTestBase extends TestLogger {
 			case SLOT_POOL:
 				rpcService = new TestingRpcService();
 				final JobID jobId = new JobID();
-				final TestingSlotPool slotPool = new TestingSlotPool(rpcService, jobId);
+				final TestingSlotPool slotPool = new TestingSlotPool(
+					rpcService,
+					jobId,
+					PreviousAllocationSchedulingStrategy.getInstance());
 				testingSlotProvider = new TestingSlotPoolSlotProvider(slotPool);
 
 				final JobMasterId jobMasterId = JobMasterId.generate();
@@ -392,8 +397,8 @@ public class SchedulerTestBase extends TestLogger {
 
 	private static final class TestingSlotPool extends SlotPool {
 
-		public TestingSlotPool(RpcService rpcService, JobID jobId) {
-			super(rpcService, jobId);
+		public TestingSlotPool(RpcService rpcService, JobID jobId, SchedulingStrategy schedulingStrategy) {
+			super(rpcService, jobId, schedulingStrategy);
 		}
 
 		CompletableFuture<Integer> getNumberOfAvailableSlots() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
index d477c69..caa174d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
@@ -95,9 +95,9 @@ public class AvailableSlotsTest extends TestLogger {
 		assertTrue(availableSlots.contains(slot1.getAllocationId()));
 		assertTrue(availableSlots.containsTaskManager(resource1));
 
-		assertNull(availableSlots.poll(SlotProfile.noLocality(DEFAULT_TESTING_BIG_PROFILE)));
+		assertNull(availableSlots.poll(LocationPreferenceSchedulingStrategy.getInstance(), SlotProfile.noLocality(DEFAULT_TESTING_BIG_PROFILE)));
 
-		SlotAndLocality slotAndLocality = availableSlots.poll(SlotProfile.noLocality(DEFAULT_TESTING_PROFILE));
+		SlotAndLocality slotAndLocality = availableSlots.poll(LocationPreferenceSchedulingStrategy.getInstance(), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE));
 		assertEquals(slot1, slotAndLocality.getSlot());
 		assertEquals(0, availableSlots.size());
 		assertFalse(availableSlots.contains(slot1.getAllocationId()));

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
index 5dfffad..4fb2ada 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -107,6 +107,7 @@ public class SlotPoolRpcTest extends TestLogger {
 		final SlotPool pool = new SlotPool(
 			rpcService,
 			jid,
+			LocationPreferenceSchedulingStrategy.getInstance(),
 			SystemClock.getInstance(),
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime()
@@ -355,6 +356,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			super(
 				rpcService,
 				jobId,
+				LocationPreferenceSchedulingStrategy.getInstance(),
 				clock,
 				rpcTimeout,
 				idleSlotTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
index 04268da..1dfdc17 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
@@ -74,7 +74,8 @@ public class SlotPoolSchedulingTestBase extends TestLogger {
 
 		slotPool = new SlotPool(
 			testingRpcService,
-			jobId);
+			jobId,
+			PreviousAllocationSchedulingStrategy.getInstance());
 
 		slotPool.start(jobMasterId, jobMasterAddress);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 91ccc81..9815cb2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -111,7 +111,7 @@ public class SlotPoolTest extends TestLogger {
 		CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
 		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
 
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
@@ -154,7 +154,7 @@ public class SlotPoolTest extends TestLogger {
 			}
 		});
 
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
@@ -216,7 +216,7 @@ public class SlotPoolTest extends TestLogger {
 		final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
 		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
 
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
@@ -272,7 +272,7 @@ public class SlotPoolTest extends TestLogger {
 
 		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
 
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
@@ -340,7 +340,7 @@ public class SlotPoolTest extends TestLogger {
 
 		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
 
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
 
 		try {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
@@ -398,7 +398,7 @@ public class SlotPoolTest extends TestLogger {
 	 */
 	@Test
 	public void testSlotRequestCancellationUponFailingRequest() throws Exception {
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
 		final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>();
 		final CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>();
 		final CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>();
@@ -458,7 +458,7 @@ public class SlotPoolTest extends TestLogger {
 	 */
 	@Test
 	public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
 
 		final ArrayBlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
 
@@ -533,7 +533,7 @@ public class SlotPoolTest extends TestLogger {
 	 */
 	@Test
 	public void testShutdownReleasesAllSlots() throws Exception {
-		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
 
 		try {
 			final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
@@ -593,6 +593,7 @@ public class SlotPoolTest extends TestLogger {
 		final SlotPool slotPool = new SlotPool(
 			rpcService,
 			jobId,
+			LocationPreferenceSchedulingStrategy.getInstance(),
 			clock,
 			TestingUtils.infiniteTime(),
 			timeout);
@@ -654,6 +655,7 @@ public class SlotPoolTest extends TestLogger {
 		final SlotPool slotPool = new SlotPool(
 			rpcService,
 			jobId,
+			LocationPreferenceSchedulingStrategy.getInstance(),
 			clock,
 			TestingUtils.infiniteTime(),
 			timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index c325a69..d4ab867 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
@@ -414,7 +415,7 @@ public class SlotSharingManagerTest extends TestLogger {
 
 		AbstractID groupId = new AbstractID();
 		SlotSharingManager.MultiTaskSlotLocality resolvedRootSlotLocality =
-			slotSharingManager.getResolvedRootSlot(groupId, SlotProfile.noRequirements().matcher());
+			slotSharingManager.getResolvedRootSlot(groupId, LocationPreferenceSchedulingStrategy.getInstance(), SlotProfile.noRequirements());
 
 		assertNotNull(resolvedRootSlotLocality);
 		assertEquals(Locality.UNCONSTRAINED, resolvedRootSlotLocality.getLocality());
@@ -430,7 +431,8 @@ public class SlotSharingManagerTest extends TestLogger {
 
 		SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot1 = slotSharingManager.getResolvedRootSlot(
 			groupId,
-			SlotProfile.noRequirements().matcher());
+			LocationPreferenceSchedulingStrategy.getInstance(),
+			SlotProfile.noRequirements());
 
 		assertNull(resolvedRootSlot1);
 	}
@@ -469,9 +471,12 @@ public class SlotSharingManagerTest extends TestLogger {
 			new SlotRequestId());
 
 		AbstractID groupId = new AbstractID();
-		SlotProfile.LocalityAwareRequirementsToSlotMatcher matcher =
-			new SlotProfile.LocalityAwareRequirementsToSlotMatcher(Collections.singleton(taskManagerLocation));
-		SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot1 = slotSharingManager.getResolvedRootSlot(groupId, matcher);
+
+		SlotProfile slotProfile = SlotProfile.preferredLocality(ResourceProfile.UNKNOWN, Collections.singleton(taskManagerLocation));
+		SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot1 = slotSharingManager.getResolvedRootSlot(
+			groupId,
+			LocationPreferenceSchedulingStrategy.getInstance(),
+			slotProfile);
 		assertNotNull(resolvedRootSlot1);
 		assertEquals(Locality.LOCAL, resolvedRootSlot1.getLocality());
 		assertEquals(rootSlot2.getSlotRequestId(), resolvedRootSlot1.getMultiTaskSlot().getSlotRequestId());
@@ -482,7 +487,10 @@ public class SlotSharingManagerTest extends TestLogger {
 			groupId,
 			resolvedRootSlot1.getLocality());
 
-		SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot2 = slotSharingManager.getResolvedRootSlot(groupId,matcher);
+		SlotSharingManager.MultiTaskSlotLocality resolvedRootSlot2 = slotSharingManager.getResolvedRootSlot(
+			groupId,
+			LocationPreferenceSchedulingStrategy.getInstance(),
+			slotProfile);
 
 		assertNotNull(resolvedRootSlot2);
 		assertNotSame(Locality.LOCAL, (resolvedRootSlot2.getLocality()));

http://git-wip-us.apache.org/repos/asf/flink/blob/d5919d9c/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
new file mode 100644
index 0000000..0344d71
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingITCase.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT case for testing Flink's scheduling strategies.
+ *
+ * <p>Each test runs a small source/sink job on a {@link MiniCluster} in which exactly one
+ * source subtask fails once, and checks that the job result carries no failure cause.
+ */
+public class SchedulingITCase extends TestLogger {
+
+	/**
+	 * Tests that if local recovery is disabled we won't spread
+	 * out tasks when recovering.
+	 */
+	@Test
+	public void testDisablingLocalRecovery() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, false);
+
+		executeSchedulingTest(configuration);
+	}
+
+	/**
+	 * Tests that if local recovery is enabled we won't spread
+	 * out tasks when recovering.
+	 */
+	@Test
+	@Ignore("The test should not pass until FLINK-9635 has been fixed")
+	public void testLocalRecovery() throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
+
+		executeSchedulingTest(configuration);
+	}
+
+	/**
+	 * Runs the failing source/sink job on a fresh {@link MiniCluster} and asserts that the
+	 * job result contains no failure cause.
+	 *
+	 * <p>The cluster gets one task manager per parallel subtask with a single slot each; the
+	 * restart delay of the job is twice the configured slot idle timeout.
+	 *
+	 * @param configuration cluster configuration; its REST port and slot idle timeout are
+	 *                      overwritten by this method
+	 * @throws Exception propagated from the mini cluster, the client or the job execution
+	 */
+	private void executeSchedulingTest(Configuration configuration) throws Exception {
+		// The failure flag is static and thus shared by all tests running in this JVM.
+		// Re-arm it so that every run actually triggers the one-time task failure.
+		OneTimeFailingInvokable.reset();
+
+		configuration.setInteger(RestOptions.PORT, 0);
+
+		final long slotIdleTimeout = 50L;
+		configuration.setLong(JobManagerOptions.SLOT_IDLE_TIMEOUT, slotIdleTimeout);
+
+		final int parallelism = 4;
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(configuration)
+			.setNumTaskManagers(parallelism)
+			.setNumSlotsPerTaskManager(1)
+			.build();
+
+		try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
+			miniCluster.start();
+
+			MiniClusterClient miniClusterClient = new MiniClusterClient(configuration, miniCluster);
+
+			// the restart delay is twice the slot idle timeout
+			JobGraph jobGraph = createJobGraph(slotIdleTimeout << 1, parallelism);
+			CompletableFuture<JobSubmissionResult> submissionFuture = miniClusterClient.submitJob(jobGraph);
+
+			// wait for the submission to succeed
+			JobSubmissionResult jobSubmissionResult = submissionFuture.get();
+
+			CompletableFuture<JobResult> resultFuture = miniClusterClient.requestJobResult(jobSubmissionResult.getJobID());
+
+			JobResult jobResult = resultFuture.get();
+
+			// the job result must not carry a failure cause
+			assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+		}
+	}
+
+	/**
+	 * Creates an eagerly scheduled job of a {@link OneTimeFailingInvokable} source which is
+	 * connected pointwise and pipelined to a {@link NoOpInvokable} sink. Both vertices share
+	 * one slot sharing group.
+	 *
+	 * @param delay delay of the fixed delay restart strategy, which allows a single restart attempt
+	 * @param parallelism parallelism of the source and of the sink
+	 * @return the job graph to submit
+	 * @throws IOException if the job graph cannot be assembled
+	 */
+	@Nonnull
+	private JobGraph createJobGraph(long delay, int parallelism) throws IOException {
+		SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+
+		final JobVertex source = new JobVertex("source");
+		source.setInvokableClass(OneTimeFailingInvokable.class);
+		source.setParallelism(parallelism);
+		source.setSlotSharingGroup(slotSharingGroup);
+
+		final JobVertex sink = new JobVertex("sink");
+		sink.setInvokableClass(NoOpInvokable.class);
+		sink.setParallelism(parallelism);
+		sink.setSlotSharingGroup(slotSharingGroup);
+
+		sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		JobGraph jobGraph = new JobGraph(source, sink);
+
+		jobGraph.setScheduleMode(ScheduleMode.EAGER);
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, delay));
+		jobGraph.setExecutionConfig(executionConfig);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Invokable which fails exactly once (one sub task of it) after each {@link #reset()}.
+	 */
+	public static final class OneTimeFailingInvokable extends AbstractInvokable {
+
+		/** Set by the first sub task that fails; static because it is shared by all sub tasks. */
+		private static final AtomicBoolean hasFailed = new AtomicBoolean(false);
+
+		/**
+		 * Create an Invokable task and set its environment.
+		 *
+		 * @param environment The environment assigned to this invokable.
+		 */
+		public OneTimeFailingInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			// only the first sub task which flips the flag fails
+			if (hasFailed.compareAndSet(false, true)) {
+				throw new FlinkException("One time failure.");
+			}
+		}
+
+		/**
+		 * Re-arms the one-time failure so that the next job run fails once again.
+		 */
+		private static void reset() {
+			hasFailed.set(false);
+		}
+	}
+}