You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/09/27 07:06:09 UTC

[GitHub] [flink] GJL commented on a change in pull request #9663: [FLINK-12433][runtime] Implement DefaultScheduler stub

GJL commented on a change in pull request #9663: [FLINK-12433][runtime] Implement DefaultScheduler stub
URL: https://github.com/apache/flink/pull/9663#discussion_r328937701
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 ##########
 @@ -75,10 +127,283 @@ public DefaultScheduler(
 			slotRequestTimeout,
 			shuffleMaster,
 			partitionTracker);
+
+		this.log = log;
+
+		this.delayExecutor = checkNotNull(delayExecutor);
+		this.userCodeLoader = checkNotNull(userCodeLoader);
+		this.executionVertexOperations = checkNotNull(executionVertexOperations);
+		this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
+
+		this.executionFailureHandler = new ExecutionFailureHandler(failoverStrategyFactory.create(getFailoverTopology()), restartBackoffTimeStrategy);
+		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology(), getJobGraph());
+		this.executionSlotAllocator = new DefaultExecutionSlotAllocator(slotProvider, getInputsLocationsRetriever(), slotRequestTimeout);
+	}
+
+	// ------------------------------------------------------------------------
+	// SchedulerNG
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected void startSchedulingInternal() {
+		log.debug("Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName());
+		prepareExecutionGraphForNgScheduling();
+		schedulingStrategy.startScheduling();
+	}
+
+	@Override
+	protected void updateTaskExecutionStateInternal(final ExecutionVertexID executionVertexId, final TaskExecutionState taskExecutionState) {
+		schedulingStrategy.onExecutionStateChange(executionVertexId, taskExecutionState.getExecutionState());
+		maybeHandleTaskFailure(taskExecutionState, executionVertexId);
+	}
+
+	private void maybeHandleTaskFailure(final TaskExecutionState taskExecutionState, final ExecutionVertexID executionVertexId) {
+		if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
+			final Throwable error = taskExecutionState.getError(userCodeLoader);
+			handleTaskFailure(executionVertexId, error);
+		}
 	}
 
+	private void handleTaskFailure(final ExecutionVertexID executionVertexId, final Throwable error) {
+		final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
+		maybeRestartTasks(failureHandlingResult);
+	}
+
+	private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult) {
+		if (failureHandlingResult.canRestart()) {
+			restartTasksWithDelay(failureHandlingResult);
+		} else {
+			failJob(failureHandlingResult.getError());
+		}
+	}
+
+	private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) {
+		final Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
+
+		final Set<ExecutionVertexVersion> executionVertexVersions =
+			new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+
+		final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
+
+		delayExecutor.schedule(
+			() -> FutureUtils.assertNoException(
+				cancelFuture.thenRunAsync(restartTasks(executionVertexVersions), getMainThreadExecutor())),
+			failureHandlingResult.getRestartDelayMS(),
+			TimeUnit.MILLISECONDS);
+	}
+
+	private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexVersions) {
+		return () -> {
+			final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
+			schedulingStrategy.restartTasks(verticesToRestart);
+		};
+	}
+
+	private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) {
+		final List<CompletableFuture<?>> cancelFutures = verticesToRestart.stream()
+			.map(this::cancelExecutionVertex)
+			.collect(Collectors.toList());
+
+		return FutureUtils.combineAll(cancelFutures);
+	}
+
+	private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID executionVertexId) {
+		return executionVertexOperations.cancel(getExecutionVertex(executionVertexId));
+	}
+
+	@Override
+	protected void scheduleOrUpdateConsumersInternal(final ExecutionVertexID producerVertexId, final ResultPartitionID partitionId) {
+		schedulingStrategy.onPartitionConsumable(producerVertexId, partitionId);
+	}
+
+	// ------------------------------------------------------------------------
+	// SchedulerOperations
+	// ------------------------------------------------------------------------
+
 	@Override
-	public void startScheduling() {
-		throw new UnsupportedOperationException();
+	public void allocateSlotsAndDeploy(final Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
+		final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex = groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);
+		final Set<ExecutionVertexID> verticesToDeploy = deploymentOptionsByVertex.keySet();
+		final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex = executionVertexVersioner.recordVertexModifications(verticesToDeploy);
+
+		prepareToDeployVertices(verticesToDeploy);
+
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);
+
+		final Collection<DeploymentHandle> deploymentHandles = createDeploymentHandles(
+			requiredVersionByVertex,
+			deploymentOptionsByVertex,
+			slotExecutionVertexAssignments);
+
+		if (isDeployIndividually()) {
+			deployIndividually(deploymentHandles);
+		} else {
+			waitForAllSlotsAndDeploy(deploymentHandles);
+		}
+	}
+
+	private static Map<ExecutionVertexID, ExecutionVertexDeploymentOption> groupDeploymentOptionsByVertexId(
+			final Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
+		return executionVertexDeploymentOptions.stream().collect(Collectors.toMap(
+				ExecutionVertexDeploymentOption::getExecutionVertexId,
+				Function.identity()));
+	}
+
+	private void prepareToDeployVertices(final Set<ExecutionVertexID> verticesToDeploy) {
+		cancelSlotAssignments(verticesToDeploy);
+		resetForNewExecution(verticesToDeploy);
 
 Review comment:
   This is known. See PR description:
   
   > Note that the implementation is not yet complete. For example, high availability support is still lacking.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services