Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/14 17:21:30 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #8693: [FLINK-8871] Support to cancel checkpointing via notification

pnowojski commented on a change in pull request #8693:
URL: https://github.com/apache/flink/pull/8693#discussion_r425288321



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
##########
@@ -311,6 +311,11 @@ public void notifyCheckpointComplete(long checkpointId) {
 		//Nothing to do
 	}
 
+	@Override
+	public void notifyCheckpointAborted(long checkpointId) {
+		// nothing to do
+	}
+

Review comment:
       As @rkhachatryan mentioned, this can be removed thanks to the default implementation?

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
##########
@@ -464,6 +464,11 @@ public void notifyCheckpointComplete(long completedCheckpointId) throws Exceptio
 		}
 	}
 
+	@Override
+	public void notifyCheckpointAborted(long checkpointId) {
+		// nothing to do
+	}
+

Review comment:
       ditto about dropping?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
##########
@@ -38,4 +38,12 @@
 	 * @throws Exception
 	 */
 	void notifyCheckpointComplete(long checkpointId) throws Exception;
+
+	/**
+	 * This method is called as a notification once a distributed checkpoint has been aborted.
+	 *
+	 * @param checkpointId The ID of the checkpoint that has been aborted.
+	 * @throws Exception
+	 */
+	default void notifyCheckpointAborted(long checkpointId) throws Exception {}

Review comment:
       TL;DR: I would keep it as it is.
   
   I would lean toward a single interface. When I'm implementing operators/functions, it's quite annoying to have to guess or search for all of the relevant interfaces that I need. For example, the existing `CheckpointedFunction` and `CheckpointListener` are very often used together and both expose checkpointing notifications, yet we require users to discover them separately. If they were combined into a single interface, users would learn much more easily what Flink exposes to functions/operators.
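
   To illustrate why a `default` method lets the interface stay as it is, here is a minimal, self-contained sketch. `CheckpointListenerSketch`, `CompleteOnlyListener` and `AbortAwareListener` are hypothetical names made up for this example and are not Flink classes; only the two method signatures mirror the diff above.

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultMethodSketch {

    /** Hypothetical stand-in for the listener interface in the diff; not Flink's actual class. */
    interface CheckpointListenerSketch {
        void notifyCheckpointComplete(long checkpointId) throws Exception;

        /** No-op by default, so existing implementers compile without changes. */
        default void notifyCheckpointAborted(long checkpointId) throws Exception {}
    }

    /** Only cares about completed checkpoints and relies on the inherited no-op for aborts. */
    static class CompleteOnlyListener implements CheckpointListenerSketch {
        final List<Long> completed = new ArrayList<>();

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            completed.add(checkpointId);
        }
    }

    /** Opts in to abort notifications by overriding the default method. */
    static class AbortAwareListener extends CompleteOnlyListener {
        final List<Long> aborted = new ArrayList<>();

        @Override
        public void notifyCheckpointAborted(long checkpointId) {
            aborted.add(checkpointId);
        }
    }

    public static void main(String[] args) throws Exception {
        CompleteOnlyListener plain = new CompleteOnlyListener();
        plain.notifyCheckpointComplete(1L);
        plain.notifyCheckpointAborted(2L); // inherited default: does nothing

        AbortAwareListener aware = new AbortAwareListener();
        aware.notifyCheckpointComplete(3L);
        aware.notifyCheckpointAborted(4L); // overridden: records the ID

        System.out.println("plain completed = " + plain.completed);
        System.out.println("aware completed = " + aware.completed);
        System.out.println("aware aborted   = " + aware.aborted);
    }
}
```

   Under these assumptions, an implementer that ignores aborts needs no empty override, which is the same reasoning as the "this can be removed" comments on the two state backends above.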

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -60,17 +63,22 @@
 class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
+	private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;
 
 	private final CachingCheckpointStorageWorkerView checkpointStorage;
 	private final String taskName;
-	private final CloseableRegistry closeableRegistry;
+	private final AsyncCheckpointRunnableRegistry asyncCheckpointRunnableRegistry;
 	private final ExecutorService executorService;
 	private final Environment env;
 	private final AsyncExceptionHandler asyncExceptionHandler;
 	private final ChannelStateWriter channelStateWriter;
 	private final StreamTaskActionExecutor actionExecutor;
 	private final boolean unalignedCheckpointEnabled;
 	private final BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, IOException> prepareInputSnapshot;
+	/** The IDs of the checkpoint for which we are notified aborted. */
+	private final NavigableSet<Long> abortedCheckpointIds;

Review comment:
       Why do we need to track the aborted checkpoints?



