You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/02/29 05:50:41 UTC

[flink] branch master updated (622bd31 -> 9e0465d)

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 622bd31  Fixup! [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
     new 0abab7e  Revert "Fixup! [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing"
     new 1cc6c79  Revert "[FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing"
     new 9e0465d  [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[flink] 01/03: Revert "Fixup! [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing"

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0abab7e6ade5e762fecf6b91a59cbc8ce80b0599
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Sat Feb 29 13:47:48 2020 +0800

    Revert "Fixup! [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing"
    
    This reverts commit 622bd31ab8e48afa120b9bb37e8fdc5fdb04f193.
---
 .../org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 2a2c73e..b9d81e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -66,7 +66,7 @@ import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
@@ -278,11 +278,11 @@ public class SchedulerTestingUtils {
 
 		private Logger log = LOG;
 		private BackPressureStatsTracker backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
-		private Executor ioExecutor = TestingUtils.defaultExecutor();
+		private Executor ioExecutor = java.util.concurrent.Executors.newSingleThreadExecutor();
 		private Configuration jobMasterConfiguration = new Configuration();
-		private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
+		private ScheduledExecutorService futureExecutor = new DirectScheduledExecutorService();
 		private ScheduledExecutor delayExecutor = new ScheduledExecutorServiceAdapter(futureExecutor);
-		private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
+		private ClassLoader userCodeLoader = getClass().getClassLoader();
 		private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
 		private Time rpcTimeout = DEFAULT_TIMEOUT;
 		private BlobWriter blobWriter = VoidBlobWriter.getInstance();


[flink] 02/03: Revert "[FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing"

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1cc6c790e2b8139c921317bfc9cebe98393ba0e7
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Sat Feb 29 13:48:02 2020 +0800

    Revert "[FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing"
    
    This reverts commit bd757cb2a29cc2d9046d39aee9684d5b2e5ec036.
---
 .../flink/runtime/scheduler/DefaultScheduler.java  |   2 +-
 .../runtime/scheduler/DefaultSchedulerFactory.java |   2 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    |  41 +++-
 .../runtime/scheduler/SchedulerTestingUtils.java   | 253 ++-------------------
 4 files changed, 53 insertions(+), 245 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 2049ec8..a425c9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -93,7 +93,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
 	private final Set<ExecutionVertexID> verticesWaitingForRestart;
 
-	DefaultScheduler(
+	public DefaultScheduler(
 		final Logger log,
 		final JobGraph jobGraph,
 		final BackPressureStatsTracker backPressureStatsTracker,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 1671b5e..0915126 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -107,7 +107,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
 	}
 
-	static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
+	private SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
 		switch (scheduleMode) {
 			case EAGER:
 				return new EagerSchedulingStrategy.Factory();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index b4f39e9..3df9801 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -22,8 +22,11 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -33,8 +36,10 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -45,6 +50,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
@@ -52,6 +59,7 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -772,18 +780,27 @@ public class DefaultSchedulerTest extends TestLogger {
 			final JobGraph jobGraph,
 			final SchedulingStrategyFactory schedulingStrategyFactory) throws Exception {
 
-		return SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
-			.setLogger(log)
-			.setIoExecutor(executor)
-			.setJobMasterConfiguration(configuration)
-			.setFutureExecutor(scheduledExecutorService)
-			.setDelayExecutor(taskRestartExecutor)
-			.setSchedulingStrategyFactory(schedulingStrategyFactory)
-			.setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy)
-			.setExecutionVertexOperations(testExecutionVertexOperations)
-			.setExecutionVertexVersioner(executionVertexVersioner)
-			.setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
-			.build();
+		return new DefaultScheduler(
+			log,
+			jobGraph,
+			VoidBackPressureStatsTracker.INSTANCE,
+			executor,
+			configuration,
+			scheduledExecutorService,
+			taskRestartExecutor,
+			ClassLoader.getSystemClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.seconds(300),
+			VoidBlobWriter.getInstance(),
+			UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+			NettyShuffleMaster.INSTANCE,
+			NoOpJobMasterPartitionTracker.INSTANCE,
+			schedulingStrategyFactory,
+			new RestartPipelinedRegionFailoverStrategy.Factory(),
+			testRestartBackoffTimeStrategy,
+			testExecutionVertexOperations,
+			executionVertexVersioner,
+			executionSlotAllocatorFactory);
 	}
 
 	private void startScheduling(final SchedulerNG scheduler) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index b9d81e3..d7289b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -21,52 +21,37 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
-import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
@@ -76,8 +61,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -93,47 +76,8 @@ public class SchedulerTestingUtils {
 
 	private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000;
 
-	private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
-
 	private SchedulerTestingUtils() {}
 
-	public static DefaultSchedulerBuilder newSchedulerBuilder(final JobGraph jobGraph) {
-		return new DefaultSchedulerBuilder(jobGraph);
-	}
-
-	public static DefaultSchedulerBuilder newSchedulerBuilderWithDefaultSlotAllocator(
-			final JobGraph jobGraph,
-			final SlotProvider slotProvider) {
-
-		return newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, DEFAULT_TIMEOUT);
-	}
-
-	public static DefaultSchedulerBuilder newSchedulerBuilderWithDefaultSlotAllocator(
-			final JobGraph jobGraph,
-			final SlotProvider slotProvider,
-			final Time slotRequestTimeout) {
-
-		return new DefaultSchedulerBuilder(jobGraph)
-			.setExecutionSlotAllocatorFactory(
-				createDefaultExecutionSlotAllocatorFactory(jobGraph.getScheduleMode(), slotProvider, slotRequestTimeout));
-	}
-
-	public static DefaultScheduler createScheduler(
-			final JobGraph jobGraph,
-			final SlotProvider slotProvider) throws Exception {
-
-		return createScheduler(jobGraph, slotProvider, DEFAULT_TIMEOUT);
-	}
-
-	public static DefaultScheduler createScheduler(
-			final JobGraph jobGraph,
-			final SlotProvider slotProvider,
-			final Time slotRequestTimeout) throws Exception {
-
-		return newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, slotRequestTimeout)
-			.build();
-	}
-
 	public static DefaultScheduler createScheduler(
 			JobGraph jobGraph,
 			ManuallyTriggeredScheduledExecutorService asyncExecutor) throws Exception {
@@ -158,26 +102,27 @@ public class SchedulerTestingUtils {
 			ManuallyTriggeredScheduledExecutorService asyncExecutor,
 			TaskManagerGateway taskManagerGateway) throws Exception {
 
-		return newSchedulerBuilder(jobGraph)
-			.setFutureExecutor(asyncExecutor)
-			.setDelayExecutor(asyncExecutor)
-			.setSchedulingStrategyFactory(new EagerSchedulingStrategy.Factory())
-			.setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
-			.setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway))
-			.build();
-	}
-
-	public static DefaultExecutionSlotAllocatorFactory createDefaultExecutionSlotAllocatorFactory(
-			final ScheduleMode scheduleMode,
-			final SlotProvider slotProvider,
-			final Time slotRequestTimeout) {
-
-		final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
-			scheduleMode,
-			slotProvider,
-			slotRequestTimeout);
-
-		return new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);
+		return new DefaultScheduler(
+			LOG,
+			jobGraph,
+			VoidBackPressureStatsTracker.INSTANCE,
+			Executors.directExecutor(),
+			new Configuration(),
+			asyncExecutor,
+			asyncExecutor,
+			ClassLoader.getSystemClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.seconds(300),
+			VoidBlobWriter.getInstance(),
+			UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+			NettyShuffleMaster.INSTANCE,
+			NoOpJobMasterPartitionTracker.INSTANCE,
+			new EagerSchedulingStrategy.Factory(),
+			new RestartPipelinedRegionFailoverStrategy.Factory(),
+			new TestRestartBackoffTimeStrategy(true, 0),
+			new DefaultExecutionVertexOperations(),
+			new ExecutionVertexVersioner(),
+			new TestExecutionSlotAllocatorFactory(taskManagerGateway));
 	}
 
 	public static void enableCheckpointing(final JobGraph jobGraph) {
@@ -267,158 +212,4 @@ public class SchedulerTestingUtils {
 			return operatorGateway.sendOperatorEventToTask(task, operator, evt);
 		}
 	}
