You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/09/11 14:25:27 UTC

[flink] 03/03: [FLINK-17016][runtime] Enable use of the pipelined region scheduling strategy

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4df2295f54709f4292888a3b4fcbb019dd4d7901
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Sep 10 20:31:42 2020 +0800

    [FLINK-17016][runtime] Enable use of the pipelined region scheduling strategy
    
    The strategy is enabled by setting the config option "jobmanager.scheduler.scheduling-strategy" to "region"; the default value is "legacy".
---
 .../flink/configuration/JobManagerOptions.java     |  18 ++
 .../flink/runtime/scheduler/DefaultScheduler.java  |  51 ++++-
 ...actory.java => DefaultSchedulerComponents.java} | 157 ++++++----------
 .../runtime/scheduler/DefaultSchedulerFactory.java | 110 +----------
 .../scheduler/ExecutionSlotAllocationContext.java  |  99 ++++------
 .../DefaultSchedulerComponentsFactoryTest.java     |  77 ++++++++
 .../runtime/scheduler/SchedulerTestingUtils.java   |   6 +-
 .../PipelinedRegionSchedulingITCase.java           | 205 +++++++++++++++++++++
 8 files changed, 450 insertions(+), 273 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index aa3dc9a..ed8be72 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -326,6 +326,7 @@ public class JobManagerOptions {
 			// default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
 			.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
 			.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
+
 	/**
 	 * Config parameter determining the scheduler implementation.
 	 */
@@ -339,6 +340,23 @@ public class JobManagerOptions {
 				.list(
 					text("'ng': new generation scheduler"))
 				.build());
+
+	/**
+	 * Config parameter determining the scheduling strategy.
+	 */
+	@Documentation.ExcludeFromDocumentation("User normally should not be expected to change this config.")
+	public static final ConfigOption<String> SCHEDULING_STRATEGY =
+		key("jobmanager.scheduler.scheduling-strategy")
+			.stringType()
+			.defaultValue("legacy")
+			.withDescription(Description.builder()
+				.text("Determines which scheduling strategy is used to schedule tasks. Accepted values are:")
+				.list(
+					text("'region': pipelined region scheduling"),
+					text("'legacy': legacy scheduling strategy, which is eager scheduling for streaming jobs " +
+						"and lazy from sources scheduling for batch jobs"))
+				.build());
+
 	/**
 	 * Config parameter controlling whether partitions should already be released during the job execution.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index db748bd..fa27baa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -38,7 +40,9 @@ import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.ThrowingSlotProvider;
@@ -48,8 +52,10 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureSta
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -161,14 +167,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			restartBackoffTimeStrategy);
 		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
 
-		final ExecutionSlotAllocationContext slotAllocationContext = new ExecutionSlotAllocationContext(
-			getPreferredLocationsRetriever(),
-			executionVertexID -> getExecutionVertex(executionVertexID).getResourceProfile(),
-			executionVertexID -> getExecutionVertex(executionVertexID).getLatestPriorAllocation(),
-			getSchedulingTopology(),
-			() -> getJobGraph().getSlotSharingGroups(),
-			() -> getJobGraph().getCoLocationGroupDescriptors());
-		this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(slotAllocationContext);
+		this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory)
+			.createInstance(new DefaultExecutionSlotAllocationContext());
 
 		this.verticesWaitingForRestart = new HashSet<>();
 		this.startUpAction = startUpAction;
@@ -513,4 +513,39 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null);
 		}
 	}
+
+	private class DefaultExecutionSlotAllocationContext implements ExecutionSlotAllocationContext {
+
+		@Override
+		public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations(
+				final ExecutionVertexID executionVertexId,
+				final Set<ExecutionVertexID> producersToIgnore) {
+			return getPreferredLocationsRetriever().getPreferredLocations(executionVertexId, producersToIgnore);
+		}
+
+		@Override
+		public ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
+			return getExecutionVertex(executionVertexId).getResourceProfile();
+		}
+
+		@Override
+		public AllocationID getPriorAllocationId(final ExecutionVertexID executionVertexId) {
+			return getExecutionVertex(executionVertexId).getLatestPriorAllocation();
+		}
+
+		@Override
+		public SchedulingTopology getSchedulingTopology() {
+			return DefaultScheduler.this.getSchedulingTopology();
+		}
+
+		@Override
+		public Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
+			return getJobGraph().getSlotSharingGroups();
+		}
+
+		@Override
+		public Set<CoLocationGroupDesc> getCoLocationGroups() {
+			return getJobGraph().getCoLocationGroupDescriptors();
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
similarity index 55%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
index 84f3143..a99325a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
@@ -23,18 +23,10 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobWriter;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
-import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
@@ -45,93 +37,80 @@ import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
 import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
-import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
-import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.util.clock.SystemClock;
 
-import org.slf4j.Logger;
-
-import javax.annotation.Nonnull;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
 /**
- * Factory for {@link DefaultScheduler}.
+ * Components to create a {@link DefaultScheduler} which depends on the
+ * configured {@link JobManagerOptions#SCHEDULING_STRATEGY}.
  */
-public class DefaultSchedulerFactory implements SchedulerNGFactory {
-
-	@Override
-	public SchedulerNG createInstance(
-			final Logger log,
-			final JobGraph jobGraph,
-			final BackPressureStatsTracker backPressureStatsTracker,
-			final Executor ioExecutor,
+public class DefaultSchedulerComponents {
+
+	private static final String PIPELINED_REGION_SCHEDULING = "region";
+	private static final String LEGACY_SCHEDULING = "legacy";
+
+	private final SchedulingStrategyFactory schedulingStrategyFactory;
+	private final Consumer<ComponentMainThreadExecutor> startUpAction;
+	private final ExecutionSlotAllocatorFactory allocatorFactory;
+
+	private DefaultSchedulerComponents(
+			final SchedulingStrategyFactory schedulingStrategyFactory,
+			final Consumer<ComponentMainThreadExecutor> startUpAction,
+			final ExecutionSlotAllocatorFactory allocatorFactory) {
+
+		this.schedulingStrategyFactory = schedulingStrategyFactory;
+		this.startUpAction = startUpAction;
+		this.allocatorFactory = allocatorFactory;
+	}
+
+	SchedulingStrategyFactory getSchedulingStrategyFactory() {
+		return schedulingStrategyFactory;
+	}
+
+	Consumer<ComponentMainThreadExecutor> getStartUpAction() {
+		return startUpAction;
+	}
+
+	ExecutionSlotAllocatorFactory getAllocatorFactory() {
+		return allocatorFactory;
+	}
+
+	static DefaultSchedulerComponents createSchedulerComponents(
+			final ScheduleMode scheduleMode,
 			final Configuration jobMasterConfiguration,
 			final SlotPool slotPool,
-			final ScheduledExecutorService futureExecutor,
-			final ClassLoader userCodeLoader,
-			final CheckpointRecoveryFactory checkpointRecoveryFactory,
-			final Time rpcTimeout,
-			final BlobWriter blobWriter,
-			final JobManagerJobMetricGroup jobManagerJobMetricGroup,
-			final Time slotRequestTimeout,
-			final ShuffleMaster<?> shuffleMaster,
-			final JobMasterPartitionTracker partitionTracker,
-			final ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
-
-		final DefaultSchedulerComponents schedulerComponents = createDefaultSchedulerComponents(
-			jobGraph.getScheduleMode(),
-			jobMasterConfiguration,
-			slotPool,
-			slotRequestTimeout);
-		final RestartBackoffTimeStrategy restartBackoffTimeStrategy = RestartBackoffTimeStrategyFactoryLoader
-			.createRestartBackoffTimeStrategyFactory(
-				jobGraph
-					.getSerializedExecutionConfig()
-					.deserializeValue(userCodeLoader)
-					.getRestartStrategy(),
-				jobMasterConfiguration,
-				jobGraph.isCheckpointingEnabled())
-			.create();
-		log.info("Using restart back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID());
-
-		return new DefaultScheduler(
-			log,
-			jobGraph,
-			backPressureStatsTracker,
-			ioExecutor,
-			jobMasterConfiguration,
-			schedulerComponents.startUpAction,
-			futureExecutor,
-			new ScheduledExecutorServiceAdapter(futureExecutor),
-			userCodeLoader,
-			checkpointRecoveryFactory,
-			rpcTimeout,
-			blobWriter,
-			jobManagerJobMetricGroup,
-			shuffleMaster,
-			partitionTracker,
-			schedulerComponents.schedulingStrategyFactory,
-			FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
-			restartBackoffTimeStrategy,
-			new DefaultExecutionVertexOperations(),
-			new ExecutionVertexVersioner(),
-			schedulerComponents.allocatorFactory,
-			executionDeploymentTracker);
+			final Time slotRequestTimeout) {
+
+		final String schedulingStrategy = jobMasterConfiguration.getString(JobManagerOptions.SCHEDULING_STRATEGY);
+		switch (schedulingStrategy) {
+			case PIPELINED_REGION_SCHEDULING:
+				return createPipelinedRegionSchedulerComponents(
+					scheduleMode,
+					jobMasterConfiguration,
+					slotPool,
+					slotRequestTimeout);
+			case LEGACY_SCHEDULING:
+				return createLegacySchedulerComponents(
+					scheduleMode,
+					jobMasterConfiguration,
+					slotPool,
+					slotRequestTimeout);
+			default:
+				throw new IllegalStateException("Unsupported scheduling strategy " + schedulingStrategy);
+		}
 	}
 
-	private static DefaultSchedulerComponents createDefaultSchedulerComponents(
+	private static DefaultSchedulerComponents createLegacySchedulerComponents(
 			final ScheduleMode scheduleMode,
 			final Configuration jobMasterConfiguration,
 			final SlotPool slotPool,
 			final Time slotRequestTimeout) {
+
 		final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
 		final Scheduler scheduler = new SchedulerImpl(slotSelectionStrategy, slotPool);
 		final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
@@ -139,12 +118,12 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			scheduler,
 			slotRequestTimeout);
 		return new DefaultSchedulerComponents(
-			createSchedulingStrategyFactory(scheduleMode),
+			createLegacySchedulingStrategyFactory(scheduleMode),
 			scheduler::start,
 			new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
 	}
 
-	static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
+	private static SchedulingStrategyFactory createLegacySchedulingStrategyFactory(final ScheduleMode scheduleMode) {
 		switch (scheduleMode) {
 			case EAGER:
 				return new EagerSchedulingStrategy.Factory();
@@ -161,6 +140,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			final Configuration jobMasterConfiguration,
 			final SlotPool slotPool,
 			final Time slotRequestTimeout) {
+
 		final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
 		final PhysicalSlotRequestBulkChecker bulkChecker = PhysicalSlotRequestBulkCheckerImpl
 			.createFromSlotPool(slotPool, SystemClock.getInstance());
@@ -176,7 +156,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			allocatorFactory);
 	}
 
-	private static SlotSelectionStrategy selectSlotSelectionStrategy(@Nonnull Configuration configuration) {
+	private static SlotSelectionStrategy selectSlotSelectionStrategy(final Configuration configuration) {
 		final boolean evenlySpreadOutSlots = configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
 
 		final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
@@ -189,19 +169,4 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			PreviousAllocationSlotSelectionStrategy.create(locationPreferenceSlotSelectionStrategy) :
 			locationPreferenceSlotSelectionStrategy;
 	}
-
-	private static class DefaultSchedulerComponents {
-		private final SchedulingStrategyFactory schedulingStrategyFactory;
-		private final Consumer<ComponentMainThreadExecutor> startUpAction;
-		private final ExecutionSlotAllocatorFactory allocatorFactory;
-
-		private DefaultSchedulerComponents(
-				final SchedulingStrategyFactory schedulingStrategyFactory,
-				final Consumer<ComponentMainThreadExecutor> startUpAction,
-				final ExecutionSlotAllocatorFactory allocatorFactory) {
-			this.schedulingStrategyFactory = schedulingStrategyFactory;
-			this.startUpAction = startUpAction;
-			this.allocatorFactory = allocatorFactory;
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 84f3143..61b7620 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -20,47 +20,27 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
-import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
-import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
-import org.apache.flink.runtime.jobmaster.slotpool.PreviousAllocationSlotSelectionStrategy;
-import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
-import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
-import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
-import org.apache.flink.util.clock.SystemClock;
 
 import org.slf4j.Logger;
 
-import javax.annotation.Nonnull;
-
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Consumer;
+
+import static org.apache.flink.runtime.scheduler.DefaultSchedulerComponents.createSchedulerComponents;
 
 /**
  * Factory for {@link DefaultScheduler}.
@@ -86,7 +66,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			final JobMasterPartitionTracker partitionTracker,
 			final ExecutionDeploymentTracker executionDeploymentTracker) throws Exception {
 
-		final DefaultSchedulerComponents schedulerComponents = createDefaultSchedulerComponents(
+		final DefaultSchedulerComponents schedulerComponents = createSchedulerComponents(
 			jobGraph.getScheduleMode(),
 			jobMasterConfiguration,
 			slotPool,
@@ -108,7 +88,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			backPressureStatsTracker,
 			ioExecutor,
 			jobMasterConfiguration,
-			schedulerComponents.startUpAction,
+			schedulerComponents.getStartUpAction(),
 			futureExecutor,
 			new ScheduledExecutorServiceAdapter(futureExecutor),
 			userCodeLoader,
@@ -118,90 +98,12 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			jobManagerJobMetricGroup,
 			shuffleMaster,
 			partitionTracker,
-			schedulerComponents.schedulingStrategyFactory,
+			schedulerComponents.getSchedulingStrategyFactory(),
 			FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
 			restartBackoffTimeStrategy,
 			new DefaultExecutionVertexOperations(),
 			new ExecutionVertexVersioner(),
-			schedulerComponents.allocatorFactory,
+			schedulerComponents.getAllocatorFactory(),
 			executionDeploymentTracker);
 	}
-
-	private static DefaultSchedulerComponents createDefaultSchedulerComponents(
-			final ScheduleMode scheduleMode,
-			final Configuration jobMasterConfiguration,
-			final SlotPool slotPool,
-			final Time slotRequestTimeout) {
-		final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
-		final Scheduler scheduler = new SchedulerImpl(slotSelectionStrategy, slotPool);
-		final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
-			scheduleMode,
-			scheduler,
-			slotRequestTimeout);
-		return new DefaultSchedulerComponents(
-			createSchedulingStrategyFactory(scheduleMode),
-			scheduler::start,
-			new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
-	}
-
-	static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
-		switch (scheduleMode) {
-			case EAGER:
-				return new EagerSchedulingStrategy.Factory();
-			case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST:
-			case LAZY_FROM_SOURCES:
-				return new LazyFromSourcesSchedulingStrategy.Factory();
-			default:
-				throw new IllegalStateException("Unsupported schedule mode " + scheduleMode);
-		}
-	}
-
-	private static DefaultSchedulerComponents createPipelinedRegionSchedulerComponents(
-			final ScheduleMode scheduleMode,
-			final Configuration jobMasterConfiguration,
-			final SlotPool slotPool,
-			final Time slotRequestTimeout) {
-		final SlotSelectionStrategy slotSelectionStrategy = selectSlotSelectionStrategy(jobMasterConfiguration);
-		final PhysicalSlotRequestBulkChecker bulkChecker = PhysicalSlotRequestBulkCheckerImpl
-			.createFromSlotPool(slotPool, SystemClock.getInstance());
-		final PhysicalSlotProvider physicalSlotProvider = new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);
-		final ExecutionSlotAllocatorFactory allocatorFactory = new SlotSharingExecutionSlotAllocatorFactory(
-			physicalSlotProvider,
-			scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
-			bulkChecker,
-			slotRequestTimeout);
-		return new DefaultSchedulerComponents(
-			new PipelinedRegionSchedulingStrategy.Factory(),
-			bulkChecker::start,
-			allocatorFactory);
-	}
-
-	private static SlotSelectionStrategy selectSlotSelectionStrategy(@Nonnull Configuration configuration) {
-		final boolean evenlySpreadOutSlots = configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
-
-		final SlotSelectionStrategy locationPreferenceSlotSelectionStrategy;
-
-		locationPreferenceSlotSelectionStrategy = evenlySpreadOutSlots ?
-			LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut() :
-			LocationPreferenceSlotSelectionStrategy.createDefault();
-
-		return configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY) ?
-			PreviousAllocationSlotSelectionStrategy.create(locationPreferenceSlotSelectionStrategy) :
-			locationPreferenceSlotSelectionStrategy;
-	}
-
-	private static class DefaultSchedulerComponents {
-		private final SchedulingStrategyFactory schedulingStrategyFactory;
-		private final Consumer<ComponentMainThreadExecutor> startUpAction;
-		private final ExecutionSlotAllocatorFactory allocatorFactory;
-
-		private DefaultSchedulerComponents(
-				final SchedulingStrategyFactory schedulingStrategyFactory,
-				final Consumer<ComponentMainThreadExecutor> startUpAction,
-				final ExecutionSlotAllocatorFactory allocatorFactory) {
-			this.schedulingStrategyFactory = schedulingStrategyFactory;
-			this.startUpAction = startUpAction;
-			this.allocatorFactory = allocatorFactory;
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
index 8180c1f..268ce89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
@@ -25,71 +25,48 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
-import java.util.Collection;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.function.Supplier;
 
 /**
  * Context for slot allocation.
  */
-class ExecutionSlotAllocationContext implements PreferredLocationsRetriever {
-
-	private final PreferredLocationsRetriever preferredLocationsRetriever;
-
-	private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;
-
-	private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;
-
-	private final SchedulingTopology schedulingTopology;
-
-	private final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier;
-
-	private final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier;
-
-	ExecutionSlotAllocationContext(
-			final PreferredLocationsRetriever preferredLocationsRetriever,
-			final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever,
-			final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever,
-			final SchedulingTopology schedulingTopology,
-			final Supplier<Set<SlotSharingGroup>> logicalSlotSharingGroupSupplier,
-			final Supplier<Set<CoLocationGroupDesc>> coLocationGroupSupplier) {
-
-		this.preferredLocationsRetriever = preferredLocationsRetriever;
-		this.resourceProfileRetriever = resourceProfileRetriever;
-		this.priorAllocationIdRetriever = priorAllocationIdRetriever;
-		this.schedulingTopology = schedulingTopology;
-		this.logicalSlotSharingGroupSupplier = logicalSlotSharingGroupSupplier;
-		this.coLocationGroupSupplier = coLocationGroupSupplier;
-	}
-
-	@Override
-	public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations(
-			final ExecutionVertexID executionVertexId,
-			final Set<ExecutionVertexID> producersToIgnore) {
-		return preferredLocationsRetriever.getPreferredLocations(executionVertexId, producersToIgnore);
-	}
-
-	ResourceProfile getResourceProfile(final ExecutionVertexID executionVertexId) {
-		return resourceProfileRetriever.apply(executionVertexId);
-	}
-
-	AllocationID getPriorAllocationId(final ExecutionVertexID executionVertexId) {
-		return priorAllocationIdRetriever.apply(executionVertexId);
-	}
-
-	SchedulingTopology getSchedulingTopology() {
-		return schedulingTopology;
-	}
-
-	Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
-		return logicalSlotSharingGroupSupplier.get();
-	}
-
-	Set<CoLocationGroupDesc> getCoLocationGroups() {
-		return coLocationGroupSupplier.get();
-	}
+interface ExecutionSlotAllocationContext extends PreferredLocationsRetriever {
+
+	/**
+	 * Returns required resources for an execution vertex.
+	 *
+	 * @param executionVertexId id of the execution vertex
+	 * @return required resources for the given execution vertex
+	 */
+	ResourceProfile getResourceProfile(ExecutionVertexID executionVertexId);
+
+	/**
+	 * Returns prior allocation id for an execution vertex.
+	 *
+	 * @param executionVertexId id of the execution vertex
+	 * @return prior allocation id for the given execution vertex
+	 */
+	AllocationID getPriorAllocationId(ExecutionVertexID executionVertexId);
+
+	/**
+	 * Returns the scheduling topology containing all execution vertices and edges.
+	 *
+	 * @return scheduling topology
+	 */
+	SchedulingTopology getSchedulingTopology();
+
+	/**
+	 * Returns all slot sharing groups in the job.
+	 *
+	 * @return all slot sharing groups in the job
+	 */
+	Set<SlotSharingGroup> getLogicalSlotSharingGroups();
+
+	/**
+	 * Returns all co-location groups in the job.
+	 *
+	 * @return all co-location groups in the job
+	 */
+	Set<CoLocationGroupDesc> getCoLocationGroups();
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
new file mode 100644
index 0000000..572ab3a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponentsFactoryTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
+import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the factory method {@link DefaultSchedulerComponents#createSchedulerComponents(
+ * ScheduleMode, Configuration, SlotPool, Time)}.
+ */
+public class DefaultSchedulerComponentsFactoryTest extends TestLogger {
+
+	@Test
+	public void testCreatingPipelinedSchedulingStrategyFactory() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "region");
+
+		final DefaultSchedulerComponents components = createSchedulerComponents(configuration);
+		assertThat(components.getSchedulingStrategyFactory(), instanceOf(PipelinedRegionSchedulingStrategy.Factory.class));
+		assertThat(components.getAllocatorFactory(), instanceOf(SlotSharingExecutionSlotAllocatorFactory.class));
+	}
+
+	@Test
+	public void testCreatingLegacySchedulingStrategyFactory() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "legacy");
+
+		final DefaultSchedulerComponents components = createSchedulerComponents(configuration);
+		assertThat(components.getSchedulingStrategyFactory(), instanceOf(LazyFromSourcesSchedulingStrategy.Factory.class));
+		assertThat(components.getAllocatorFactory(), instanceOf(DefaultExecutionSlotAllocatorFactory.class));
+	}
+
+	@Test
+	public void testCreatingLegacySchedulingStrategyFactoryByDefault() {
+		final DefaultSchedulerComponents components = createSchedulerComponents(new Configuration());
+		assertThat(components.getSchedulingStrategyFactory(), instanceOf(LazyFromSourcesSchedulingStrategy.Factory.class));
+	}
+
+	private static DefaultSchedulerComponents createSchedulerComponents(final Configuration configuration) {
+		return DefaultSchedulerComponents.createSchedulerComponents(
+			ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+			configuration,
+			new TestingSlotPoolImpl(new JobID()),
+			Time.milliseconds(10L));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index ec88777..c92b729 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureSta
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -368,7 +369,7 @@ public class SchedulerTestingUtils {
 	public static class DefaultSchedulerBuilder {
 		private final JobGraph jobGraph;
 
-		private SchedulingStrategyFactory schedulingStrategyFactory;
+		private SchedulingStrategyFactory schedulingStrategyFactory = new PipelinedRegionSchedulingStrategy.Factory();
 
 		private Logger log = LOG;
 		private BackPressureStatsTracker backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
@@ -391,9 +392,6 @@ public class SchedulerTestingUtils {
 
 		private DefaultSchedulerBuilder(final JobGraph jobGraph) {
 			this.jobGraph = jobGraph;
-
-			// scheduling strategy is by default set according to the scheduleMode. It can be re-assigned later.
-			this.schedulingStrategyFactory = DefaultSchedulerFactory.createSchedulingStrategyFactory(jobGraph.getScheduleMode());
 		}
 
 		public DefaultSchedulerBuilder setLogger(final Logger log) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
new file mode 100644
index 0000000..df2c1ec
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.scheduling;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT case for pipelined region scheduling.
+ */
+public class PipelinedRegionSchedulingITCase extends TestLogger {
+
+	@Test
+	public void testSuccessWithSlotsNoFewerThanTheMaxRegionRequired() throws Exception {
+		final JobResult jobResult = executeSchedulingTest(2);
+		assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+	}
+
+	@Test
+	public void testFailsOnInsufficientSlots() throws Exception {
+		final JobResult jobResult = executeSchedulingTest(1);
+		assertThat(jobResult.getSerializedThrowable().isPresent(), is(true));
+
+		final Throwable jobFailure = jobResult
+			.getSerializedThrowable()
+			.get()
+			.deserializeError(ClassLoader.getSystemClassLoader());
+
+		final Optional<NoResourceAvailableException> cause = ExceptionUtils.findThrowable(
+			jobFailure,
+			NoResourceAvailableException.class);
+		assertThat(cause.isPresent(), is(true));
+		assertThat(cause.get().getMessage(), containsString("Slot request bulk is not fulfillable!"));
+	}
+
+	private JobResult executeSchedulingTest(int numSlots) throws Exception {
+		final Configuration configuration = new Configuration();
+		configuration.setString(RestOptions.BIND_PORT, "0");
+		configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
+		configuration.setString(JobManagerOptions.SCHEDULING_STRATEGY, "region");
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(configuration)
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(numSlots)
+			.build();
+
+		try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
+			miniCluster.start();
+
+			final MiniClusterClient miniClusterClient = new MiniClusterClient(configuration, miniCluster);
+
+			final JobGraph jobGraph = createJobGraph(10);
+
+			// wait for the submission to succeed
+			final JobID jobID = miniClusterClient.submitJob(jobGraph).get();
+
+			final CompletableFuture<JobResult> resultFuture = miniClusterClient.requestJobResult(jobID);
+
+			final JobResult jobResult = resultFuture.get();
+
+			return jobResult;
+		}
+	}
+
+	private JobGraph createJobGraph(final int parallelism) {
+		final SlotSharingGroup group1 = new SlotSharingGroup();
+		final JobVertex source1 = new JobVertex("source1");
+		source1.setInvokableClass(PipelinedSender.class);
+		source1.setParallelism(parallelism * 2);
+		source1.setSlotSharingGroup(group1);
+
+		final SlotSharingGroup group2 = new SlotSharingGroup();
+		final JobVertex source2 = new JobVertex("source2");
+		source2.setInvokableClass(NoOpInvokable.class);
+		source2.setParallelism(parallelism);
+		source2.setSlotSharingGroup(group2);
+
+		final JobVertex sink = new JobVertex("sink");
+		sink.setInvokableClass(Receiver.class);
+		sink.setParallelism(parallelism);
+		sink.setSlotSharingGroup(group1);
+
+		sink.connectNewDataSetAsInput(source1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		sink.connectNewDataSetAsInput(source2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		final JobGraph jobGraph = new JobGraph(source1, source2, sink);
+
+		jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
+
+		return jobGraph;
+	}
+
+	/**
+	 * This invokable is used by source1. It sends data to trigger the scheduling
+	 * of the sink task. Subtask 0 also waits for a short time before finishing,
+	 * so that the scheduled sink task can directly use its slot.
+	 */
+	public static class PipelinedSender extends AbstractInvokable {
+
+		public PipelinedSender(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			if (getEnvironment().getAllWriters().length < 1) {
+				throw new IllegalStateException();
+			}
+
+			final RecordWriter<IntValue> writer = new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0));
+
+			try {
+				writer.emit(new IntValue(42));
+				writer.flushAll();
+			} finally {
+				writer.clearBuffers();
+			}
+
+			if (getIndexInSubtaskGroup() == 0) {
+				Thread.sleep(2000);
+			}
+		}
+	}
+
+	/**
+	 * This invokable finishes only after all its upstream tasks finish.
+	 * Unexpected result partition errors can happen if a task finishes
+	 * later than its consumer task.
+	 */
+	public static class Receiver extends AbstractInvokable {
+
+		public Receiver(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			if (getEnvironment().getAllInputGates().length < 2) {
+				throw new IllegalStateException();
+			}
+
+			final String[] tmpDirs = getEnvironment().getTaskManagerInfo().getTmpDirectories();
+			final List<RecordReader<IntValue>> readers = Arrays.asList(getEnvironment().getAllInputGates())
+				.stream()
+				.map(inputGate -> new RecordReader<>(inputGate, IntValue.class, tmpDirs))
+				.collect(Collectors.toList());
+
+			for (RecordReader<IntValue> reader : readers) {
+				while (reader.hasNext()) {
+					reader.next();
+				}
+			}
+		}
+	}
+}