You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/05 08:23:57 UTC

[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #2620: [engine][checkpoint] The full flow of checkpoint

ashulin commented on code in PR #2620:
URL: https://github.com/apache/incubator-seatunnel/pull/2620#discussion_r962625181


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -200,33 +326,62 @@ private Map<Long, TaskStatistics> getTaskStatistics() {
     }
 
     public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier checkpointBarrier) {
+        // TODO: some tasks have completed and don't need to trigger
         return plan.getStartingSubtasks()
             .stream()
-            .map(taskLocation -> new CheckpointTriggerOperation(checkpointBarrier, taskLocation))
-            .map(checkpointManager::triggerCheckpoint)
+            .map(taskLocation -> new CheckpointBarrierTriggerOperation(checkpointBarrier, taskLocation))
+            .map(checkpointManager::sendOperationToMemberNode)
             .toArray(InvocationFuture[]::new);
     }
 
+    protected void cleanPendingCheckpoint() {
+        // TODO: clear related future & scheduler task
+        pendingCheckpoints.clear();
+    }
+
     protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
-        final long checkpointId = ackOperation.getCheckpointId();
+        final long checkpointId = ackOperation.getBarrier().getId();
         final PendingCheckpoint pendingCheckpoint = pendingCheckpoints.get(checkpointId);
-        if (pendingCheckpoint == null) {
-            LOG.debug("job: {}, pipeline: {}, the checkpoint({}) don't exist.", jobId, pipelineId, checkpointId);
+        TaskLocation location = ackOperation.getTaskLocation();
+        LOG.debug("task[{}]({}/{}) ack. {}", location.getTaskID(), location.getPipelineId(), location.getJobId(), ackOperation.getBarrier().toString());
+        if (checkpointId == Barrier.PREPARE_CLOSE_BARRIER_ID) {
+            synchronized (autoSavepointLock) {
+                if (pendingCheckpoints.get(checkpointId) == null) {
+                    CompletableFuture<PendingCheckpoint> future = createPendingCheckpoint(
+                        Instant.now().toEpochMilli(),
+                        CompletableFuture.completedFuture(Barrier.PREPARE_CLOSE_BARRIER_ID),
+                        CheckpointType.AUTO_SAVEPOINT_TYPE);
+                    startTriggerPendingCheckpoint(future);
+                    future.join();
+                }
+            }
+            pendingCheckpoints.values().parallelStream()
+                .forEach(cp -> cp.acknowledgeTask(ackOperation.getTaskLocation(), ackOperation.getStates(), SubtaskStatus.AUTO_PREPARE_CLOSE));
+            return;
+        } else if (pendingCheckpoint == null) {
+            LOG.info("job: {}, pipeline: {}, the checkpoint({}) don't exist.", jobId, pipelineId, checkpointId);
             return;
         }
-        pendingCheckpoint.acknowledgeTask(ackOperation.getTaskLocation(), ackOperation.getStates());
+
+        pendingCheckpoint.acknowledgeTask(location, ackOperation.getStates(),
+            CheckpointType.SAVEPOINT_TYPE == pendingCheckpoint.getCheckpointType() ?
+                SubtaskStatus.SAVEPOINT_PREPARE_CLOSE :
+                SubtaskStatus.RUNNING);
     }
 
     public void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) {
+        LOG.info("pending checkpoint({}/{}@{}) completed!", pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getPipelineId(), pendingCheckpoint.getJobId());
+        pendingCounter.decrementAndGet();
         final long checkpointId = pendingCheckpoint.getCheckpointId();
-        InvocationFuture<?>[] invocationFutures = notifyCheckpointCompleted(pendingCheckpoint.getCheckpointId());
+        InvocationFuture<?>[] invocationFutures = notifyCheckpointCompleted(checkpointId);
         CompletableFuture.allOf(invocationFutures).join();
         CompletedCheckpoint completedCheckpoint = pendingCheckpoint.toCompletedCheckpoint();
         pendingCheckpoints.remove(checkpointId);
         if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) {
             // latest checkpoint completed time > checkpoint interval
             tryTriggerPendingCheckpoint();
         }
+        latestCompletedCheckpoint = completedCheckpoint;
         completedCheckpoints.addLast(completedCheckpoint);
         try {
             byte[] states = serializer.serialize(completedCheckpoint);

Review Comment:
   The connector supports exactly-once or idempotent writes, so no data is duplicated. If a connector does not support this, any abnormal pipeline restart will cause data duplication.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org