You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/20 06:59:40 UTC

[flink] 02/03: [FLINK-8871][checkpoint] Support to cancel checkpoing via notification on checkpoint coordinator side

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fcacc42e17f00cb47c5c16fe75af035f784ae1fa
Author: Yun Tang <my...@live.com>
AuthorDate: Mon May 11 13:49:18 2020 +0800

    [FLINK-8871][checkpoint] Support to cancel checkpoing via notification on checkpoint coordinator side
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 16 +++++++++++++++
 .../checkpoint/CheckpointFailureManager.java       |  1 +
 .../checkpoint/CheckpointFailureReason.java        |  2 ++
 .../flink/runtime/executiongraph/Execution.java    | 19 ++++++++++++++++++
 .../jobmanager/slots/TaskManagerGateway.java       | 14 +++++++++++++
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |  5 +++++
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 23 +++++++++++++++++++++-
 .../runtime/taskexecutor/TaskExecutorGateway.java  | 11 +++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java      |  7 +++++++
 .../utils/SimpleAckingTaskManagerGateway.java      |  7 +++++++
 .../taskexecutor/TestingTaskExecutorGateway.java   |  5 +++++
 11 files changed, 109 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9e2a4a7..7689f29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -113,6 +113,7 @@ public class CheckpointCoordinator {
 	private final ExecutionVertex[] tasksToWaitFor;
 
 	/** Tasks who need to be sent a message when a checkpoint is confirmed. */
+	// TODO currently we use commit vertices to receive "abort checkpoint" messages.
 	private final ExecutionVertex[] tasksToCommitTo;
 
 	/** The operator coordinators that need to be checkpointed. */
@@ -1015,6 +1016,7 @@ public class CheckpointCoordinator {
 					}
 				});
 
+				sendAbortedMessages(checkpointId, pendingCheckpoint.getCheckpointTimestamp());
 				throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
 					CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
 			}
@@ -1067,6 +1069,19 @@ public class CheckpointCoordinator {
 		}
 	}
 
