You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/19 08:17:51 UTC
[incubator-seatunnel] branch dev updated: [hotfix][engine][checkpoint] fix checkpoint error in master down (#3140)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 63dd9d074 [hotfix][engine][checkpoint] fix checkpoint error in master down (#3140)
63dd9d074 is described below
commit 63dd9d074d626e602bfd780540b0a3ba2f6c25cc
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Wed Oct 19 16:17:45 2022 +0800
[hotfix][engine][checkpoint] fix checkpoint error in master down (#3140)
* [hotfix][checkpoint] fix exception in checkpoint barrier alignment with parallel subtasks
* [hotfix][engine][checkpoint] fix checkpoint error in master down
* [checkpoint] notify the job master when a checkpoint times out
---
.../server/checkpoint/CheckpointCoordinator.java | 5 +++--
.../server/checkpoint/CheckpointManager.java | 22 +++++++++++++++-------
.../operation/TaskReportStatusOperation.java | 2 +-
.../seatunnel/engine/server/master/JobMaster.java | 1 +
.../engine/server/task/SeaTunnelTask.java | 6 +++---
.../server/task/SinkAggregatedCommitterTask.java | 3 +--
.../server/checkpoint/CheckpointManagerTest.java | 1 +
7 files changed, 25 insertions(+), 15 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index a2dab3d48..17f19f99b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -225,7 +225,7 @@ public class CheckpointCoordinator {
scheduler.schedule(this::tryTriggerPendingCheckpoint, delayMills, TimeUnit.MILLISECONDS);
}
- private void tryTriggerPendingCheckpoint() {
+ protected void tryTriggerPendingCheckpoint() {
synchronized (lock) {
final long currentTimestamp = Instant.now().toEpochMilli();
if (currentTimestamp - latestTriggerTimestamp.get() >= coordinatorConfig.getCheckpointInterval() &&
@@ -282,7 +282,7 @@ public class CheckpointCoordinator {
if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && !pendingCheckpoint.isFullyAcknowledged()) {
if (tolerableFailureCheckpoints-- <= 0) {
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_EXPIRED);
- // TODO: notify job master to restore the pipeline.
+ checkpointManager.handleCheckpointTimeout(pipelineId);
}
}
}, coordinatorConfig.getCheckpointTimeout(),
@@ -427,6 +427,7 @@ public class CheckpointCoordinator {
} catch (IOException | CheckpointStorageException e) {
sneakyThrow(e);
}
+ LOG.info("pending checkpoint({}/{}@{}) notify finished!", completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId());
InvocationFuture<?>[] invocationFutures = notifyCheckpointCompleted(checkpointId);
CompletableFuture.allOf(invocationFutures).join();
// TODO: notifyCheckpointCompleted fail
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index fd48f7e65..e5a0ada2e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -38,11 +38,11 @@ import org.apache.seatunnel.engine.server.execution.Task;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
-import com.hazelcast.cluster.Address;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -67,8 +67,6 @@ public class CheckpointManager {
private final NodeEngine nodeEngine;
- private final Map<Long, Address> subtaskWithAddresses;
-
/**
* key: the pipeline id of the job;
* <br> value: the checkpoint coordinator of the pipeline;
@@ -77,13 +75,15 @@ public class CheckpointManager {
private final CheckpointStorage checkpointStorage;
+ private final JobMaster jobMaster;
public CheckpointManager(long jobId,
NodeEngine nodeEngine,
+ JobMaster jobMaster,
Map<Integer, CheckpointPlan> checkpointPlanMap,
CheckpointConfig checkpointConfig) throws CheckpointStorageException {
this.jobId = jobId;
this.nodeEngine = nodeEngine;
- this.subtaskWithAddresses = new HashMap<>();
+ this.jobMaster = jobMaster;
this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, checkpointConfig.getStorage().getStorage())
.create(new HashMap<>());
IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
@@ -131,6 +131,14 @@ public class CheckpointManager {
return getCheckpointCoordinator(pipelineId).startSavepoint();
}
+ public void reportedPipelineRunning(int pipelineId) {
+ getCheckpointCoordinator(pipelineId).tryTriggerPendingCheckpoint();
+ }
+
+ protected void handleCheckpointTimeout(int pipelineId) {
+ jobMaster.handleCheckpointTimeout(pipelineId);
+ }
+
private CheckpointCoordinator getCheckpointCoordinator(TaskLocation taskLocation) {
return getCheckpointCoordinator(taskLocation.getPipelineId());
}
@@ -147,9 +155,9 @@ public class CheckpointManager {
* Called by the {@link Task}.
* <br> used by Task to report the {@link SeaTunnelTaskState} of the state machine.
*/
- public void reportedTask(TaskReportStatusOperation reportStatusOperation, Address address) {
+ public void reportedTask(TaskReportStatusOperation reportStatusOperation) {
// task address may change during restore.
- subtaskWithAddresses.put(reportStatusOperation.getLocation().getTaskID(), address);
+ log.debug("reported task({}) status{}", reportStatusOperation.getLocation().getTaskID(), reportStatusOperation.getStatus());
getCheckpointCoordinator(reportStatusOperation.getLocation()).reportedTask(reportStatusOperation);
}
@@ -211,6 +219,6 @@ public class CheckpointManager {
}
protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
- return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, subtaskWithAddresses.get(operation.getTaskLocation().getTaskID()));
+ return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
index d5ea0d078..171353e6b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
@@ -67,6 +67,6 @@ public class TaskReportStatusOperation extends Operation implements IdentifiedDa
((SeaTunnelServer) getService())
.getCoordinatorService().getJobMaster(location.getJobId())
.getCheckpointManager()
- .reportedTask(this, getCallerAddress());
+ .reportedTask(this);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 2df8df9c4..83a2af560 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -151,6 +151,7 @@ public class JobMaster extends Thread {
this.checkpointManager = new CheckpointManager(
jobImmutableInformation.getJobId(),
nodeEngine,
+ this,
planTuple.f1(),
checkpointConfig);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index b76bf208a..9a96d0f3e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -70,12 +70,12 @@ import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -94,9 +94,9 @@ public abstract class SeaTunnelTask extends AbstractTask {
protected List<CompletableFuture<Void>> flowFutures;
- protected final Map<Long, List<ActionSubtaskState>> checkpointStates = new HashMap<>();
+ protected final Map<Long, List<ActionSubtaskState>> checkpointStates = new ConcurrentHashMap<>();
- private final Map<Long, Integer> cycleAcks = new HashMap<>();
+ private final Map<Long, Integer> cycleAcks = new ConcurrentHashMap<>();
protected int indexID;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 8497c5883..f1206a897 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -48,7 +48,6 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -95,7 +94,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT> ex
public void init() throws Exception {
super.init();
currState = INIT;
- this.checkpointBarrierCounter = new HashMap<>();
+ this.checkpointBarrierCounter = new ConcurrentHashMap<>();
this.commitInfoCache = new ConcurrentHashMap<>();
this.writerAddressMap = new ConcurrentHashMap<>();
this.checkpointCommitInfoMap = new ConcurrentHashMap<>();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index 0ee8bbd80..c281a15f5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -64,6 +64,7 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
CheckpointManager checkpointManager = new CheckpointManager(
jobId,
nodeEngine,
+ null,
planMap,
new CheckpointConfig());
Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));