You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/08/29 05:22:01 UTC

[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #2556: [engine][checkpoint] checkpoint flow

EricJoy2048 commented on code in PR #2556:
URL: https://github.com/apache/incubator-seatunnel/pull/2556#discussion_r956874888


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java:
##########
@@ -28,24 +40,60 @@
  */
 public class CheckpointManager {
 
+    private final Long jobId;
+
+    private final NodeEngine nodeEngine;
+
+    private final Map<Long, Integer> subtaskWithPipelines;
+
+    private final Map<Long, Address> subtaskWithAddresses;
+
     /**
      * key: the pipeline id of the job;
      * <br> value: the checkpoint plan of the pipeline;
      */
-    private final Map<Long, CheckpointPlan> checkpointPlanMap;
+    private final Map<Integer, CheckpointPlan> checkpointPlanMap;
 
     /**
      * key: the pipeline id of the job;
      * <br> value: the checkpoint coordinator of the pipeline;
      */
-    private final Map<Long, CheckpointCoordinator> coordinatorMap;
+    private final Map<Integer, CheckpointCoordinator> coordinatorMap;
+
+    private final CheckpointCoordinatorConfiguration config;
 
-    public CheckpointManager(Map<Long, CheckpointPlan> checkpointPlanMap) {
+    public CheckpointManager(long jobId, NodeEngine nodeEngine, Map<Integer, CheckpointPlan> checkpointPlanMap, CheckpointCoordinatorConfiguration config) {
+        this.jobId = jobId;
+        this.nodeEngine = nodeEngine;
         this.checkpointPlanMap = checkpointPlanMap;
+        this.config = config;
         this.coordinatorMap = new HashMap<>(checkpointPlanMap.size());
+        this.subtaskWithPipelines = checkpointPlanMap.values().stream().flatMap(plan -> plan.getPipelineTaskIds().keySet().stream().map(taskId -> Tuple2.tuple2(taskId, plan.getPipelineId()))).collect(Collectors.toMap(Tuple2::f0, Tuple2::f1));
+        this.subtaskWithAddresses = new HashMap<>(subtaskWithPipelines.size());
+    }
+
+    private int getPipelineId(long subtaskId) {
+        return subtaskWithPipelines.get(subtaskId);
+    }
+
+    private CheckpointCoordinator getCheckpointCoordinator(TaskInfo taskInfo) {
+        return coordinatorMap.get(getPipelineId(taskInfo.getSubtaskId()));
+    }
+
+    public void acknowledgeTask(TaskAcknowledgeOperation ackOperation, Address address) {
+        getCheckpointCoordinator(ackOperation.getTaskInfo()).acknowledgeTask(ackOperation);
+        subtaskWithAddresses.putIfAbsent(ackOperation.getTaskInfo().getSubtaskId(), address);
+    }
+
+    public void taskCompleted(TaskInfo taskInfo) {
+        getCheckpointCoordinator(taskInfo).taskCompleted(taskInfo);
+    }
+
+    public InvocationFuture<?> triggerCheckpoint(CheckpointTriggerOperation operation) {
+        return NodeEngineUtil.sendOperationToMasterNode(nodeEngine, operation);
     }
 
-    public CheckpointCoordinator getCheckpointCoordinator(long pipelineId) {
-        return coordinatorMap.get(pipelineId);
+    public InvocationFuture<?> notifyCheckpointFinished(CheckpointFinishedOperation operation) {
+        return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, subtaskWithAddresses.get(operation.getTaskInfo().getSubtaskId()));

Review Comment:
   Same as above.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java:
##########
@@ -50,21 +51,23 @@ public int getClassId() {
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
-        out.writeLong(jobId);
-        out.writeLong(pipelineId);
         out.writeLong(checkpointId);
+        out.writeObject(taskInfo);
+        out.writeBoolean(successful);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
-        jobId = in.readLong();
-        pipelineId = in.readLong();
         checkpointId = in.readLong();
+        taskInfo = in.readObject(TaskInfo.class);
+        successful = in.readBoolean();
     }
 
     @Override
     protected PassiveCompletableFuture<?> doRun() throws Exception {
-        // TODO: Notifies all tasks of the pipeline about the status of the checkpoint
+        SeaTunnelServer server = getService();
+        Task task = server.getTaskExecutionService().getExecutionContext(taskInfo.getTaskGroupId())

Review Comment:
   Please note that this may cause a `NullPointerException`, because `server.getTaskExecutionService().getExecutionContext(taskInfo.getTaskGroupId())` may return `null`. So I suggest adding a retry at the place where this operation is invoked.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointTriggerOperation.java:
##########
@@ -44,16 +53,21 @@ public int getClassId() {
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         out.writeObject(checkpointBarrier);
+        out.writeObject(taskInfo);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         checkpointBarrier = in.readObject(CheckpointBarrier.class);
+        taskInfo = in.readObject(TaskInfo.class);
     }
 
     @Override
-    protected PassiveCompletableFuture<?> doRun() throws Exception {
-        // TODO: All source Vertexes executed
-        return null;
+    public void run() {
+        SeaTunnelServer server = getService();
+        SourceSplitEnumeratorTask<?> task = server.getTaskExecutionService()

Review Comment:
   Same as above.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskCompletedOperation.java:
##########
@@ -54,12 +59,10 @@ protected void readInternal(ObjectDataInput in) throws IOException {
     }
 
     @Override
-    protected PassiveCompletableFuture<?> doRun() throws Exception {
-        CheckpointCoordinator checkpointCoordinator = ((SeaTunnelServer) getService())
+    public void run() {

Review Comment:
   Because `PhysicalPlan` and `CheckpointManager` both run on the coordinator node, you can get the task future via `JobMaster.PhysicalPlan.SubPlan.PhysicalVertex.taskFuture` and add your callback function to it.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -42,11 +54,60 @@ public class CheckpointCoordinator {
 
     private final CheckpointCoordinatorConfiguration config;
 
-    public CheckpointCoordinator(CheckpointPlan plan,
+    public CheckpointCoordinator(CheckpointManager manager,
+                                 long jobId,
+                                 CheckpointPlan plan,
                                  CheckpointCoordinatorConfiguration config) {
+        this.checkpointManager = manager;
+        this.jobId = jobId;
+        this.pipelineId = plan.getPipelineId();
         this.plan = plan;
         this.config = config;
         this.pendingCheckpoints = new LinkedHashMap<>();
         this.completedCheckpoints = new LinkedHashMap<>();
     }
+
+    protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
+        final long checkpointId = ackOperation.getCheckpointId();
+        final PendingCheckpoint pendingCheckpoint = pendingCheckpoints.get(checkpointId);
+        if (pendingCheckpoint == null) {
+            LOG.debug("job: {}, pipeline: {}, the checkpoint({}) don't exist.", jobId, pipelineId, checkpointId);
+            return;
+        }
+        pendingCheckpoint.acknowledgeTask(ackOperation.getTaskInfo(), ackOperation.getStates());
+        if (pendingCheckpoint.isFullyAcknowledged()) {
+            CompletedCheckpoint completedCheckpoint = completePendingCheckpoint(pendingCheckpoint);
+            pendingCheckpoints.remove(checkpointId);
+            completedCheckpoints.put(checkpointId, completedCheckpoint);
+        }
+    }
+
+    public CompletedCheckpoint completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) {
+        notifyCheckpointCompleted(pendingCheckpoint.getCheckpointId());
+        return new CompletedCheckpoint(
+            jobId,
+            pipelineId,
+            pendingCheckpoint.getCheckpointId(),
+            pendingCheckpoint.getCheckpointTimestamp(),
+            System.currentTimeMillis(),
+            pendingCheckpoint.getTaskStates(),
+            pendingCheckpoint.getTaskStatistics());
+    }
+
+    public List<InvocationFuture> notifyCheckpointCompleted(long checkpointId) {
+        return plan.getPipelineTaskIds()
+            .entrySet()
+            .stream()
+            .map(entry ->
+                new CheckpointFinishedOperation(checkpointId,
+                    new TaskInfo(jobId, entry.getValue(), entry.getKey()),
+                    true)
+            ).map(checkpointManager::notifyCheckpointFinished)

Review Comment:
   We cannot know in advance whether the task is running; we only know that its deployment has completed. So I suggest adding a retry here. You can use `RetryUtils.retryWithException` to do this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org