You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/02/29 05:50:44 UTC

[flink] 03/03: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9e0465dff51d1a88d92c672cf0c7cd8a3179984a
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Sat Feb 29 13:49:07 2020 +0800

    [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
---
 .../flink/runtime/scheduler/DefaultScheduler.java  |   2 +-
 .../runtime/scheduler/DefaultSchedulerFactory.java |   2 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    |  41 +---
 .../runtime/scheduler/SchedulerTestingUtils.java   | 253 +++++++++++++++++++--
 4 files changed, 245 insertions(+), 53 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index a425c9e..2049ec8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -93,7 +93,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
 	private final Set<ExecutionVertexID> verticesWaitingForRestart;
 
-	public DefaultScheduler(
+	DefaultScheduler(
 		final Logger log,
 		final JobGraph jobGraph,
 		final BackPressureStatsTracker backPressureStatsTracker,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 0915126..1671b5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -107,7 +107,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
 	}
 
-	private SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
+	static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
 		switch (scheduleMode) {
 			case EAGER:
 				return new EagerSchedulingStrategy.Factory();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 3df9801..b4f39e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -22,11 +22,8 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
@@ -36,10 +33,8 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -50,8 +45,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
@@ -59,7 +52,6 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import org.apache.flink.runtime.scheduler.strategy.TestSchedulingStrategy;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
@@ -780,27 +772,18 @@ public class DefaultSchedulerTest extends TestLogger {
 			final JobGraph jobGraph,
 			final SchedulingStrategyFactory schedulingStrategyFactory) throws Exception {
 
-		return new DefaultScheduler(
-			log,
-			jobGraph,
-			VoidBackPressureStatsTracker.INSTANCE,
-			executor,
-			configuration,
-			scheduledExecutorService,
-			taskRestartExecutor,
-			ClassLoader.getSystemClassLoader(),
-			new StandaloneCheckpointRecoveryFactory(),
-			Time.seconds(300),
-			VoidBlobWriter.getInstance(),
-			UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
-			NettyShuffleMaster.INSTANCE,
-			NoOpJobMasterPartitionTracker.INSTANCE,
-			schedulingStrategyFactory,
-			new RestartPipelinedRegionFailoverStrategy.Factory(),
-			testRestartBackoffTimeStrategy,
-			testExecutionVertexOperations,
-			executionVertexVersioner,
-			executionSlotAllocatorFactory);
+		return SchedulerTestingUtils.newSchedulerBuilder(jobGraph)
+			.setLogger(log)
+			.setIoExecutor(executor)
+			.setJobMasterConfiguration(configuration)
+			.setFutureExecutor(scheduledExecutorService)
+			.setDelayExecutor(taskRestartExecutor)
+			.setSchedulingStrategyFactory(schedulingStrategyFactory)
+			.setRestartBackoffTimeStrategy(testRestartBackoffTimeStrategy)
+			.setExecutionVertexOperations(testExecutionVertexOperations)
+			.setExecutionVertexVersioner(executionVertexVersioner)
+			.setExecutionSlotAllocatorFactory(executionSlotAllocatorFactory)
+			.build();
 	}
 
 	private void startScheduling(final SchedulerNG scheduler) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index d7289b3..2a2c73e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -21,37 +21,52 @@ package org.apache.flink.runtime.scheduler;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
@@ -61,6 +76,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -76,8 +93,47 @@ public class SchedulerTestingUtils {
 
 	private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000;
 
+	private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
+
 	private SchedulerTestingUtils() {}
 
+	public static DefaultSchedulerBuilder newSchedulerBuilder(final JobGraph jobGraph) {
+		return new DefaultSchedulerBuilder(jobGraph);
+	}
+
+	public static DefaultSchedulerBuilder newSchedulerBuilderWithDefaultSlotAllocator(
+			final JobGraph jobGraph,
+			final SlotProvider slotProvider) {
+
+		return newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, DEFAULT_TIMEOUT);
+	}
+
+	public static DefaultSchedulerBuilder newSchedulerBuilderWithDefaultSlotAllocator(
+			final JobGraph jobGraph,
+			final SlotProvider slotProvider,
+			final Time slotRequestTimeout) {
+
+		return new DefaultSchedulerBuilder(jobGraph)
+			.setExecutionSlotAllocatorFactory(
+				createDefaultExecutionSlotAllocatorFactory(jobGraph.getScheduleMode(), slotProvider, slotRequestTimeout));
+	}
+
+	public static DefaultScheduler createScheduler(
+			final JobGraph jobGraph,
+			final SlotProvider slotProvider) throws Exception {
+
+		return createScheduler(jobGraph, slotProvider, DEFAULT_TIMEOUT);
+	}
+
+	public static DefaultScheduler createScheduler(
+			final JobGraph jobGraph,
+			final SlotProvider slotProvider,
+			final Time slotRequestTimeout) throws Exception {
+
+		return newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, slotProvider, slotRequestTimeout)
+			.build();
+	}
+
 	public static DefaultScheduler createScheduler(
 			JobGraph jobGraph,
 			ManuallyTriggeredScheduledExecutorService asyncExecutor) throws Exception {
@@ -102,27 +158,26 @@ public class SchedulerTestingUtils {
 			ManuallyTriggeredScheduledExecutorService asyncExecutor,
 			TaskManagerGateway taskManagerGateway) throws Exception {
 
-		return new DefaultScheduler(
-			LOG,
-			jobGraph,
-			VoidBackPressureStatsTracker.INSTANCE,
-			Executors.directExecutor(),
-			new Configuration(),
-			asyncExecutor,
-			asyncExecutor,
-			ClassLoader.getSystemClassLoader(),
-			new StandaloneCheckpointRecoveryFactory(),
-			Time.seconds(300),
-			VoidBlobWriter.getInstance(),
-			UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
-			NettyShuffleMaster.INSTANCE,
-			NoOpJobMasterPartitionTracker.INSTANCE,
-			new EagerSchedulingStrategy.Factory(),
-			new RestartPipelinedRegionFailoverStrategy.Factory(),
-			new TestRestartBackoffTimeStrategy(true, 0),
-			new DefaultExecutionVertexOperations(),
-			new ExecutionVertexVersioner(),
-			new TestExecutionSlotAllocatorFactory(taskManagerGateway));
+		return newSchedulerBuilder(jobGraph)
+			.setFutureExecutor(asyncExecutor)
+			.setDelayExecutor(asyncExecutor)
+			.setSchedulingStrategyFactory(new EagerSchedulingStrategy.Factory())
+			.setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
+			.setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway))
+			.build();
+	}
+
+	public static DefaultExecutionSlotAllocatorFactory createDefaultExecutionSlotAllocatorFactory(
+			final ScheduleMode scheduleMode,
+			final SlotProvider slotProvider,
+			final Time slotRequestTimeout) {
+
+		final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
+			scheduleMode,
+			slotProvider,
+			slotRequestTimeout);
+
+		return new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);
 	}
 
 	public static void enableCheckpointing(final JobGraph jobGraph) {
@@ -212,4 +267,158 @@ public class SchedulerTestingUtils {
 			return operatorGateway.sendOperatorEventToTask(task, operator, evt);
 		}
 	}
