You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/19 08:17:51 UTC

[incubator-seatunnel] branch dev updated: [hotfix][engine][checkpoint] fix checkpoint error in master down (#3140)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 63dd9d074 [hotfix][engine][checkpoint] fix checkpoint error in master down (#3140)
63dd9d074 is described below

commit 63dd9d074d626e602bfd780540b0a3ba2f6c25cc
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Wed Oct 19 16:17:45 2022 +0800

    [hotfix][engine][checkpoint] fix checkpoint error in master down (#3140)
    
    * [hotfix][checkpoint] checkpoint barrier aligned parallel exception
    
    * [hotfix][engine][checkpoint] fix checkpoint error in master down
    
    * [checkpoint] notify checkpoint timeout
---
 .../server/checkpoint/CheckpointCoordinator.java   |  5 +++--
 .../server/checkpoint/CheckpointManager.java       | 22 +++++++++++++++-------
 .../operation/TaskReportStatusOperation.java       |  2 +-
 .../seatunnel/engine/server/master/JobMaster.java  |  1 +
 .../engine/server/task/SeaTunnelTask.java          |  6 +++---
 .../server/task/SinkAggregatedCommitterTask.java   |  3 +--
 .../server/checkpoint/CheckpointManagerTest.java   |  1 +
 7 files changed, 25 insertions(+), 15 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index a2dab3d48..17f19f99b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -225,7 +225,7 @@ public class CheckpointCoordinator {
         scheduler.schedule(this::tryTriggerPendingCheckpoint, delayMills, TimeUnit.MILLISECONDS);
     }
 
-    private void tryTriggerPendingCheckpoint() {
+    protected void tryTriggerPendingCheckpoint() {
         synchronized (lock) {
             final long currentTimestamp = Instant.now().toEpochMilli();
             if (currentTimestamp - latestTriggerTimestamp.get() >= coordinatorConfig.getCheckpointInterval() &&
@@ -282,7 +282,7 @@ public class CheckpointCoordinator {
                     if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && !pendingCheckpoint.isFullyAcknowledged()) {
                         if (tolerableFailureCheckpoints-- <= 0) {
                             cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_EXPIRED);
-                            // TODO: notify job master to restore the pipeline.
+                            checkpointManager.handleCheckpointTimeout(pipelineId);
                         }
                     }
                 }, coordinatorConfig.getCheckpointTimeout(),
@@ -427,6 +427,7 @@ public class CheckpointCoordinator {
         } catch (IOException | CheckpointStorageException e) {
             sneakyThrow(e);
         }
+        LOG.info("pending checkpoint({}/{}@{}) notify finished!", completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId());
         InvocationFuture<?>[] invocationFutures = notifyCheckpointCompleted(checkpointId);
         CompletableFuture.allOf(invocationFutures).join();
         // TODO: notifyCheckpointCompleted fail
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index fd48f7e65..e5a0ada2e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -38,11 +38,11 @@ import org.apache.seatunnel.engine.server.execution.Task;
 import org.apache.seatunnel.engine.server.execution.TaskGroup;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
 import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
-import com.hazelcast.cluster.Address;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -67,8 +67,6 @@ public class CheckpointManager {
 
     private final NodeEngine nodeEngine;
 
-    private final Map<Long, Address> subtaskWithAddresses;
-
     /**
      * key: the pipeline id of the job;
      * <br> value: the checkpoint coordinator of the pipeline;
@@ -77,13 +75,15 @@ public class CheckpointManager {
 
     private final CheckpointStorage checkpointStorage;
 
+    private final JobMaster jobMaster;
     public CheckpointManager(long jobId,
                              NodeEngine nodeEngine,
+                             JobMaster jobMaster,
                              Map<Integer, CheckpointPlan> checkpointPlanMap,
                              CheckpointConfig checkpointConfig) throws CheckpointStorageException {
         this.jobId = jobId;
         this.nodeEngine = nodeEngine;
-        this.subtaskWithAddresses = new HashMap<>();
+        this.jobMaster = jobMaster;
         this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, checkpointConfig.getStorage().getStorage())
             .create(new HashMap<>());
         IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
@@ -131,6 +131,14 @@ public class CheckpointManager {
         return getCheckpointCoordinator(pipelineId).startSavepoint();
     }
 
+    public void reportedPipelineRunning(int pipelineId) {
+        getCheckpointCoordinator(pipelineId).tryTriggerPendingCheckpoint();
+    }
+
+    protected void handleCheckpointTimeout(int pipelineId) {
+        jobMaster.handleCheckpointTimeout(pipelineId);
+    }
+
     private CheckpointCoordinator getCheckpointCoordinator(TaskLocation taskLocation) {
         return getCheckpointCoordinator(taskLocation.getPipelineId());
     }
@@ -147,9 +155,9 @@ public class CheckpointManager {
      * Called by the {@link Task}.
      * <br> used by Task to report the {@link SeaTunnelTaskState} of the state machine.
      */
-    public void reportedTask(TaskReportStatusOperation reportStatusOperation, Address address) {
+    public void reportedTask(TaskReportStatusOperation reportStatusOperation) {
         // task address may change during restore.
-        subtaskWithAddresses.put(reportStatusOperation.getLocation().getTaskID(), address);
+        log.debug("reported task({}) status{}", reportStatusOperation.getLocation().getTaskID(), reportStatusOperation.getStatus());
         getCheckpointCoordinator(reportStatusOperation.getLocation()).reportedTask(reportStatusOperation);
     }
 
@@ -211,6 +219,6 @@ public class CheckpointManager {
     }
 
     protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
-        return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, subtaskWithAddresses.get(operation.getTaskLocation().getTaskID()));
+        return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
index d5ea0d078..171353e6b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
@@ -67,6 +67,6 @@ public class TaskReportStatusOperation extends Operation implements IdentifiedDa
         ((SeaTunnelServer) getService())
             .getCoordinatorService().getJobMaster(location.getJobId())
             .getCheckpointManager()
-            .reportedTask(this, getCallerAddress());
+            .reportedTask(this);
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 2df8df9c4..83a2af560 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -151,6 +151,7 @@ public class JobMaster extends Thread {
         this.checkpointManager = new CheckpointManager(
             jobImmutableInformation.getJobId(),
             nodeEngine,
+            this,
             planTuple.f1(),
             checkpointConfig);
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index b76bf208a..9a96d0f3e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -70,12 +70,12 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
@@ -94,9 +94,9 @@ public abstract class SeaTunnelTask extends AbstractTask {
 
     protected List<CompletableFuture<Void>> flowFutures;
 
-    protected final Map<Long, List<ActionSubtaskState>> checkpointStates = new HashMap<>();
+    protected final Map<Long, List<ActionSubtaskState>> checkpointStates = new ConcurrentHashMap<>();
 
-    private final Map<Long, Integer> cycleAcks = new HashMap<>();
+    private final Map<Long, Integer> cycleAcks = new ConcurrentHashMap<>();
 
     protected int indexID;
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 8497c5883..f1206a897 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -48,7 +48,6 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -95,7 +94,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT> ex
     public void init() throws Exception {
         super.init();
         currState = INIT;
-        this.checkpointBarrierCounter = new HashMap<>();
+        this.checkpointBarrierCounter = new ConcurrentHashMap<>();
         this.commitInfoCache = new ConcurrentHashMap<>();
         this.writerAddressMap = new ConcurrentHashMap<>();
         this.checkpointCommitInfoMap = new ConcurrentHashMap<>();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index 0ee8bbd80..c281a15f5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -64,6 +64,7 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
         CheckpointManager checkpointManager = new CheckpointManager(
             jobId,
             nodeEngine,
+            null,
             planMap,
             new CheckpointConfig());
         Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));