You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/02/26 11:02:54 UTC

[GitHub] [flink] GJL commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing

GJL commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
URL: https://github.com/apache/flink/pull/11213#discussion_r384407096
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
 ##########
 @@ -213,4 +267,161 @@ public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase sched
 			return operatorGateway.sendOperatorEventToTask(task, operator, evt);
 		}
 	}
+
+
+	/**
+	 * Builder for {@link DefaultScheduler}.
+	 */
+	public static class DefaultSchedulerBuilder {
+		private final JobGraph jobGraph;
+
+		private SchedulingStrategyFactory schedulingStrategyFactory;
+
+		private Logger log = LOG;
+		private BackPressureStatsTracker backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
+		private Executor ioExecutor = java.util.concurrent.Executors.newSingleThreadExecutor();
+		private Configuration jobMasterConfiguration = new Configuration();
+		private ScheduledExecutorService futureExecutor = new DirectScheduledExecutorService();
+		private ScheduledExecutor delayExecutor = new ScheduledExecutorServiceAdapter(futureExecutor);
+		private ClassLoader userCodeLoader = getClass().getClassLoader();
+		private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+		private Time rpcTimeout = DEFAULT_TIMEOUT;
+		private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+		private JobManagerJobMetricGroup jobManagerJobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+		private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+		private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
+		private FailoverStrategy.Factory failoverStrategyFactory = new RestartPipelinedRegionFailoverStrategy.Factory();
+		private RestartBackoffTimeStrategy restartBackoffTimeStrategy = NoRestartBackoffTimeStrategy.INSTANCE;
+		private ExecutionVertexOperations executionVertexOperations = new DefaultExecutionVertexOperations();
+		private ExecutionVertexVersioner executionVertexVersioner = new ExecutionVertexVersioner();
+		private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
+
+		private DefaultSchedulerBuilder(final JobGraph jobGraph) {
+			this.jobGraph = jobGraph;
+
+			// scheduling strategy is by default set according to the scheduleMode. It can be re-assigned later.
+			this.schedulingStrategyFactory = DefaultSchedulerFactory.createSchedulingStrategyFactory(jobGraph.getScheduleMode());
+		}
+
+		public DefaultSchedulerBuilder setLogger(final Logger log) {
+			this.log = log;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setBackPressureStatsTracker(final BackPressureStatsTracker backPressureStatsTracker) {
+			this.backPressureStatsTracker = backPressureStatsTracker;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setIoExecutor(final Executor ioExecutor) {
+			this.ioExecutor = ioExecutor;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setJobMasterConfiguration(final Configuration jobMasterConfiguration) {
+			this.jobMasterConfiguration = jobMasterConfiguration;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setFutureExecutor(final ScheduledExecutorService futureExecutor) {
+			this.futureExecutor = futureExecutor;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setDelayExecutor(final ScheduledExecutor delayExecutor) {
+			this.delayExecutor = delayExecutor;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
+			this.userCodeLoader = userCodeLoader;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setCheckpointRecoveryFactory(final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+			this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
+			this.rpcTimeout = rpcTimeout;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setBlobWriter(final BlobWriter blobWriter) {
+			this.blobWriter = blobWriter;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setJobManagerJobMetricGroup(final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+			this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
+			this.shuffleMaster = shuffleMaster;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setPartitionTracker(final JobMasterPartitionTracker partitionTracker) {
+			this.partitionTracker = partitionTracker;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setSchedulingStrategyFactory(final SchedulingStrategyFactory schedulingStrategyFactory) {
+			this.schedulingStrategyFactory = schedulingStrategyFactory;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setFailoverStrategyFactory(final FailoverStrategy.Factory failoverStrategyFactory) {
+			this.failoverStrategyFactory = failoverStrategyFactory;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setRestartBackoffTimeStrategy(final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+			this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setExecutionVertexOperations(final ExecutionVertexOperations executionVertexOperations) {
+			this.executionVertexOperations = executionVertexOperations;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setExecutionVertexVersioner(final ExecutionVertexVersioner executionVertexVersioner) {
+			this.executionVertexVersioner = executionVertexVersioner;
+			return this;
+		}
+
+		public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory(final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) {
+			this.executionSlotAllocatorFactory = executionSlotAllocatorFactory;
+			return this;
+		}
+
+		public DefaultScheduler build() throws Exception {
+			return new DefaultScheduler(
+				log,
+				jobGraph,
+				backPressureStatsTracker,
+				ioExecutor,
+				jobMasterConfiguration,
+				new SimpleSlotProvider(jobGraph.getJobID(), 0), // this is not used any more in the new scheduler
 
 Review comment:
   I don't think the comment `// this is not used any more in the new scheduler` is needed. Commenting here would be inconsistent, because there is another invocation of this constructor where we do not comment. Instead, I would remove the parameters `slotProvider` and `slotRequestTimeout` from the `DefaultScheduler` constructor and, if possible, pass a _"throwing"_ implementation/invalid value to `SchedulerBase`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services