You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2020/02/07 18:13:38 UTC

[flink] 02/02: [FLINK-15918][runtime] Fix that 'uptime' metric is not reset after restart

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1268a7b9a3a3d07f76ea1fe78a0b1a6a7d0ef7eb
Author: Gary Yao <ga...@apache.org>
AuthorDate: Fri Feb 7 16:34:00 2020 +0100

    [FLINK-15918][runtime] Fix that 'uptime' metric is not reset after restart
    
    The 'uptime' metric is the time difference between 'now' and the timestamp when
    the job transitioned to state RUNNING. The new scheduler until now never
    transitioned out of status RUNNING when restarting tasks which had the effect
    that the 'uptime' metric was not reset after a restart. This introduces new
    state transitions to the job. We transition the job status to RESTARTING if at
    least one ExecutionVertex is waiting to be restarted, and we transition from
    RESTARTING immediately to RUNNING again after the restart.
    
    This closes #11032.
---
 .../runtime/executiongraph/ExecutionGraph.java     | 14 +------
 .../flink/runtime/scheduler/DefaultScheduler.java  | 21 ++++++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |  4 ++
 .../ManuallyTriggeredScheduledExecutor.java        | 13 ++++++
 .../runtime/scheduler/DefaultSchedulerTest.java    | 47 ++++++++++++++++++++++
 5 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5803f8d..4447b1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -898,7 +898,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		while (true) {
 			JobStatus current = state;
 
-			if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
+			if (current == JobStatus.RUNNING || current == JobStatus.CREATED || current == JobStatus.RESTARTING) {
 				if (transitionState(current, JobStatus.CANCELLING)) {
 
 					// make sure no concurrent local actions interfere with the cancellation
@@ -938,16 +938,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 					return;
 				}
 			}
-			// All vertices have been cancelled and it's safe to directly go
-			// into the canceled state.
-			else if (current == JobStatus.RESTARTING) {
-				if (transitionState(current, JobStatus.CANCELED)) {
-					onTerminalState(JobStatus.CANCELED);
-
-					LOG.info("Canceled during restart.");
-					return;
-				}
-			}
 			else {
 				// no need to treat other states
 				return;
@@ -1220,7 +1210,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	//  State Transitions
 	// ------------------------------------------------------------------------
 
-	private boolean transitionState(JobStatus current, JobStatus newState) {
+	public boolean transitionState(JobStatus current, JobStatus newState) {
 		return transitionState(current, newState, null);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 9779dac1..2fe8340 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -90,6 +91,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
 	private final ExecutionVertexOperations executionVertexOperations;
 
+	private final Set<ExecutionVertexID> verticesWaitingForRestart;
+
 	public DefaultScheduler(
 		final Logger log,
 		final JobGraph jobGraph,
@@ -151,6 +154,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			restartBackoffTimeStrategy);
 		this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
 		this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(getInputsLocationsRetriever());
+
+		this.verticesWaitingForRestart = new HashSet<>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -211,6 +216,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 		final Set<ExecutionVertexVersion> executionVertexVersions =
 			new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
 
+		addVerticesToRestartPending(verticesToRestart);
+
 		final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);
 
 		delayExecutor.schedule(
@@ -220,10 +227,24 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			TimeUnit.MILLISECONDS);
 	}
 
+	private void addVerticesToRestartPending(final Set<ExecutionVertexID> verticesToRestart) {
+		verticesWaitingForRestart.addAll(verticesToRestart);
+		transitionExecutionGraphState(JobStatus.RUNNING, JobStatus.RESTARTING);
+	}
+
+	private void removeVerticesFromRestartPending(final Set<ExecutionVertexID> verticesToRestart) {
+		verticesWaitingForRestart.removeAll(verticesToRestart);
+		if (verticesWaitingForRestart.isEmpty()) {
+			transitionExecutionGraphState(JobStatus.RESTARTING, JobStatus.RUNNING);
+		}
+	}
+
 	private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexVersions) {
 		return () -> {
 			final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
 
+			removeVerticesFromRestartPending(verticesToRestart);
+
 			resetForNewExecutions(verticesToRestart);
 
 			try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index bd62c1e..609877e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -417,6 +417,10 @@ public abstract class SchedulerBase implements SchedulerNG {
 				.collect(Collectors.toSet()));
 	}
 
+	protected void transitionExecutionGraphState(final JobStatus current, final JobStatus newState) {
+		executionGraph.transitionState(current, newState);
+	}
+
 	// ------------------------------------------------------------------------
 	// SchedulerNG
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
index de2ec72..04d1f48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
@@ -26,6 +26,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -143,6 +144,18 @@ public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor {
 		triggerNonPeriodicScheduledTasks();
 	}
 
+	/**
+	 * Triggers a single non-periodically scheduled task.
+	 *
+	 * @throws NoSuchElementException If there is no such task.
+	 */
+	public void triggerNonPeriodicScheduledTask() {
+		// Queue#remove() throws NoSuchElementException when empty (unlike poll(), which
+		// returns null), so the removed task can be executed without a null check.
+		final ScheduledTask<?> task = nonPeriodicScheduledTasks.remove();
+		task.execute();
+	}
+
 	public void triggerNonPeriodicScheduledTasks() {
 		final Iterator<ScheduledTask<?>> iterator = nonPeriodicScheduledTasks.iterator();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 40126cf..b1ef7c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -600,6 +600,53 @@ public class DefaultSchedulerTest extends TestLogger {
 	}
 
 	@Test
+	public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() {
+		final JobGraph jobGraph = singleJobVertexJobGraph(2);
+		final JobID jobId = jobGraph.getJobID();
+		final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+		final Iterator<ArchivedExecutionVertex> vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
+		final ExecutionAttemptID attemptId1 = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
+		final ExecutionAttemptID attemptId2 = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
+		// fail both attempts, sampling the job status right after the first failure
+		scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId1, ExecutionState.FAILED, new RuntimeException("expected")));
+		final JobStatus jobStatusAfterFirstFailure = scheduler.requestJobStatus();
+		scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId2, ExecutionState.FAILED, new RuntimeException("expected")));
+		// run the scheduled tasks one at a time, sampling the job status after each
+		taskRestartExecutor.triggerNonPeriodicScheduledTask();
+		final JobStatus jobStatusWithPendingRestarts = scheduler.requestJobStatus();
+		taskRestartExecutor.triggerNonPeriodicScheduledTask();
+		final JobStatus jobStatusAfterRestarts = scheduler.requestJobStatus();
+		// RESTARTING is expected until the second scheduled task has run, RUNNING afterwards
+		assertThat(jobStatusAfterFirstFailure, equalTo(JobStatus.RESTARTING));
+		assertThat(jobStatusWithPendingRestarts, equalTo(JobStatus.RESTARTING));
+		assertThat(jobStatusAfterRestarts, equalTo(JobStatus.RUNNING));
+	}
+
+	@Test
+	public void cancelWhileRestartingShouldWaitForRunningTasks() {
+		final JobGraph jobGraph = singleJobVertexJobGraph(2);
+		final JobID jobId = jobGraph.getJobID();
+		final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+		final SchedulingTopology<?, ?> topology = scheduler.getSchedulingTopology();
+
+		final Iterator<ArchivedExecutionVertex> vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
+		final ExecutionAttemptID attemptId1 = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
+		final ExecutionAttemptID attemptId2 = vertexIterator.next().getCurrentExecutionAttempt().getAttemptId();
+		final ExecutionVertexID executionVertex2 = scheduler.getExecutionVertexIdOrThrow(attemptId2);
+		// cancel after the first task failed, before the second task has reported a terminal state
+		scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId1, ExecutionState.FAILED, new RuntimeException("expected")));
+		scheduler.cancel();
+		final ExecutionState vertex2StateAfterCancel = topology.getVertexOrThrow(executionVertex2).getState();
+		final JobStatus statusAfterCancelWhileRestarting = scheduler.requestJobStatus();
+		scheduler.updateTaskExecutionState(new TaskExecutionState(jobId, attemptId2, ExecutionState.CANCELED, new RuntimeException("expected")));
+
+		assertThat(vertex2StateAfterCancel, is(equalTo(ExecutionState.CANCELING)));
+		assertThat(statusAfterCancelWhileRestarting, is(equalTo(JobStatus.CANCELLING)));
+		assertThat(scheduler.requestJobStatus(), is(equalTo(JobStatus.CANCELED)));
+	}
+
+	@Test
 	public void failureInfoIsSetAfterTaskFailure() {
 		final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
 		final JobID jobId = jobGraph.getJobID();