You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/09/11 14:25:24 UTC

[flink] branch master updated (92e2f3b -> 4df2295)

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 92e2f3b  [FLINK-18980][e2e] Add timeout to get logs from stalling test
     new 6a5987f  [hotfix] Fix checkstyle violations in JobGraph and JobGraphTest
     new d918cc3  [FLINK-17016][runtime] Use SlotSharingExecutionSlotAllocator for pipelined region scheduling
     new 4df2295  [FLINK-17016][runtime] Enable to use pipelined region scheduling strategy

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/configuration/JobManagerOptions.java     |  18 ++
 .../apache/flink/runtime/jobgraph/JobGraph.java    |  36 +++-
 .../slotpool/PhysicalSlotProviderImpl.java         |   7 +-
 .../DefaultExecutionSlotAllocatorFactory.java      |   4 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  |  45 ++++-
 ...actory.java => DefaultSchedulerComponents.java} | 168 +++++++----------
 .../runtime/scheduler/DefaultSchedulerFactory.java | 109 +----------
 .../scheduler/ExecutionSlotAllocationContext.java  |  72 ++++++++
 .../scheduler/ExecutionSlotAllocatorFactory.java   |  10 +-
 .../OneSlotPerExecutionSlotAllocatorFactory.java   |   4 +-
 .../SlotSharingExecutionSlotAllocatorFactory.java  |  29 +--
 .../flink/runtime/jobgraph/JobGraphTest.java       | 110 +++++++----
 .../DefaultSchedulerComponentsFactoryTest.java     |  77 ++++++++
 .../runtime/scheduler/SchedulerTestingUtils.java   |   6 +-
 .../TestExecutionSlotAllocatorFactory.java         |   2 +-
 .../PipelinedRegionSchedulingITCase.java           | 205 +++++++++++++++++++++
 16 files changed, 627 insertions(+), 275 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/{DefaultSchedulerFactory.java => DefaultSchedulerComponents.java} (51%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java


[flink] 02/03: [FLINK-17016][runtime] Use SlotSharingExecutionSlotAllocator for pipelined region scheduling

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d918cc3e660d9cdbb84a175c8bb93ce6a14e38dc
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon Aug 31 17:26:24 2020 +0800

    [FLINK-17016][runtime] Use SlotSharingExecutionSlotAllocator for pipelined region scheduling
---
 .../apache/flink/runtime/jobgraph/JobGraph.java    | 26 ++++++
 .../slotpool/PhysicalSlotProviderImpl.java         |  7 +-
 .../DefaultExecutionSlotAllocatorFactory.java      |  4 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  | 10 ++-
 .../runtime/scheduler/DefaultSchedulerFactory.java | 11 +--
 .../scheduler/ExecutionSlotAllocationContext.java  | 95 ++++++++++++++++++++++
 .../scheduler/ExecutionSlotAllocatorFactory.java   | 10 ++-
 .../OneSlotPerExecutionSlotAllocatorFactory.java   |  4 +-
 .../SlotSharingExecutionSlotAllocatorFactory.java  | 29 ++-----
 .../flink/runtime/jobgraph/JobGraphTest.java       | 45 ++++++++++
 .../TestExecutionSlotAllocatorFactory.java         |  2 +-
 11 files changed, 207 insertions(+), 36 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 95a9a28..ffa1502 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -26,7 +26,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
@@ -41,7 +44,9 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -300,6 +305,27 @@ public class JobGraph implements Serializable {
 		return this.taskVertices.size();
 	}
 
+	public Set<SlotSharingGroup> getSlotSharingGroups() {
+		final Set<SlotSharingGroup> slotSharingGroups = IterableUtils
+			.toStream(getVertices())
+			.map(JobVertex::getSlotSharingGroup)
+			.collect(Collectors.toSet());
+		return Collections.unmodifiableSet(slotSharingGroups);
+	}
+
+	public Set<CoLocationGroupDesc> getCoLocationGroupDescriptors() {
+		// invoke distinct() on CoLocationGroup first to avoid creating
+		// multiple CoLocationGroupDesc from one CoLocationGroup
+		final Set<CoLocationGroupDesc> coLocationGroups = IterableUtils
+			.toStream(getVertices())
+			.map(JobVertex::getCoLocationGroup)
+			.filter(Objects::nonNull)
+			.distinct()
+			.map(CoLocationGroupDesc::from)
+			.collect(Collectors.toSet());
+		return Collections.unmodifiableSet(coLocationGroups);
+	}
+
 	/**
 	 * Sets the settings for asynchronous snapshots. A value of {@code null} means that
 	 * snapshotting is not enabled.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
index 023405b..4841a79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImpl.java
@@ -32,14 +32,17 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-class PhysicalSlotProviderImpl implements PhysicalSlotProvider {
+/**
+ * The provider serves physical slot requests.
+ */
+public class PhysicalSlotProviderImpl implements PhysicalSlotProvider {
 	private static final Logger LOG = LoggerFactory.getLogger(PhysicalSlotProviderImpl.class);
 
 	private final SlotSelectionStrategy slotSelectionStrategy;
 
 	private final SlotPool slotPool;
 
-	PhysicalSlotProviderImpl(SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
+	public PhysicalSlotProviderImpl(SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
 		this.slotSelectionStrategy = checkNotNull(slotSelectionStrategy);
 		this.slotPool = checkNotNull(slotPool);
 		slotPool.disableBatchSlotRequestTimeoutCheck();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
index 4324c3b..824e4cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
@@ -35,7 +35,7 @@ class DefaultExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFact
 	}
 
 	@Override
-	public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever preferredLocationsRetriever) {
-		return new DefaultExecutionSlotAllocator(slotProvider, preferredLocationsRetriever);
+	public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
+		return new DefaultExecutionSlotAllocator(slotProvider, context::getPreferredLocations);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index ac660ce..db748bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -160,7 +160,15 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			failoverStrategy,
 			restartBackoffTimeStrategy);
 		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
-		this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(getPreferredLocationsRetriever());
+
+		final ExecutionSlotAllocationContext slotAllocationContext = new ExecutionSlotAllocationContext(
+			getPreferredLocationsRetriever(),
+			executionVertexID -> getExecutionVertex(executionVertexID).getResourceProfile(),
+			executionVertexID -> getExecutionVertex(executionVertexID).getLatestPriorAllocation(),
+			getSchedulingTopology(),
+			() -> getJobGraph().getSlotSharingGroups(),
+			() -> getJobGraph().getCoLocationGroupDescriptors());
+		this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(slotAllocationContext);
 
 		this.verticesWaitingForRestart = new HashSet<>();
 		this.startUpAction = startUpAction;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 581e075..84f3143 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -35,9 +35,9 @@ import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
-import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProvider;
-import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProviderImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
@@ -164,10 +164,11 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 		final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
 		final PhysicalSlotRequestBulkChecker bulkChecker = PhysicalSlotRequestBulkCheckerImpl
 			.createFromSlotPool(slotPool, SystemClock.getInstance());
-		final BulkSlotProvider bulkSlotProvider = new BulkSlotProviderImpl(slotSelectionStrategy, slotPool, bulkChecker);
-		final ExecutionSlotAllocatorFactory allocatorFactory = new OneSlotPerExecutionSlotAllocatorFactory(
-			bulkSlotProvider,
+		final PhysicalSlotProvider physicalSlotProvider = new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
+		final ExecutionSlotAllocatorFactory allocatorFactory = new SlotSharingExecutionSlotAllocatorFactory(
+			physicalSlotProvider,
 			scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+			bulkChecker,
 			slotRequestTimeout);
 		return new DefaultSchedulerComponents(
 			new PipelinedRegionSchedulingStrategy.Factory(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
new file mode 100644
index 0000000..8180c1f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Context for slot allocation.
+ */
+class ExecutionSlotAllocationContext implements PreferredLocationsRetriever {
+
+	private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+	private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
+
+	private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
+
+	private final SchedulingTopology schedulingTopology;
+
+	private final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier;
+
+	private final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier;
+
+	ExecutionSlotAllocationContext(
+			final PreferredLocationsRetriever preferredLocationsRetriever,
+			final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
+			final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
+			final SchedulingTopology schedulingTopology,
+			final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier,
+			final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier) {
+
+		this.preferredLocationsRetriever = preferredLocationsRetriever;
+		this.resourceProfileRetriever = resourceProfileRetriever;
+		this.priorAllocationIdRetriever = priorAllocationIdRetriever;
+		this.schedulingTopology = schedulingTopology;
+		this.logicalSlotSharingGroupSupplier = logicalSlotSharingGroupSupplier;
+		this.coLocationGroupSupplier = coLocationGroupSupplier;
+	}
+
+	@Override
+	public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations(
+			final ExecutionVertexID executionVertexId,
+			final Set<ExecutionVertexID> producersToIgnore) {
+		return preferredLocationsRetriever.getPreferredLocations(executionVertexId, producersToIgnore);
+	}
+
+	ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
+		return resourceProfileRetriever.apply(executionVertexId);
+	}
+
+	AllocationID getPriorAllocationId(final ExecutionVertexID executionVertexId) {
+		return priorAllocationIdRetriever.apply(executionVertexId);
+	}
+
+	SchedulingTopology getSchedulingTopology() {
+		return schedulingTopology;
+	}
+
+	Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
+		return logicalSlotSharingGroupSupplier.get();
+	}
+
+	Set<CoLocationGroupDesc> getCoLocationGroups() {
+		return coLocationGroupSupplier.get();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java
index 1dc82aa..7a1586b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorFactory.java
@@ -22,8 +22,14 @@ package org.apache.flink.runtime.scheduler;
 /**
  * Interface for {@link ExecutionSlotAllocator} factories.
  */
+@FunctionalInterface
 public interface ExecutionSlotAllocatorFactory {
 
-	ExecutionSlotAllocator createInstance(PreferredLocationsRetriever preferredLocationsRetriever);
-
+	/**
+	 * Instantiates the {@link ExecutionSlotAllocator}.
+	 *
+	 * @param context for slot allocation
+	 * @return The instantiated slot allocator
+	 */
+	ExecutionSlotAllocator createInstance(ExecutionSlotAllocationContext context);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
index 1824daf..b8a7f31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
@@ -45,10 +45,10 @@ class OneSlotPerExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorF
 	}
 
 	@Override
-	public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever preferredLocationsRetriever) {
+	public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
 		return new OneSlotPerExecutionSlotAllocator(
 			slotProvider,
-			preferredLocationsRetriever,
+			context::getPreferredLocations,
 			slotWillBeOccupiedIndefinitely,
 			allocationTimeout);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
index 8842c7c..392ee67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
@@ -19,14 +19,9 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
 import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-
-import java.util.function.Function;
 
 /**
  * Factory for {@link SlotSharingExecutionSlotAllocator}.
@@ -36,12 +31,6 @@ class SlotSharingExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
 
 	private final boolean slotWillBeOccupiedIndefinitely;
 
-	private final SlotSharingStrategy slotSharingStrategy;
-
-	private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
-
-	private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
-
 	private final PhysicalSlotRequestBulkChecker bulkChecker;
 
 	private final Time allocationTimeout;
@@ -49,25 +38,23 @@ class SlotSharingExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
 	SlotSharingExecutionSlotAllocatorFactory(
 			PhysicalSlotProvider slotProvider,
 			boolean slotWillBeOccupiedIndefinitely,
-			SlotSharingStrategy slotSharingStrategy,
-			Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
-			Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
 			PhysicalSlotRequestBulkChecker bulkChecker,
 			Time allocationTimeout) {
 		this.slotProvider = slotProvider;
 		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
-		this.slotSharingStrategy = slotSharingStrategy;
-		this.resourceProfileRetriever = resourceProfileRetriever;
-		this.priorAllocationIdRetriever = priorAllocationIdRetriever;
 		this.bulkChecker = bulkChecker;
 		this.allocationTimeout = allocationTimeout;
 	}
 
 	@Override
-	public ExecutionSlotAllocator createInstance(PreferredLocationsRetriever preferredLocationsRetriever) {
+	public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
+		SlotSharingStrategy slotSharingStrategy = new LocalInputPreferredSlotSharingStrategy(
+			context.getSchedulingTopology(),
+			context.getLogicalSlotSharingGroups(),
+			context.getCoLocationGroups());
 		SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = new MergingSharedSlotProfileRetrieverFactory(
-			preferredLocationsRetriever,
-			priorAllocationIdRetriever);
+			context::getPreferredLocations,
+			context::getPriorAllocationId);
 		return new SlotSharingExecutionSlotAllocator(
 			slotProvider,
 			slotWillBeOccupiedIndefinitely,
@@ -75,6 +62,6 @@ class SlotSharingExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
 			sharedSlotProfileRetrieverFactory,
 			bulkChecker,
 			allocationTimeout,
-			resourceProfileRetriever);
+			context::getResourceProfile);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index e1c8d5a..aa3e9da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
@@ -35,9 +37,12 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -361,4 +366,44 @@ public class JobGraphTest extends TestLogger {
 			checkpointCoordinatorConfiguration,
 			null);
 	}
+
+	@Test
+	public void testGetSlotSharingGroups() {
+		final JobVertex v1 = new JobVertex("1");
+		final JobVertex v2 = new JobVertex("2");
+		final JobVertex v3 = new JobVertex("3");
+		final JobVertex v4 = new JobVertex("4");
+
+		final SlotSharingGroup group1 = new SlotSharingGroup();
+		v1.setSlotSharingGroup(group1);
+		v2.setSlotSharingGroup(group1);
+
+		final SlotSharingGroup group2 = new SlotSharingGroup();
+		v3.setSlotSharingGroup(group2);
+		v4.setSlotSharingGroup(group2);
+
+		final JobGraph jobGraph = new JobGraph(v1, v2, v3, v4);
+
+		assertThat(jobGraph.getSlotSharingGroups(), containsInAnyOrder(group1, group2));
+	}
+
+	@Test
+	public void testGetCoLocationGroupDescriptors() {
+		final JobVertex v1 = new JobVertex("1");
+		final JobVertex v2 = new JobVertex("2");
+		final JobVertex v3 = new JobVertex("3");
+		final JobVertex v4 = new JobVertex("4");
+
+		final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+		v1.setSlotSharingGroup(slotSharingGroup);
+		v2.setSlotSharingGroup(slotSharingGroup);
+		v1.setStrictlyCoLocatedWith(v2);
+
+		final JobGraph jobGraph = new JobGraph(v1, v2, v3, v4);
+
+		assertThat(jobGraph.getCoLocationGroupDescriptors(), hasSize(1));
+
+		final CoLocationGroupDesc onlyCoLocationGroupDesc = jobGraph.getCoLocationGroupDescriptors().iterator().next();
+		assertThat(onlyCoLocationGroupDesc.getVertices(), containsInAnyOrder(v1.getID(), v2.getID()));
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
index a996686..39865c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
@@ -37,7 +37,7 @@ public class TestExecutionSlotAllocatorFactory implements ExecutionSlotAllocator
 	}
 
 	@Override
-	public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever ignored) {
+	public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) {
 		return testExecutionSlotAllocator;
 	}
 


[flink] 01/03: [hotfix] Fix checkstyle violations in JobGraph and JobGraphTest

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6a5987fd21f4de29cc4dfed0a864495c60cdb96e
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon Aug 31 16:31:46 2020 +0800

    [hotfix] Fix checkstyle violations in JobGraph and JobGraphTest
---
 .../apache/flink/runtime/jobgraph/JobGraph.java    | 10 ++--
 .../flink/runtime/jobgraph/JobGraphTest.java       | 65 +++++++++++-----------
 2 files changed, 39 insertions(+), 36 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 1125367..95a9a28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -74,15 +74,15 @@ public class JobGraph implements Serializable {
 	/** Name of this job. */
 	private final String jobName;
 
-	/** The mode in which the job is scheduled */
+	/** The mode in which the job is scheduled. */
 	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
 	// --- checkpointing ---
 
-	/** Job specific execution config */
+	/** Job specific execution config. */
 	private SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
-	/** The settings for the job checkpoints */
+	/** The settings for the job checkpoints. */
 	private JobCheckpointingSettings snapshotSettings;
 
 	/** Savepoint restore settings. */
@@ -210,7 +210,7 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
-	 * Returns the {@link ExecutionConfig}
+	 * Returns the {@link ExecutionConfig}.
 	 *
 	 * @return ExecutionConfig
 	 */
@@ -321,7 +321,7 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
-	 * Checks if the checkpointing was enabled for this job graph
+	 * Checks if the checkpointing was enabled for this job graph.
 	 *
 	 * @return true if checkpointing enabled
 	 */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index a0c751c..e1c8d5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -41,19 +41,22 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link JobGraph}.
+ */
 public class JobGraphTest extends TestLogger {
 
 	@Test
 	public void testSerialization() {
 		try {
 			JobGraph jg = new JobGraph("The graph");
-			
+
 			// add some configuration values
 			{
 				jg.getJobConfiguration().setString("some key", "some value");
 				jg.getJobConfiguration().setDouble("Life of ", Math.PI);
 			}
-			
+
 			// add some vertices
 			{
 				JobVertex source1 = new JobVertex("source1");
@@ -61,12 +64,12 @@ public class JobGraphTest extends TestLogger {
 				JobVertex target = new JobVertex("target");
 				target.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 				target.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-				
+
 				jg.addVertex(source1);
 				jg.addVertex(source2);
 				jg.addVertex(target);
 			}
-			
+
 			// de-/serialize and compare
 			JobGraph copy = CommonTestUtils.createCopySerializable(jg);
 
@@ -74,7 +77,7 @@ public class JobGraphTest extends TestLogger {
 			assertEquals(jg.getJobID(), copy.getJobID());
 			assertEquals(jg.getJobConfiguration(), copy.getJobConfiguration());
 			assertEquals(jg.getNumberOfVertices(), copy.getNumberOfVertices());
-			
+
 			for (JobVertex vertex : copy.getVertices()) {
 				JobVertex original = jg.findVertexByID(vertex.getID());
 				assertNotNull(original);
@@ -88,7 +91,7 @@ public class JobGraphTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testTopologicalSort1() {
 		try {
@@ -98,7 +101,7 @@ public class JobGraphTest extends TestLogger {
 			JobVertex target2 = new JobVertex("target2");
 			JobVertex intermediate1 = new JobVertex("intermediate1");
 			JobVertex intermediate2 = new JobVertex("intermediate2");
-			
+
 			target1.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			target2.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			target2.connectNewDataSetAsInput(intermediate2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
@@ -108,9 +111,9 @@ public class JobGraphTest extends TestLogger {
 			JobGraph graph = new JobGraph("TestGraph",
 				source1, source2, intermediate1, intermediate2, target1, target2);
 			List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
-			
+
 			assertEquals(6, sorted.size());
-			
+
 			assertBefore(source1, target1, sorted);
 			assertBefore(source1, target2, sorted);
 			assertBefore(source2, target2, sorted);
@@ -124,7 +127,7 @@ public class JobGraphTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testTopologicalSort2() {
 		try {
@@ -135,41 +138,41 @@ public class JobGraphTest extends TestLogger {
 			JobVertex l12 = new JobVertex("layer 1 - 2");
 			JobVertex l13 = new JobVertex("layer 1 - 3");
 			JobVertex l2 = new JobVertex("layer 2");
-			
+
 			root.connectNewDataSetAsInput(l13, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			root.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			root.connectNewDataSetAsInput(l2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
-			
+
 			l2.connectNewDataSetAsInput(l11, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			l2.connectNewDataSetAsInput(l12, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
-			
+
 			l11.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
-			
+
 			l12.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			l12.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
-			
+
 			l13.connectNewDataSetAsInput(source2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
 			JobGraph graph = new JobGraph("TestGraph",
 				source1, source2, root, l11, l13, l12, l2);
 			List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
-			
+
 			assertEquals(7,  sorted.size());
-			
+
 			assertBefore(source1, root, sorted);
 			assertBefore(source2, root, sorted);
 			assertBefore(l11, root, sorted);
 			assertBefore(l12, root, sorted);
 			assertBefore(l13, root, sorted);
 			assertBefore(l2, root, sorted);
-			
+
 			assertBefore(l11, l2, sorted);
 			assertBefore(l12, l2, sorted);
 			assertBefore(l2, root, sorted);
-			
+
 			assertBefore(source1, l2, sorted);
 			assertBefore(source2, l2, sorted);
-			
+
 			assertBefore(source2, l13, sorted);
 		}
 		catch (Exception e) {
@@ -177,7 +180,7 @@ public class JobGraphTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testTopologicalSort3() {
 		//             --> op1 --
@@ -185,13 +188,13 @@ public class JobGraphTest extends TestLogger {
 		//  (source) -           +-> op2 -> op3
 		//            \         /
 		//             ---------
-		
+
 		try {
 			JobVertex source = new JobVertex("source");
 			JobVertex op1 = new JobVertex("op4");
 			JobVertex op2 = new JobVertex("op2");
 			JobVertex op3 = new JobVertex("op3");
-			
+
 			op1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			op2.connectNewDataSetAsInput(op1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			op2.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
@@ -199,9 +202,9 @@ public class JobGraphTest extends TestLogger {
 
 			JobGraph graph = new JobGraph("TestGraph", source, op1, op2, op3);
 			List<JobVertex> sorted = graph.getVerticesSortedTopologicallyFromSources();
-			
+
 			assertEquals(4,  sorted.size());
-			
+
 			assertBefore(source, op1, sorted);
 			assertBefore(source, op2, sorted);
 			assertBefore(op1, op2, sorted);
@@ -212,7 +215,7 @@ public class JobGraphTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testTopoSortCyclicGraphNoSources() {
 		try {
@@ -220,7 +223,7 @@ public class JobGraphTest extends TestLogger {
 			JobVertex v2 = new JobVertex("2");
 			JobVertex v3 = new JobVertex("3");
 			JobVertex v4 = new JobVertex("4");
-			
+
 			v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
@@ -240,17 +243,17 @@ public class JobGraphTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testTopoSortCyclicGraphIntermediateCycle() {
-		try{ 
+		try {
 			JobVertex source = new JobVertex("source");
 			JobVertex v1 = new JobVertex("1");
 			JobVertex v2 = new JobVertex("2");
 			JobVertex v3 = new JobVertex("3");
 			JobVertex v4 = new JobVertex("4");
 			JobVertex target = new JobVertex("target");
-			
+
 			v1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			v1.connectNewDataSetAsInput(v4, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 			v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
@@ -272,7 +275,7 @@ public class JobGraphTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	private static final void assertBefore(JobVertex v1, JobVertex v2, List<JobVertex> list) {
 		boolean seenFirst = false;
 		for (JobVertex v : list) {


[flink] 03/03: [FLINK-17016][runtime] Enable to use pipelined region scheduling strategy

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4df2295f54709f4292888a3b4fcbb019dd4d7901
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Sep 10 20:31:42 2020 +0800

    [FLINK-17016][runtime] Enable to use pipelined region scheduling strategy
    
    It can be enabled via config option "jobmanager.scheduler.scheduling-strategy=region"
---
 .../flink/configuration/JobManagerOptions.java     |  18 ++
 .../flink/runtime/scheduler/DefaultScheduler.java  |  51 ++++-
 ...actory.java => DefaultSchedulerComponents.java} | 157 ++++++----------
 .../runtime/scheduler/DefaultSchedulerFactory.java | 110 +----------
 .../scheduler/ExecutionSlotAllocationContext.java  |  99 ++++------
 .../DefaultSchedulerComponentsFactoryTest.java     |  77 ++++++++
 .../runtime/scheduler/SchedulerTestingUtils.java   |   6 +-
 .../PipelinedRegionSchedulingITCase.java           | 205 +++++++++++++++++++++
 8 files changed, 450 insertions(+), 273 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index aa3dc9a..ed8be72 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -326,6 +326,7 @@ public class JobManagerOptions {
 			// default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
 			.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
 			.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
+
 	/**
 	 * Config parameter determining the scheduler implementation.
 	 */
@@ -339,6 +340,23 @@ public class JobManagerOptions {
 				.list(
 					text("'ng': new generation scheduler"))
 				.build());
+
+	/**
+	 * Config parameter determining the scheduling strategy.
+	 */
+	@Documentation.ExcludeFromDocumentation("User normally should not be expected to change this config.")
+	public static final ConfigOption<String> SCHEDULING_STRATEGY =
+		key("jobmanager.scheduler.scheduling-strategy")
+			.stringType()
+			.defaultValue("legacy")
+			.withDescription(Description.builder()
+				.text("Determines which scheduling strategy is used to schedule tasks. Accepted values are:")
+				.list(
+					text("'region': pipelined region scheduling"),
+					text("'legacy': legacy scheduling strategy, which is eager scheduling for streaming jobs " +
+						"and lazy from sources scheduling for batch jobs"))
+				.build());
+
 	/**
 	 * Config parameter controlling whether partitions should already be released during the job execution.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index db748bd..fa27baa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -38,7 +40,9 @@ import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.ThrowingSlotProvider;
@@ -48,8 +52,10 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureSta
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -161,14 +167,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			restartBackoffTimeStrategy);
 		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
 
-		final ExecutionSlotAllocationContext slotAllocationContext = new ExecutionSlotAllocationContext(
-			getPreferredLocationsRetriever(),
-			executionVertexID -> getExecutionVertex(executionVertexID).getResourceProfile(),
-			executionVertexID -> getExecutionVertex(executionVertexID).getLatestPriorAllocation(),
-			getSchedulingTopology(),
-			() -> getJobGraph().getSlotSharingGroups(),
-			() -> getJobGraph().getCoLocationGroupDescriptors());
-		this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(slotAllocationContext);
+		this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory)
+			.createInstance(new DefaultExecutionSlotAllocationContext());
 
 		this.verticesWaitingForRestart = new HashSet<>();
 		this.startUpAction = startUpAction;
@@ -513,4 +513,39 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null);
 		}
 	}
+
+	private class DefaultExecutionSlotAllocationContext implements ExecutionSlotAllocationContext {
+
+		@Override
+		public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations(
+				final ExecutionVertexID executionVertexId,
+				final Set<ExecutionVertexID> producersToIgnore) {
+			return getPreferredLocationsRetriever().getPreferredLocations(executionVertexId, producersToIgnore);
+		}
+
+		@Override
+		public ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
+			return getExecutionVertex(executionVertexId).getResourceProfile();
+		}
+
+		@Override
+		public AllocationID getPriorAllocationId(final ExecutionVertexID executionVertexId) {
+			return getExecutionVertex(executionVertexId).getLatestPriorAllocation();
+		}
+
+		@Override
+		public SchedulingTopology getSchedulingTopology() {
+			return DefaultScheduler.this.getSchedulingTopology();
+		}
+
+		@Override
+		public Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
+			return getJobGraph().getSlotSharingGroups();
+		}
+
+		@Override
+		public Set<CoLocationGroupDesc> getCoLocationGroups() {
+			return getJobGraph().getCoLocationGroupDescriptors();
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
similarity index 55%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
index 84f3143..a99325a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
@@ -23,18 +23,10 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobWriter;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
-import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
@@ -45,93 +37,80 @@ import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
 import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
-import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
-import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.util.clock.SystemClock;
 
-import org.slf4j.Logger;
-
-import javax.annotation.Nonnull;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
 /**
- * Factory for {@link DefaultScheduler}.
+ * Components to create a {@link DefaultScheduler}. Which components are created depends on the
+ * configured {@link JobManagerOptions#SCHEDULING_STRATEGY}.
  */
-public class DefaultSchedulerFactory implements SchedulerNGFactory {
-
-	@Override
-	public SchedulerNG createInstance(
-			final Logger log,
-			final JobGraph jobGraph,
-			final BackPressureStatsTracker backPressureStatsTracker,
-			final Executor ioExecutor,
+public class DefaultSchedulerComponents {
+
+	private static final String PIPELINED_REGION_SCHEDULING = "region";
+	private static final String LEGACY_SCHEDULING = "legacy";
+
+	private final SchedulingStrategyFactory schedulingStrategyFactory;
+	private final Consumer<ComponentMainThreadExecutor> startUpAction;
+	private final ExecutionSlotAllocatorFactory allocatorFactory;
+
+	private DefaultSchedulerComponents(
+			final SchedulingStrategyFactory schedulingStrategyFactory,
+			final Consumer<ComponentMainThreadExecutor> startUpAction,
+			final ExecutionSlotAllocatorFactory allocatorFactory) {
+
+		this.schedulingStrategyFactory = schedulingStrategyFactory;
+		this.startUpAction = startUpAction;
+		this.allocatorFactory = allocatorFactory;
+	}
+
+	SchedulingStrategyFactory getSchedulingStrategyFactory() {
+		return schedulingStrategyFactory;
+	}
+
+	Consumer<ComponentMainThreadExecutor> getStartUpAction() {
+		return startUpAction;
+	}
+
+	ExecutionSlotAllocatorFactory getAllocatorFactory() {
+		return allocatorFactory;
+	}
+
+	static DefaultSchedulerComponents createSchedulerComponents(
+			final ScheduleMode scheduleMode,
 			final Configuration jobMasterConfiguration,
 			final SlotPool slotPool,
-			final ScheduledExecutorService futureExecutor,
-			final ClassLoader userCodeLoader,
-			final CheckpointRecoveryFactory checkpointRecoveryFactory,
-			final Time rpcTimeout,
-			final BlobWriter blobWriter,
-			final JobManagerJobMetricGroup jobManagerJobMetricGroup,
-			final Time slotRequestTimeout,
-			final ShuffleMaster<?> shuffleMaster,
-			final JobMasterPartitionTracker partitionTracker,
-			final ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
-
-		final DefaultSchedulerComponents schedulerComponents = createDefaultSchedulerComponents(
-			jobGraph.getScheduleMode(),
-			jobMasterConfiguration,
-			slotPool,
-			slotRequestTimeout);
-		final RestartBackoffTimeStrategy restartBackoffTimeStrategy = RestartBackoffTimeStrategyFactoryLoader
-			.createRestartBackoffTimeStrategyFactory(
-				jobGraph
-					.getSerializedExecutionConfig()
-					.deserializeValue(userCodeLoader)
-					.getRestartStrategy(),
-				jobMasterConfiguration,
-				jobGraph.isCheckpointingEnabled())
-			.create();
-		log.info("Using restart back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID());
-
-		return new DefaultScheduler(
-			log,
-			jobGraph,
-			backPressureStatsTracker,
-			ioExecutor,
-			jobMasterConfiguration,
-			schedulerComponents.startUpAction,
-			futureExecutor,
-			new ScheduledExecutorServiceAdapter(futureExecutor),
-			userCodeLoader,
-			checkpointRecoveryFactory,
-			rpcTimeout,
-			blobWriter,
-			jobManagerJobMetricGroup,
-			shuffleMaster,
-			partitionTracker,
-			schedulerComponents.schedulingStrategyFactory,
-			FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
-			restartBackoffTimeStrategy,
-			new DefaultExecutionVertexOperations(),
-			new ExecutionVertexVersioner(),
-			schedulerComponents.allocatorFactory,
-			executionDeploymentTracker);
+			final Time slotRequestTimeout) {
+
+		final String schedulingStrategy = jobMasterConfiguration.getString(JobManagerOptions.SCHEDULING_STRATEGY);
+		switch (schedulingStrategy) {
+			case PIPELINED_REGION_SCHEDULING:
+				return createPipelinedRegionSchedulerComponents(
+					scheduleMode,
+					jobMasterConfiguration,
+					slotPool,
+					slotRequestTimeout);
+			case LEGACY_SCHEDULING:
+				return createLegacySchedulerComponents(
+					scheduleMode,
+					jobMasterConfiguration,
+					slotPool,
+					slotRequestTimeout);
+			default:
+				throw new IllegalStateException("Unsupported scheduling strategy " + schedulingStrategy);
+		}
 	}
 
-	private static DefaultSchedulerComponents createDefaultSchedulerComponents(
+	private static DefaultSchedulerComponents createLegacySchedulerComponents(
 			final ScheduleMode scheduleMode,
 			final Configuration jobMasterConfiguration,
 			final SlotPool slotPool,
 			final Time slotRequestTimeout) {
+
 		final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
 		final Scheduler scheduler = new SchedulerImpl(slotSelectionStrategy, slotPool);
 		final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
@@ -139,12 +118,12 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			scheduler,
 			slotRequestTimeout);
 		return new DefaultSchedulerComponents(
-			createSchedulingStrategyFactory(scheduleMode),
+			createLegacySchedulingStrategyFactory(scheduleMode),
 			scheduler::start,
 			new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
 	}
 
-	static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
+	private static SchedulingStrategyFactory createLegacySchedulingStrategyFactory(final ScheduleMode scheduleMode) {
 		switch (scheduleMode) {
 			case EAGER:
 				return new EagerSchedulingStrategy.Factory();
@@ -161,6 +140,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			final Configuration jobMasterConfiguration,
 			final SlotPool slotPool,
 			final Time slotRequestTimeout) {
+
 		final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
 		final PhysicalSlotRequestBulkChecker bulkChecker = PhysicalSlotRequestBulkCheckerImpl
 			.createFromSlotPool(slotPool, SystemClock.getInstance());
@@ -176,7 +156,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			allocatorFactory);
 	}
 
-	private static SlotSelectionStrategy selectSlotSelectionStrategy(@Nonnull Configuration configuration) {
+	private static SlotSelectionStrategy selectSlotSelectionStrategy(final Configuration configuration) {
 		final boolean evenlySpreadOutSlots = configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
 
 		final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
@@ -189,19 +169,4 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			PreviousAllocationSlotSelectionStrategy.create(locationPreferenceSlotSelectionStrategy) :
 			locationPreferenceSlotSelectionStrategy;
 	}
-
-	private static class DefaultSchedulerComponents {
-		private final SchedulingStrategyFactory schedulingStrategyFactory;
-		private final Consumer<ComponentMainThreadExecutor> startUpAction;
-		private final ExecutionSlotAllocatorFactory allocatorFactory;
-
-		private DefaultSchedulerComponents(
-				final SchedulingStrategyFactory schedulingStrategyFactory,
-				final Consumer<ComponentMainThreadExecutor> startUpAction,
-				final ExecutionSlotAllocatorFactory allocatorFactory) {
-			this.schedulingStrategyFactory = schedulingStrategyFactory;
-			this.startUpAction = startUpAction;
-			this.allocatorFactory = allocatorFactory;
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 84f3143..61b7620 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -20,47 +20,27 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
-import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
-import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
-import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
-import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
-import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
-import org.apache.flink.util.clock.SystemClock;
 
 import org.slf4j.Logger;
 
-import javax.annotation.Nonnull;
-
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
+
+import static org.apache.flink.runtime.scheduler.DefaultSchedulerComponents.createSchedulerComponents;
 
 /**
  * Factory for {@link DefaultScheduler}.
@@ -86,7 +66,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			final JobMasterPartitionTracker partitionTracker,
 			final ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
 
-		final DefaultSchedulerComponents schedulerComponents = createDefaultSchedulerComponents(
+		final DefaultSchedulerComponents schedulerComponents = createSchedulerComponents(
 			jobGraph.getScheduleMode(),
 			jobMasterConfiguration,
 			slotPool,
@@ -108,7 +88,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			backPressureStatsTracker,
 			ioExecutor,
 			jobMasterConfiguration,
-			schedulerComponents.startUpAction,
+			schedulerComponents.getStartUpAction(),
 			futureExecutor,
 			new ScheduledExecutorServiceAdapter(futureExecutor),
 			userCodeLoader,
@@ -118,90 +98,12 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			jobManagerJobMetricGroup,
 			shuffleMaster,
 			partitionTracker,
-			schedulerComponents.schedulingStrategyFactory,
+			schedulerComponents.getSchedulingStrategyFactory(),
 			FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
 			restartBackoffTimeStrategy,
 			new DefaultExecutionVertexOperations(),
 			new ExecutionVertexVersioner(),
-			schedulerComponents.allocatorFactory,
+			schedulerComponents.getAllocatorFactory(),
 			executionDeploymentTracker);
 	}
-
-	private static DefaultSchedulerComponents createDefaultSchedulerComponents(
-			final ScheduleMode scheduleMode,
-			final Configuration jobMasterConfiguration,
-			final SlotPool slotPool,
-			final Time slotRequestTimeout) {
-		final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
-		final Scheduler scheduler = new SchedulerImpl(slotSelectionStrategy, slotPool);
-		final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
-			scheduleMode,
-			scheduler,
-			slotRequestTimeout);
-		return new DefaultSchedulerComponents(
-			createSchedulingStrategyFactory(scheduleMode),
-			scheduler::start,
-			new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
-	}
-
-	static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
-		switch (scheduleMode) {
-			case EAGER:
-				return new EagerSchedulingStrategy.Factory();
-			case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST:
-			case LAZY_FROM_SOURCES:
-				return new LazyFromSourcesSchedulingStrategy.Factory();
-			default:
-				throw new IllegalStateException("Unsupported schedule mode " + scheduleMode);
-		}
-	}
-
-	private static DefaultSchedulerComponents createPipelinedRegionSchedulerComponents(
-			final ScheduleMode scheduleMode,
-			final Configuration jobMasterConfiguration,
-			final SlotPool slotPool,
-			final Time slotRequestTimeout) {
-		final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
-		final PhysicalSlotRequestBulkChecker bulkChecker = PhysicalSlotRequestBulkCheckerImpl
-			.createFromSlotPool(slotPool, SystemClock.getInstance());
-		final PhysicalSlotProvider physicalSlotProvider = new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
-		final ExecutionSlotAllocatorFactory allocatorFactory = new SlotSharingExecutionSlotAllocatorFactory(
-			physicalSlotProvider,
-			scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
-			bulkChecker,
-			slotRequestTimeout);
-		return new DefaultSchedulerComponents(
-			new PipelinedRegionSchedulingStrategy.Factory(),
-			bulkChecker::start,
-			allocatorFactory);
-	}
-
-	private static SlotSelectionStrategy selectSlotSelectionStrategy(@Nonnull Configuration configuration) {
-		final boolean evenlySpreadOutSlots = configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
-
-		final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
-
-		locationPreferenceSlotSelectionStrategy = evenlySpreadOutSlots ?
-			LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut() :
-			LocationPreferenceSlotSelectionStrategy.createDefault();
-
-		return configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY) ?
-			PreviousAllocationSlotSelectionStrategy.create(locationPreferenceSlotSelectionStrategy) :
-			locationPreferenceSlotSelectionStrategy;
-	}
-
-	private static class DefaultSchedulerComponents {
-		private final SchedulingStrategyFactory schedulingStrategyFactory;
-		private final Consumer<ComponentMainThreadExecutor> startUpAction;
-		private final ExecutionSlotAllocatorFactory allocatorFactory;
-
-		private DefaultSchedulerComponents(
-				final SchedulingStrategyFactory schedulingStrategyFactory,
-				final Consumer<ComponentMainThreadExecutor> startUpAction,
-				final ExecutionSlotAllocatorFactory allocatorFactory) {
-			this.schedulingStrategyFactory = schedulingStrategyFactory;
-			this.startUpAction = startUpAction;
-			this.allocatorFactory = allocatorFactory;
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
index 8180c1f..268ce89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
@@ -25,71 +25,48 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
-import java.util.Collection;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.function.Supplier;
 
 /**
  * Context for slot allocation.
  */
-class ExecutionSlotAllocationContext implements PreferredLocationsRetriever {
-
-	private final PreferredLocationsRetriever preferredLocationsRetriever;
-
-	private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
-
-	private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
-
-	private final SchedulingTopology schedulingTopology;
-
-	private final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier;
-
-	private final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier;
-
-	ExecutionSlotAllocationContext(
-			final PreferredLocationsRetriever preferredLocationsRetriever,
-			final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
-			final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
-			final SchedulingTopology schedulingTopology,
-			final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier,
-			final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier) {
-
-		this.preferredLocationsRetriever = preferredLocationsRetriever;
-		this.resourceProfileRetriever = resourceProfileRetriever;
-		this.priorAllocationIdRetriever = priorAllocationIdRetriever;
-		this.schedulingTopology = schedulingTopology;
-		this.logicalSlotSharingGroupSupplier = logicalSlotSharingGroupSupplier;
-		this.coLocationGroupSupplier = coLocationGroupSupplier;
-	}
-
-	@Override
-	public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations(
-			final ExecutionVertexID executionVertexId,
-			final Set<ExecutionVertexID> producersToIgnore) {
-		return preferredLocationsRetriever.getPreferredLocations(executionVertexId, producersToIgnore);
-	}
-
-	ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
-		return resourceProfileRetriever.apply(executionVertexId);
-	}
-
-	AllocationID getPriorAllocationId(final ExecutionVertexID executionVertexId) {
-		return priorAllocationIdRetriever.apply(executionVertexId);
-	}
-
-	SchedulingTopology getSchedulingTopology() {
-		return schedulingTopology;
-	}
-
-	Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
-		return logicalSlotSharingGroupSupplier.get();
-	}
-
-	Set<CoLocationGroupDesc> getCoLocationGroups() {
-		return coLocationGroupSupplier.get();
-	}
+interface ExecutionSlotAllocationContext extends PreferredLocationsRetriever {
+
+	/**
+	 * Returns the resources required by an execution vertex.
+	 *
+	 * @param executionVertexId id of the execution vertex
+	 * @return the resources required by the given execution vertex
+	 */
+	ResourceProfile getResourceProfile(ExecutionVertexID executionVertexId);
+
+	/**
+	 * Returns the prior allocation id of an execution vertex.
+	 *
+	 * @param executionVertexId id of the execution vertex
+	 * @return the prior allocation id of the given execution vertex
+	 */
+	AllocationID getPriorAllocationId(ExecutionVertexID executionVertexId);
+
+	/**
+	 * Returns the scheduling topology containing all execution vertices and edges.
+	 *
+	 * @return the scheduling topology
+	 */
+	SchedulingTopology getSchedulingTopology();
+
+	/**
+	 * Returns all slot sharing groups in the job.
+	 *
+	 * @return all slot sharing groups in the job
+	 */
+	Set<SlotSharingGroup> getLogicalSlotSharingGroups();
+
+	/**
+	 * Returns all co-location groups in the job.
+	 *
+	 * @return all co-location groups in the job
+	 */
+	Set<CoLocationGroupDesc> getCoLocationGroups();
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
new file mode 100644
index 0000000..572ab3a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
+import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests which scheduling strategy and slot allocator factories {@link DefaultSchedulerComponents#createSchedulerComponents(
+ * ScheduleMode, Configuration, SlotPool, Time)} selects for each value of {@link JobManagerOptions#SCHEDULING_STRATEGY}.
+ */
+public class DefaultSchedulerComponentsFactoryTest extends TestLogger {
+
+	@Test
+	public void testCreatingPipelinedSchedulingStrategyFactory() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "region"); // explicitly select region scheduling
+
+		final DefaultSchedulerComponents components = createSchedulerComponents(configuration);
+		assertThat(components.getSchedulingStrategyFactory(), instanceOf(PipelinedRegionSchedulingStrategy.Factory.class));
+		assertThat(components.getAllocatorFactory(), instanceOf(SlotSharingExecutionSlotAllocatorFactory.class));
+	}
+
+	@Test
+	public void testCreatingLegacySchedulingStrategyFactory() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy"); // explicitly select the legacy strategy
+
+		final DefaultSchedulerComponents components = createSchedulerComponents(configuration);
+		assertThat(components.getSchedulingStrategyFactory(), instanceOf(LazyFromSourcesSchedulingStrategy.Factory.class));
+		assertThat(components.getAllocatorFactory(), instanceOf(DefaultExecutionSlotAllocatorFactory.class));
+	}
+
+	@Test
+	public void testCreatingLegacySchedulingStrategyFactoryByDefault() {
+		final DefaultSchedulerComponents components = createSchedulerComponents(new Configuration()); // option left unset
+		assertThat(components.getSchedulingStrategyFactory(), instanceOf(LazyFromSourcesSchedulingStrategy.Factory.class));
+	}
+
+	private static DefaultSchedulerComponents createSchedulerComponents(final Configuration configuration) {
+		return DefaultSchedulerComponents.createSchedulerComponents(
+			ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST, // same schedule mode for every test in this class
+			configuration,
+			new TestingSlotPoolImpl(new JobID()),
+			Time.milliseconds(10L));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index ec88777..c92b729 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureSta
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -368,7 +369,7 @@ public class SchedulerTestingUtils {
 	public static class DefaultSchedulerBuilder {
 		private final JobGraph jobGraph;
 
-		private SchedulingStrategyFactory schedulingStrategyFactory;
+		private SchedulingStrategyFactory schedulingStrategyFactory = new PipelinedRegionSchedulingStrategy.Factory();
 
 		private Logger log = LOG;
 		private BackPressureStatsTracker backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
@@ -391,9 +392,6 @@ public class SchedulerTestingUtils {
 
 		private DefaultSchedulerBuilder(final JobGraph jobGraph) {
 			this.jobGraph = jobGraph;
-
-			// scheduling strategy is by default set according to the scheduleMode. It can be re-assigned later.
-			this.schedulingStrategyFactory = DefaultSchedulerFactory.createSchedulingStrategyFactory(jobGraph.getScheduleMode());
 		}
 
 		public DefaultSchedulerBuilder setLogger(final Logger log) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
new file mode 100644
index 0000000..df2c1ec
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT case that runs a job on a {@link MiniCluster} with the scheduling strategy set to "region" (pipelined region scheduling).
+ */
+public class PipelinedRegionSchedulingITCase extends TestLogger {
+
+	@Test
+	public void testSuccessWithSlotsNoFewerThanTheMaxRegionRequired() throws Exception {
+		final JobResult jobResult = executeSchedulingTest(2); // 2 slots on the single TaskManager
+		assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+	}
+
+	@Test
+	public void testFailsOnInsufficientSlots() throws Exception {
+		final JobResult jobResult = executeSchedulingTest(1); // only 1 slot on the single TaskManager
+		assertThat(jobResult.getSerializedThrowable().isPresent(), is(true));
+
+		final Throwable jobFailure = jobResult
+			.getSerializedThrowable()
+			.get()
+			.deserializeError(ClassLoader.getSystemClassLoader());
+
+		final Optional<NoResourceAvailableException> cause = ExceptionUtils.findThrowable(
+			jobFailure,
+			NoResourceAvailableException.class);
+		assertThat(cause.isPresent(), is(true));
+		assertThat(cause.get().getMessage(), containsString("Slot request bulk is not fulfillable!"));
+	}
+
+	private JobResult executeSchedulingTest(int numSlots) throws Exception { // runs the test job on a 1-TaskManager MiniCluster with numSlots slots
+		final Configuration configuration = new Configuration();
+		configuration.setString(RestOptions.BIND_PORT, "0");
+		configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
+		configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "region"); // the scheduling strategy under test
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(configuration)
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(numSlots)
+			.build();
+
+		try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
+			miniCluster.start();
+
+			final MiniClusterClient miniClusterClient = new MiniClusterClient(configuration, miniCluster);
+
+			final JobGraph jobGraph = createJobGraph(10); // parallelism 10; source1 runs with twice that
+
+			// wait for the submission to succeed
+			final JobID jobID = miniClusterClient.submitJob(jobGraph).get();
+
+			final CompletableFuture<JobResult> resultFuture = miniClusterClient.requestJobResult(jobID);
+
+			final JobResult jobResult = resultFuture.get();
+
+			return jobResult;
+		}
+	}
+
+	private JobGraph createJobGraph(final int parallelism) { // source1 --pipelined/pointwise--> sink <--blocking/all-to-all-- source2
+		final SlotSharingGroup group1 = new SlotSharingGroup();
+		final JobVertex source1 = new JobVertex("source1");
+		source1.setInvokableClass(PipelinedSender.class);
+		source1.setParallelism(parallelism * 2);
+		source1.setSlotSharingGroup(group1);
+
+		final SlotSharingGroup group2 = new SlotSharingGroup();
+		final JobVertex source2 = new JobVertex("source2");
+		source2.setInvokableClass(NoOpInvokable.class);
+		source2.setParallelism(parallelism);
+		source2.setSlotSharingGroup(group2);
+
+		final JobVertex sink = new JobVertex("sink");
+		sink.setInvokableClass(Receiver.class);
+		sink.setParallelism(parallelism);
+		sink.setSlotSharingGroup(group1); // same slot sharing group as source1
+
+		sink.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		sink.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		final JobGraph jobGraph = new JobGraph(source1, source2, sink);
+
+		jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
+
+		return jobGraph;
+	}
+
+	/**
+	 * This invokable is used by source1. It emits one record to trigger the scheduling
+	 * of the sink task. Subtask 0 also sleeps for a short while (2000 ms) before finishing,
+	 * so that the scheduled sink task can directly use its slot.
+	 */
+	public static class PipelinedSender extends AbstractInvokable {
+
+		public PipelinedSender(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			if (getEnvironment().getAllWriters().length < 1) {
+				throw new IllegalStateException(); // this task needs at least one output to write to
+			}
+
+			final RecordWriter<IntValue> writer = new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0));
+
+			try {
+				writer.emit(new IntValue(42));
+				writer.flushAll();
+			} finally {
+				writer.clearBuffers();
+			}
+
+			if (getIndexInSubtaskGroup() == 0) { // only the first subtask lingers
+				Thread.sleep(2000);
+			}
+		}
+	}
+
+	/**
+	 * This invokable finishes only after all its upstream tasks finish.
+	 * Unexpected result partition errors can happen if a task finishes
+	 * later than its consumer task.
+	 */
+	public static class Receiver extends AbstractInvokable {
+
+		public Receiver(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			if (getEnvironment().getAllInputGates().length < 2) {
+				throw new IllegalStateException(); // this task expects at least two input gates
+			}
+
+			final String[] tmpDirs = getEnvironment().getTaskManagerInfo().getTmpDirectories();
+			final List<RecordReader<IntValue>> readers = Arrays.asList(getEnvironment().getAllInputGates())
+				.stream()
+				.map(inputGate -> new RecordReader<>(inputGate, IntValue.class, tmpDirs))
+				.collect(Collectors.toList());
+
+			for (RecordReader<IntValue> reader : readers) { // drain every input, one reader after another
+				while (reader.hasNext()) {
+					reader.next();
+				}
+			}
+		}
+	}
+}