+
+	/**
+	 * Builder for {@link DefaultScheduler}.
+	 */
+	public static class DefaultSchedulerBuilder {
+		private final JobGraph jobGraph;
+
+		private SchedulingStrategyFactory schedulingStrategyFactory;
+
+		private Logger log = LOG;
+		private BackPressureStatsTracker backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
+		private Executor ioExecutor = TestingUtils.defaultExecutor();
+		private Configuration jobMasterConfiguration = new Configuration();
+		private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
+		private ScheduledExecutor delayExecutor = new ScheduledExecutorServiceAdapter(futureExecutor);
+		private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
+		private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+		private Time rpcTimeout = DEFAULT_TIMEOUT;
+		private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+		private JobManagerJobMetricGroup jobManagerJobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+		private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+		private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
+		private FailoverStrategy.Factory failoverStrategyFactory = new RestartPipelinedRegionFailoverStrategy.Factory();
+		private RestartBackoffTimeStrategy restartBackoffTimeStrategy = NoRestartBackoffTimeStrategy.INSTANCE;
+		private ExecutionVertexOperations executionVertexOperations = new DefaultExecutionVertexOperations();
+		private ExecutionVertexVersioner executionVertexVersioner = new ExecutionVertexVersioner();
+		private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
+
+		private DefaultSchedulerBuilder(final JobGraph jobGraph) {
+			this.jobGraph = jobGraph;
+
+			// scheduling strategy is by default set according to the scheduleMode. It can be re-assigned later.
+			this.schedulingStrategyFactory = DefaultSchedulerFactory.createSchedulingStrategyFactory(jobGraph.getScheduleMode());
+		}
+
+		public DefaultSchedulerBuilder setLogger(final Logger log) {
+			this.log = log;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setBackPressureStatsTracker(final BackPressureStatsTracker backPressureStatsTracker) {
+			this.backPressureStatsTracker = backPressureStatsTracker;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setIoExecutor(final Executor ioExecutor) {
+			this.ioExecutor = ioExecutor;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setJobMasterConfiguration(final Configuration jobMasterConfiguration) {
+			this.jobMasterConfiguration = jobMasterConfiguration;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setFutureExecutor(final ScheduledExecutorService futureExecutor) {
+			this.futureExecutor = futureExecutor;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setDelayExecutor(final ScheduledExecutor delayExecutor) {
+			this.delayExecutor = delayExecutor;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
+			this.userCodeLoader = userCodeLoader;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setCheckpointRecoveryFactory(final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+			this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
+			this.rpcTimeout = rpcTimeout;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setBlobWriter(final BlobWriter blobWriter) {
+			this.blobWriter = blobWriter;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setJobManagerJobMetricGroup(final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+			this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
+			this.shuffleMaster = shuffleMaster;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setPartitionTracker(final JobMasterPartitionTracker partitionTracker) {
+			this.partitionTracker = partitionTracker;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setSchedulingStrategyFactory(final SchedulingStrategyFactory schedulingStrategyFactory) {
+			this.schedulingStrategyFactory = schedulingStrategyFactory;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setFailoverStrategyFactory(final FailoverStrategy.Factory failoverStrategyFactory) {
+			this.failoverStrategyFactory = failoverStrategyFactory;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setRestartBackoffTimeStrategy(final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+			this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setExecutionVertexOperations(final ExecutionVertexOperations executionVertexOperations) {
+			this.executionVertexOperations = executionVertexOperations;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setExecutionVertexVersioner(final ExecutionVertexVersioner executionVertexVersioner) {
+			this.executionVertexVersioner = executionVertexVersioner;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory(final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) {
+			this.executionSlotAllocatorFactory = executionSlotAllocatorFactory;
+			return this;
+		}
+
+		public DefaultScheduler build() throws Exception {
+			return new DefaultScheduler(
+				log,
+				jobGraph,
+				backPressureStatsTracker,
+				ioExecutor,
+				jobMasterConfiguration,
+				futureExecutor,
+				delayExecutor,
+				userCodeLoader,
+				checkpointRecoveryFactory,
+				rpcTimeout,
+				blobWriter,
+				jobManagerJobMetricGroup,
+				shuffleMaster,
+				partitionTracker,
+				schedulingStrategyFactory,
+				failoverStrategyFactory,
+				restartBackoffTimeStrategy,
+				executionVertexOperations,
+				executionVertexVersioner,
+				executionSlotAllocatorFactory);
+		}
+	}
 }