+	private void sendAbortedMessages(long checkpointId, long timeStamp) {
+		// send notification of aborted checkpoints asynchronously.
+		executor.execute(() -> {
+			// send the "abort checkpoint" messages to necessary vertices.
+			for (ExecutionVertex ev : tasksToCommitTo) {
+				Execution ee = ev.getCurrentExecutionAttempt();
+				if (ee != null) {
+					ee.notifyCheckpointAborted(checkpointId, timeStamp);
+				}
+			}
+		});
+	}
+
 	/**
 	 * Fails all pending checkpoints which have not been acknowledged by the given execution
 	 * attempt id.
@@ -1576,6 +1591,7 @@ public class CheckpointCoordinator {
 						exception, pendingCheckpoint.getCheckpointId());
 				}
 			} finally {
+				sendAbortedMessages(pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getCheckpointTimestamp());
 				pendingCheckpoints.remove(pendingCheckpoint.getCheckpointId());
 				rememberRecentCheckpointId(pendingCheckpoint.getCheckpointId());
 				timer.execute(this::executeQueuedRequest);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 9e162e7..0dc655b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -120,6 +120,7 @@ public class CheckpointFailureManager {
 			case CHECKPOINT_EXPIRED:
 			case TASK_FAILURE:
 			case TASK_CHECKPOINT_FAILURE:
+			case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
 			case TRIGGER_CHECKPOINT_FAILURE:
 			case FINALIZE_CHECKPOINT_FAILURE:
 				//ignore
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index 023f9bf..cd787d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -68,6 +68,8 @@ public enum CheckpointFailureReason {
 
 	TASK_CHECKPOINT_FAILURE(false, "Task local checkpoint failure."),
 
+	UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE(false, "Unknown task for the checkpoint to notify."),
+
 	FINALIZE_CHECKPOINT_FAILURE(false, "Failure to finalize checkpoint."),
 
 	TRIGGER_CHECKPOINT_FAILURE(false, "Trigger checkpoint failure.");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 7d60a6c..0102ce7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -994,6 +994,25 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	/**
+	 * Notify the task of this execution about a aborted checkpoint.
+	 *
+	 * @param abortCheckpointId of the subsumed checkpoint
+	 * @param timestamp of the subsumed checkpoint
+	 */
+	public void notifyCheckpointAborted(long abortCheckpointId, long timestamp) {
+		final LogicalSlot slot = assignedResource;
+
+		if (slot != null) {
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+			taskManagerGateway.notifyCheckpointAborted(attemptId, getVertex().getJobId(), abortCheckpointId, timestamp);
+		} else {
+			LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
+				"no longer running.");
+		}
+	}
+
+	/**
 	 * Trigger a new checkpoint on the task of this execution.
 	 *
 	 * @param checkpointId of th checkpoint to trigger
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index da21982..e7cbeae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -120,6 +120,20 @@ public interface TaskManagerGateway extends TaskExecutorOperatorEventGateway {
 		long timestamp);
 
 	/**
+	 * Notify the given task about a aborted checkpoint.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @param jobId identifying the job to which the task belongs
+	 * @param checkpointId of the subsumed checkpoint
+	 * @param timestamp of the subsumed checkpoint
+	 */
+	void notifyCheckpointAborted(
+		ExecutionAttemptID executionAttemptID,
+		JobID jobId,
+		long checkpointId,
+		long timestamp);
+
+	/**
 	 * Trigger for the given task a checkpoint.
 	 *
 	 * @param executionAttemptID identifying the task
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 9aec97e..2a20b71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -93,6 +93,11 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
 	}
 
 	@Override
+	public void notifyCheckpointAborted(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
+		taskExecutorGateway.abortCheckpoint(executionAttemptID, checkpointId, timestamp);
+	}
+
+	@Override
 	public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
 		taskExecutorGateway.triggerCheckpoint(
 			executionAttemptID,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d74532f..f8438f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -860,7 +860,28 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
 
 			log.debug(message);
-			return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
+			return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
+		}
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> abortCheckpoint(
+			ExecutionAttemptID executionAttemptID,
+			long checkpointId,
+			long checkpointTimestamp) {
+		log.debug("Abort checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
+
+		final Task task = taskSlotTable.getTask(executionAttemptID);
+
+		if (task != null) {
+			task.notifyCheckpointAborted(checkpointId);
+
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		} else {
+			final String message = "TaskManager received an aborted checkpoint for unknown task " + executionAttemptID + '.';
+
+			log.debug(message);
+			return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ad37db0..6867451 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -156,6 +156,17 @@ public interface TaskExecutorGateway extends RpcGateway, TaskExecutorOperatorEve
 	CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
 
 	/**
+	 * Abort a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
+	 * and the checkpoint timestamp.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @param checkpointId unique id for the checkpoint
+	 * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+	 * @return Future acknowledge if the checkpoint has been successfully confirmed
+	 */
+	CompletableFuture<Acknowledge> abortCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
+
+	/**
 	 * Cancel the given task.
 	 *
 	 * @param executionAttemptID identifying the task
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ac48106..e799836 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -425,6 +425,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			// decline checkpoint from one of the tasks, this should cancel the checkpoint
 			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+			verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+			verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+
 			assertTrue(checkpoint1.isDiscarded());
 
 			// validate that we have only one pending checkpoint left
@@ -453,6 +456,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
 			assertTrue(checkpoint1.isDiscarded());
 
+			// will not notify abort message again
+			verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+			verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+
 			coord.shutdown(JobStatus.FINISHED);
 		}
 		catch (Exception e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 7e6ee2d..59aee81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -129,6 +129,13 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 			long timestamp) {}
 
 	@Override
+	public void notifyCheckpointAborted(
+			ExecutionAttemptID executionAttemptID,
+			JobID jobId,
+			long checkpointId,
+			long timestamp) {}
+
+	@Override
 	public void triggerCheckpoint(
 			ExecutionAttemptID executionAttemptID,
 			JobID jobId,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index f7e82d2..2a039c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -169,6 +169,11 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 	}
 
 	@Override
+	public CompletableFuture<Acknowledge> abortCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	@Override
 	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
 		return cancelTaskFunction.apply(executionAttemptID);
 	}