Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/05 12:52:54 UTC

[GitHub] [incubator-seatunnel] hk-lrzy commented on a diff in pull request #2620: [engine][checkpoint] The full flow of checkpoint

hk-lrzy commented on code in PR #2620:
URL: https://github.com/apache/incubator-seatunnel/pull/2620#discussion_r962860414


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -81,33 +91,45 @@ public class CheckpointCoordinator {
      */
     private final Map<Long, Integer> pipelineTasks;
 
+    private final Map<Long, SeaTunnelTaskState> pipelineTaskStatus;
+
     private final CheckpointPlan plan;
 
-    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
+    private final ConcurrentHashMap<Long, PendingCheckpoint> pendingCheckpoints;
 
     private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
+    private CompletedCheckpoint latestCompletedCheckpoint;
+
     private final CheckpointCoordinatorConfiguration coordinatorConfig;
 
+    private int tolerableFailureCheckpoints;
     private final transient ScheduledExecutorService scheduler;
 
     private final AtomicLong latestTriggerTimestamp = new AtomicLong(0);
 
+    private final AtomicInteger pendingCounter = new AtomicInteger(0);
+
     private final Object lock = new Object();
+
+    private final Object autoSavepointLock = new Object();
     public CheckpointCoordinator(CheckpointManager manager,
                                  CheckpointStorage checkpointStorage,
                                  CheckpointStorageConfiguration storageConfig,
                                  long jobId,
                                  CheckpointPlan plan,
                                  CheckpointCoordinatorConfiguration coordinatorConfig) {
+
         this.checkpointManager = manager;
         this.checkpointStorage = checkpointStorage;
         this.storageConfig = storageConfig;
         this.jobId = jobId;
         this.pipelineId = plan.getPipelineId();
         this.plan = plan;
         this.coordinatorConfig = coordinatorConfig;
-        this.pendingCheckpoints = new LinkedHashMap<>();
+        this.latestCompletedCheckpoint = plan.getRestoredCheckpoint();

Review Comment:
   Is `latestCompletedCheckpoint` always null at the beginning?
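   A minimal sketch of the null-safe handling this question points at. It assumes `plan.getRestoredCheckpoint()` returns null when the job starts fresh and returns a checkpoint only on restore. `CoordinatorStateSketch`, `latestCompleted()` and `nextCheckpointId()` are hypothetical names, and the two stand-in types are simplified illustrations, not SeaTunnel's actual classes.

```java
import java.util.Optional;

// Hypothetical, simplified stand-ins for the types in the diff (not SeaTunnel's classes).
final class CompletedCheckpoint {
    private final long checkpointId;

    CompletedCheckpoint(long checkpointId) {
        this.checkpointId = checkpointId;
    }

    long getCheckpointId() {
        return checkpointId;
    }
}

final class CheckpointPlan {
    // Assumption: null when the job is started fresh rather than restored.
    private final CompletedCheckpoint restoredCheckpoint;

    CheckpointPlan(CompletedCheckpoint restoredCheckpoint) {
        this.restoredCheckpoint = restoredCheckpoint;
    }

    CompletedCheckpoint getRestoredCheckpoint() {
        return restoredCheckpoint;
    }
}

// Hypothetical coordinator fragment showing null-safe use of the restored checkpoint.
final class CoordinatorStateSketch {
    private CompletedCheckpoint latestCompletedCheckpoint;

    CoordinatorStateSketch(CheckpointPlan plan) {
        // Null on a fresh start; only a restore supplies a previous checkpoint.
        this.latestCompletedCheckpoint = plan.getRestoredCheckpoint();
    }

    Optional<CompletedCheckpoint> latestCompleted() {
        return Optional.ofNullable(latestCompletedCheckpoint);
    }

    long nextCheckpointId() {
        // Continue after the restored checkpoint, or start from 1 if there is none.
        return latestCompleted()
            .map(cp -> cp.getCheckpointId() + 1)
            .orElse(1L);
    }
}
```

   If the answer is "yes, null unless restored", every reader of the field has to handle that case; exposing it through an `Optional` accessor keeps the check in one place.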



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -146,9 +227,47 @@ public static Map<Long, Integer> getPipelineTasks(Set<TaskLocation> pipelineSubt
             .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().size()));
     }
 
-    public void startTriggerPendingCheckpoint(long triggerTimestamp) {
-        CompletableFuture<PendingCheckpoint> completableFuture = new CompletableFuture<>();
-        CompletableFuture.supplyAsync(() -> {
+    public PassiveCompletableFuture<PendingCheckpoint> startSavepoint() {
+        CompletableFuture<PendingCheckpoint> savepoint = createPendingCheckpoint(Instant.now().toEpochMilli(), CheckpointType.SAVEPOINT_TYPE);
+        startTriggerPendingCheckpoint(savepoint);
+        return new PassiveCompletableFuture<>(savepoint);
+    }
+
+    private void startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint> pendingCompletableFuture) {
+        // Trigger the barrier and wait for all tasks to ACK
+        pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
+            if (CheckpointType.AUTO_SAVEPOINT_TYPE != pendingCheckpoint.getCheckpointType()) {
+                LOG.debug("trigger checkpoint barrier" + pendingCheckpoint);
+                CompletableFuture.supplyAsync(() ->
+                        new CheckpointBarrier(pendingCheckpoint.getCheckpointId(),
+                            pendingCheckpoint.getCheckpointTimestamp(),
+                            pendingCheckpoint.getCheckpointType()))
+                    .thenApplyAsync(this::triggerCheckpoint)
+                    .thenApplyAsync(invocationFutures -> CompletableFuture.allOf(invocationFutures).join());
+            }
+            LOG.debug("wait checkpoint completed: " + pendingCheckpoint);
+            pendingCheckpoint.getCompletableFuture()
+                .thenAcceptAsync(this::completePendingCheckpoint);

Review Comment:
   If any of these futures completes exceptionally, should the pipeline be discarded?
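   One way to act on this, sketched under assumptions. In the diff, the barrier-trigger chain is neither stored nor observed, so an exception thrown by `join()` there would only complete that chain exceptionally without anyone noticing. `CompletableFuture.allOf` completes exceptionally if any of the invocation futures does, but only after all of them have finished. A `whenComplete` stage can therefore route that failure to a discard hook instead of completing the checkpoint. `BarrierAckSketch`, `onAllAcked` and `discardPipeline` are hypothetical; the coordinator's real failure-handling API is not shown in this diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

// Hypothetical helper: waits for every barrier invocation and either completes
// the checkpoint or discards the pipeline. Not SeaTunnel's actual API.
final class BarrierAckSketch {
    private final Runnable onAllAcked;                 // e.g. complete the pending checkpoint
    private final Consumer<Throwable> discardPipeline; // e.g. fail the checkpoint and discard the pipeline

    BarrierAckSketch(Runnable onAllAcked, Consumer<Throwable> discardPipeline) {
        this.onAllAcked = onAllAcked;
        this.discardPipeline = discardPipeline;
    }

    CompletableFuture<Void> awaitAcks(CompletableFuture<?>[] invocationFutures) {
        // allOf completes once every future is done, and completes exceptionally
        // if at least one of them failed.
        return CompletableFuture.allOf(invocationFutures)
            .whenComplete((ignored, error) -> {
                if (error == null) {
                    onAllAcked.run();
                    return;
                }
                // The failure normally arrives wrapped in a CompletionException.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
                discardPipeline.accept(cause);
            });
    }
}
```

   Whether a failure should also cancel the invocations that are still in flight is left open here. With `allOf`, the discard only happens after every invocation has finished.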