-
-	/**
-	 * Builder for {@link DefaultScheduler}.
-	 */
-	public static class DefaultSchedulerBuilder {
-		private final JobGraph jobGraph;
-
-		private SchedulingStrategyFactory schedulingStrategyFactory;
-
-		private Logger log = LOG;
-		private BackPressureStatsTracker backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
-		private Executor ioExecutor = java.util.concurrent.Executors.newSingleThreadExecutor();
-		private Configuration jobMasterConfiguration = new Configuration();
-		private ScheduledExecutorService futureExecutor = new DirectScheduledExecutorService();
-		private ScheduledExecutor delayExecutor = new ScheduledExecutorServiceAdapter(futureExecutor);
-		private ClassLoader userCodeLoader = getClass().getClassLoader();
-		private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
-		private Time rpcTimeout = DEFAULT_TIMEOUT;
-		private BlobWriter blobWriter = VoidBlobWriter.getInstance();
-		private JobManagerJobMetricGroup jobManagerJobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
-		private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
-		private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
-		private FailoverStrategy.Factory failoverStrategyFactory = new RestartPipelinedRegionFailoverStrategy.Factory();
-		private RestartBackoffTimeStrategy restartBackoffTimeStrategy = NoRestartBackoffTimeStrategy.INSTANCE;
-		private ExecutionVertexOperations executionVertexOperations = new DefaultExecutionVertexOperations();
-		private ExecutionVertexVersioner executionVertexVersioner = new ExecutionVertexVersioner();
-		private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
-
-		private DefaultSchedulerBuilder(final JobGraph jobGraph) {
-			this.jobGraph = jobGraph;
-
-			// scheduling strategy is by default set according to the scheduleMode. It can be re-assigned later.
-			this.schedulingStrategyFactory = DefaultSchedulerFactory.createSchedulingStrategyFactory(jobGraph.getScheduleMode());
-		}
-
-		public DefaultSchedulerBuilder setLogger(final Logger log) {
-			this.log = log;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setBackPressureStatsTracker(final BackPressureStatsTracker backPressureStatsTracker) {
-			this.backPressureStatsTracker = backPressureStatsTracker;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setIoExecutor(final Executor ioExecutor) {
-			this.ioExecutor = ioExecutor;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setJobMasterConfiguration(final Configuration jobMasterConfiguration) {
-			this.jobMasterConfiguration = jobMasterConfiguration;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setFutureExecutor(final ScheduledExecutorService futureExecutor) {
-			this.futureExecutor = futureExecutor;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setDelayExecutor(final ScheduledExecutor delayExecutor) {
-			this.delayExecutor = delayExecutor;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
-			this.userCodeLoader = userCodeLoader;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setCheckpointRecoveryFactory(final CheckpointRecoveryFactory checkpointRecoveryFactory) {
-			this.checkpointRecoveryFactory = checkpointRecoveryFactory;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
-			this.rpcTimeout = rpcTimeout;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setBlobWriter(final BlobWriter blobWriter) {
-			this.blobWriter = blobWriter;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setJobManagerJobMetricGroup(final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
-			this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
-			this.shuffleMaster = shuffleMaster;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setPartitionTracker(final JobMasterPartitionTracker partitionTracker) {
-			this.partitionTracker = partitionTracker;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setSchedulingStrategyFactory(final SchedulingStrategyFactory schedulingStrategyFactory) {
-			this.schedulingStrategyFactory = schedulingStrategyFactory;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setFailoverStrategyFactory(final FailoverStrategy.Factory failoverStrategyFactory) {
-			this.failoverStrategyFactory = failoverStrategyFactory;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setRestartBackoffTimeStrategy(final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
-			this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setExecutionVertexOperations(final ExecutionVertexOperations executionVertexOperations) {
-			this.executionVertexOperations = executionVertexOperations;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setExecutionVertexVersioner(final ExecutionVertexVersioner executionVertexVersioner) {
-			this.executionVertexVersioner = executionVertexVersioner;
-			return this;
-		}
-
-		public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory(final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) {
-			this.executionSlotAllocatorFactory = executionSlotAllocatorFactory;
-			return this;
-		}
-
-		public DefaultScheduler build() throws Exception {
-			return new DefaultScheduler(
-				log,
-				jobGraph,
-				backPressureStatsTracker,
-				ioExecutor,
-				jobMasterConfiguration,
-				futureExecutor,
-				delayExecutor,
-				userCodeLoader,
-				checkpointRecoveryFactory,
-				rpcTimeout,
-				blobWriter,
-				jobManagerJobMetricGroup,
-				shuffleMaster,
-				partitionTracker,
-				schedulingStrategyFactory,
-				failoverStrategyFactory,
-				restartBackoffTimeStrategy,
-				executionVertexOperations,
-				executionVertexVersioner,
-				executionSlotAllocatorFactory);
-		}
-	}
 }


[flink] 03/03: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9e0465dff51d1a88d92c672cf0c7cd8a3179984a
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Sat Feb 29 13:49:07 2020 +0800

    [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
---
 .../flink/runtime/scheduler/DefaultScheduler.java  |   2 +-
 .../runtime/scheduler/DefaultSchedulerFactory.java |   2 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    |  41 +---
 .../runtime/scheduler/SchedulerTestingUtils.java   | 253 +++++++++++++++++++--
 4 files changed, 245 insertions(+), 53 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index a425c9e..2049ec8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -93,7 +93,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
 	private final Set<ExecutionVertexID> verticesWaitingForRestart;
 
-	public DefaultScheduler(
+	DefaultScheduler(
 		final Logger log,
 		final JobGraph jobGraph,
 		final BackPressureStatsTracker backPressureStatsTracker,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 0915126..1671b5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -107,7 +107,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
 	}
 
-	private SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
+	static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
 		switch (scheduleMode) {
 			case EAGER:
 				return new EagerSchedulingStrategy.Factory();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 3df9801..b4f39e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -22,11 +22,8 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -36,10 +33,8 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -50,8 +45,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
@@ -59,7 +52,6 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -780,27 +772,18 @@ public class DefaultSchedulerTest extends TestLogger {
 			final JobGraph jobGraph,
 			final SchedulingStrategyFactory schedulingStrategyFactory) throws Exception {
 
-		return new DefaultScheduler(
-			log,
-			jobGraph,
-			VoidBackPressureStatsTracker.INSTANCE,
-			executor,
-			configuration,
-			scheduledExecutorService,
-			taskRestartExecutor,
-			ClassLoader.getSystemClassLoader(),
-			new StandaloneCheckpointRecoveryFactory(),
-			Time.seconds(300),
-			VoidBlobWriter.getInstance(),
-			UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
-			NettyShuffleMaster.INSTANCE,
-			NoOpJobMasterPartitionTracker.INSTANCE,
-			schedulingStrategyFactory,
-			new RestartPipelinedRegionFailoverStrategy.Factory(),
-			testRestartBackoffTimeStrategy,
-			testExecutionVertexOperations,
-			executionVertexVersioner,
-			executionSlotAllocatorFactory);
+		return SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
+			.setLogger(log)
+			.setIoExecutor(executor)
+			.setJobMasterConfiguration(configuration)
+			.setFutureExecutor(scheduledExecutorService)
+			.setDelayExecutor(taskRestartExecutor)
+			.setSchedulingStrategyFactory(schedulingStrategyFactory)
+			.setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy)
+			.setExecutionVertexOperations(testExecutionVertexOperations)
+			.setExecutionVertexVersioner(executionVertexVersioner)
+			.setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
+			.build();
 	}
 
 	private void startScheduling(final SchedulerNG scheduler) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index d7289b3..2a2c73e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -21,37 +21,52 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
@@ -61,6 +76,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -76,8 +93,47 @@ public class SchedulerTestingUtils {
 
 	private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000;
 
+	private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
+
 	private SchedulerTestingUtils() {}
 
+	public static DefaultSchedulerBuilder newSchedulerBuilder(final JobGraph jobGraph) {
+		return new DefaultSchedulerBuilder(jobGraph);
+	}
+
+	public static DefaultSchedulerBuilder newSchedulerBuilderWithDefaultSlotAllocator(
+			final JobGraph jobGraph,
+			final SlotProvider slotProvider) {
+
+		return newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, DEFAULT_TIMEOUT);
+	}
+
+	public static DefaultSchedulerBuilder newSchedulerBuilderWithDefaultSlotAllocator(
+			final JobGraph jobGraph,
+			final SlotProvider slotProvider,
+			final Time slotRequestTimeout) {
+
+		return new DefaultSchedulerBuilder(jobGraph)
+			.setExecutionSlotAllocatorFactory(
+				createDefaultExecutionSlotAllocatorFactory(jobGraph.getScheduleMode(), slotProvider, slotRequestTimeout));
+	}
+
+	public static DefaultScheduler createScheduler(
+			final JobGraph jobGraph,
+			final SlotProvider slotProvider) throws Exception {
+
+		return createScheduler(jobGraph, slotProvider, DEFAULT_TIMEOUT);
+	}
+
+	public static DefaultScheduler createScheduler(
+			final JobGraph jobGraph,
+			final SlotProvider slotProvider,
+			final Time slotRequestTimeout) throws Exception {
+
+		return newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, slotRequestTimeout)
+			.build();
+	}
+
 	public static DefaultScheduler createScheduler(
 			JobGraph jobGraph,
 			ManuallyTriggeredScheduledExecutorService asyncExecutor) throws Exception {
@@ -102,27 +158,26 @@ public class SchedulerTestingUtils {
 			ManuallyTriggeredScheduledExecutorService asyncExecutor,
 			TaskManagerGateway taskManagerGateway) throws Exception {
 
-		return new DefaultScheduler(
-			LOG,
-			jobGraph,
-			VoidBackPressureStatsTracker.INSTANCE,
-			Executors.directExecutor(),
-			new Configuration(),
-			asyncExecutor,
-			asyncExecutor,
-			ClassLoader.getSystemClassLoader(),
-			new StandaloneCheckpointRecoveryFactory(),
-			Time.seconds(300),
-			VoidBlobWriter.getInstance(),
-			UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
-			NettyShuffleMaster.INSTANCE,
-			NoOpJobMasterPartitionTracker.INSTANCE,
-			new EagerSchedulingStrategy.Factory(),
-			new RestartPipelinedRegionFailoverStrategy.Factory(),
-			new TestRestartBackoffTimeStrategy(true, 0),
-			new DefaultExecutionVertexOperations(),
-			new ExecutionVertexVersioner(),
-			new TestExecutionSlotAllocatorFactory(taskManagerGateway));
+		return newSchedulerBuilder(jobGraph)
+			.setFutureExecutor(asyncExecutor)
+			.setDelayExecutor(asyncExecutor)
+			.setSchedulingStrategyFactory(new EagerSchedulingStrategy.Factory())
+			.setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
+			.setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway))
+			.build();
+	}
+
+	public static DefaultExecutionSlotAllocatorFactory createDefaultExecutionSlotAllocatorFactory(
+			final ScheduleMode scheduleMode,
+			final SlotProvider slotProvider,
+			final Time slotRequestTimeout) {
+
+		final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
+			scheduleMode,
+			slotProvider,
+			slotRequestTimeout);
+
+		return new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);
 	}
 
 	public static void enableCheckpointing(final JobGraph jobGraph) {
@@ -212,4 +267,158 @@ public class SchedulerTestingUtils {
 			return operatorGateway.sendOperatorEventToTask(task, operator, evt);
 		}
 	}
+
+	/**
+	 * Builder for {@link DefaultScheduler}.
+	 */
+	public static class DefaultSchedulerBuilder {
+		private final JobGraph jobGraph;
+
+		private SchedulingStrategyFactory schedulingStrategyFactory;
+
+		private Logger log = LOG;
+		private BackPressureStatsTracker backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
+		private Executor ioExecutor = TestingUtils.defaultExecutor();
+		private Configuration jobMasterConfiguration = new Configuration();
+		private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
+		private ScheduledExecutor delayExecutor = new ScheduledExecutorServiceAdapter(futureExecutor);
+		private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
+		private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+		private Time rpcTimeout = DEFAULT_TIMEOUT;
+		private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+		private JobManagerJobMetricGroup jobManagerJobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+		private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+		private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
+		private FailoverStrategy.Factory failoverStrategyFactory = new RestartPipelinedRegionFailoverStrategy.Factory();
+		private RestartBackoffTimeStrategy restartBackoffTimeStrategy = NoRestartBackoffTimeStrategy.INSTANCE;
+		private ExecutionVertexOperations executionVertexOperations = new DefaultExecutionVertexOperations();
+		private ExecutionVertexVersioner executionVertexVersioner = new ExecutionVertexVersioner();
+		private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
+
+		private DefaultSchedulerBuilder(final JobGraph jobGraph) {
+			this.jobGraph = jobGraph;
+
+			// scheduling strategy is by default set according to the scheduleMode. It can be re-assigned later.
+			this.schedulingStrategyFactory = DefaultSchedulerFactory.createSchedulingStrategyFactory(jobGraph.getScheduleMode());
+		}
+
+		public DefaultSchedulerBuilder setLogger(final Logger log) {
+			this.log = log;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setBackPressureStatsTracker(final BackPressureStatsTracker backPressureStatsTracker) {
+			this.backPressureStatsTracker = backPressureStatsTracker;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setIoExecutor(final Executor ioExecutor) {
+			this.ioExecutor = ioExecutor;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setJobMasterConfiguration(final Configuration jobMasterConfiguration) {
+			this.jobMasterConfiguration = jobMasterConfiguration;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setFutureExecutor(final ScheduledExecutorService futureExecutor) {
+			this.futureExecutor = futureExecutor;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setDelayExecutor(final ScheduledExecutor delayExecutor) {
+			this.delayExecutor = delayExecutor;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
+			this.userCodeLoader = userCodeLoader;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setCheckpointRecoveryFactory(final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+			this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
+			this.rpcTimeout = rpcTimeout;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setBlobWriter(final BlobWriter blobWriter) {
+			this.blobWriter = blobWriter;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setJobManagerJobMetricGroup(final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+			this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
+			this.shuffleMaster = shuffleMaster;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setPartitionTracker(final JobMasterPartitionTracker partitionTracker) {
+			this.partitionTracker = partitionTracker;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setSchedulingStrategyFactory(final SchedulingStrategyFactory schedulingStrategyFactory) {
+			this.schedulingStrategyFactory = schedulingStrategyFactory;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setFailoverStrategyFactory(final FailoverStrategy.Factory failoverStrategyFactory) {
+			this.failoverStrategyFactory = failoverStrategyFactory;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setRestartBackoffTimeStrategy(final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+			this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setExecutionVertexOperations(final ExecutionVertexOperations executionVertexOperations) {
+			this.executionVertexOperations = executionVertexOperations;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setExecutionVertexVersioner(final ExecutionVertexVersioner executionVertexVersioner) {
+			this.executionVertexVersioner = executionVertexVersioner;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory(final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) {
+			this.executionSlotAllocatorFactory = executionSlotAllocatorFactory;
+			return this;
+		}
+
+		public DefaultScheduler build() throws Exception {
+			return new DefaultScheduler(
+				log,
+				jobGraph,
+				backPressureStatsTracker,
+				ioExecutor,
+				jobMasterConfiguration,
+				futureExecutor,
+				delayExecutor,
+				userCodeLoader,
+				checkpointRecoveryFactory,
+				rpcTimeout,
+				blobWriter,
+				jobManagerJobMetricGroup,
+				shuffleMaster,
+				partitionTracker,
+				schedulingStrategyFactory,
+				failoverStrategyFactory,
+				restartBackoffTimeStrategy,
+				executionVertexOperations,
+				executionVertexVersioner,
+				executionSlotAllocatorFactory);
+		}
+	}
 }