You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/24 02:02:28 UTC
[incubator-seatunnel] branch 2.3.0-beta-release updated: [Bug] Add 3-node worker down test and fix some bugs (#3115)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch 2.3.0-beta-release
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/2.3.0-beta-release by this push:
new 2f98fc540 [Bug] Add 3-node worker down test and fix some bugs (#3115)
2f98fc540 is described below
commit 2f98fc540927e81d1f3a8cec66ce9b0a53db1d0e
Author: Eric <ga...@gmail.com>
AuthorDate: Mon Oct 24 09:55:30 2022 +0800
[Bug] Add 3-node worker down test and fix some bugs (#3115)
* Add master node switch test and fix bug
Add master node switch test and fix bug
* fix ci error
* add todo
* fix ci error
* fix ci error
* add testBatchJobRestoreIn3NodeWorkerDown and testStreamJobRestoreIn3NodeWorkerDown
* add testBatchJobRestoreIn3NodeMasterDown and testStreamJobRestoreIn3NodeMasterDown
* Add withTryCatch to the whenComplete which may throw exception
* Add withTryCatch to the whenComplete which may throw exception
* fix ci error
* improve test case
* notify checkpoint state
* add worker node down and master node down tests
* revert some file
* fix ci error
* improve test
* add checkpoint.interval to test job config
* fix some error
* add try catch to test case
* fix ci error
* fix checkpoint error
* [engine][tests] add cluster test case
* [engine] Close checkpoint at the end of the pipeline
* [engine] Fix hazelcast master node switching exception
* [hotfix][connector][file] fix file sink error
* fix checkpoint error
# Conflicts:
# seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
* fix PipelineState to PipelineStatus bug
* [engine][checkpoint] add barrier flow operation (#3158)
* fix operator retry error
* fix operator retry error
* fix checkpoint error
* fix checkpoint error
* fix checkpoint error
* fix TaskExecutionService task is not exist when cancel task
Co-authored-by: Zongwen Li <zo...@apache.org>
---
.../seatunnel/file/sink/BaseFileSinkWriter.java | 5 +-
.../sink/commit/FileSinkAggregatedCommitter.java | 1 +
.../seatunnel/file/sink/util/FileSystemUtils.java | 12 +-
.../file/sink/writer/AbstractWriteStrategy.java | 13 +-
.../seatunnel/file/sink/writer/WriteStrategy.java | 2 +
.../engine/e2e/ClusterFaultToleranceIT.java | 558 ++++++++++++++++++---
.../cluster_batch_fake_to_localfile_template.conf | 4 +-
.../seatunnel/engine/client/SeaTunnelClient.java | 6 +
.../engine/client/SeaTunnelHazelcastClient.java | 6 +
.../common/config/server/ServerConfigOptions.java | 2 +-
.../src/main/resources/hazelcast.yaml | 11 +-
.../engine/server/CoordinatorService.java | 8 +-
.../seatunnel/engine/server/SeaTunnelServer.java | 3 +
.../engine/server/SeaTunnelServerStarter.java | 4 +-
.../engine/server/TaskExecutionService.java | 27 +-
.../server/checkpoint/CheckpointCoordinator.java | 52 +-
.../server/checkpoint/CheckpointFailureReason.java | 2 +-
.../server/checkpoint/CheckpointManager.java | 45 +-
.../server/checkpoint/IMapCheckpointIDCounter.java | 8 +-
.../engine/server/dag/physical/PhysicalPlan.java | 17 +
.../engine/server/dag/physical/PhysicalVertex.java | 3 +-
.../engine/server/dag/physical/SubPlan.java | 2 +
.../seatunnel/engine/server/master/JobMaster.java | 14 +
.../engine/server/CoordinatorServiceTest.java | 7 +-
.../server/checkpoint/CheckpointManagerTest.java | 5 +-
25 files changed, 662 insertions(+), 155 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 8513af2f2..1a57947a4 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -46,12 +46,15 @@ public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitIn
if (!fileSinkStates.isEmpty()) {
List<String> transactionIds = writeStrategy.getTransactionIdFromStates(fileSinkStates);
transactionIds.forEach(writeStrategy::abortPrepare);
- writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId());
+ writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId() + 1);
+ } else {
+ writeStrategy.beginTransaction(1L);
}
}
public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, SinkWriter.Context context, String jobId) {
this(writeStrategy, hadoopConf, context, jobId, Collections.emptyList());
+ writeStrategy.beginTransaction(1L);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index de3354642..a0f1516e9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -83,6 +83,7 @@ public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<File
*/
@Override
public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception {
+ log.info("rollback aggregate commit");
if (aggregatedCommitInfos == null || aggregatedCommitInfos.size() == 0) {
return;
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index 652f7e6ac..bb69cf6b3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -76,12 +76,19 @@ public class FileSystemUtils {
* @param rmWhenExist if this is true, we will delete the target file when it already exists
* @throws IOException throw IOException
*/
- public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist) throws IOException {
+ public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist)
+ throws IOException {
FileSystem fileSystem = getFileSystem(newName);
log.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");
Path oldPath = new Path(oldName);
Path newPath = new Path(newName);
+
+ if (!fileExist(oldPath.toString())) {
+ log.warn("rename file :[" + oldPath + "] to [" + newPath + "] already finished in the last commit, skip");
+ return;
+ }
+
if (rmWhenExist) {
if (fileExist(newName) && fileExist(oldName)) {
fileSystem.delete(newPath, true);
@@ -119,6 +126,9 @@ public class FileSystemUtils {
public static List<Path> dirList(@NonNull String filePath) throws FileNotFoundException, IOException {
FileSystem fileSystem = getFileSystem(filePath);
List<Path> pathList = new ArrayList<>();
+ if (!fileExist(filePath)) {
+ return pathList;
+ }
Path fileName = new Path(filePath);
FileStatus[] status = fileSystem.listStatus(fileName);
if (status != null && status.length > 0) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 2548ea92a..2d0f3155a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -70,7 +70,9 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
protected Map<String, String> beingWrittenFile;
private Map<String, List<String>> partitionDirAndValuesMap;
protected SeaTunnelRowType seaTunnelRowType;
- protected Long checkpointId = 1L;
+
+ // Checkpoint id from engine is start with 1
+ protected Long checkpointId = 0L;
public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
this.textFileSinkConfig = textFileSinkConfig;
@@ -88,7 +90,6 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
this.jobId = jobId;
this.subTaskIndex = subTaskIndex;
FileSystemUtils.CONF = getConfiguration(hadoopConf);
- this.beginTransaction(this.checkpointId);
}
/**
@@ -233,6 +234,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
* @param checkpointId checkpoint id
*/
public void beginTransaction(Long checkpointId) {
+ this.checkpointId = checkpointId;
this.transactionId = "T" + Constant.TRANSACTION_ID_SPLIT + jobId + Constant.TRANSACTION_ID_SPLIT + subTaskIndex + Constant.TRANSACTION_ID_SPLIT + checkpointId;
this.transactionDirectory = getTransactionDir(this.transactionId);
this.needMoveFiles = new HashMap<>();
@@ -265,8 +267,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
@Override
public List<FileSinkState> snapshotState(long checkpointId) {
ArrayList<FileSinkState> fileState = Lists.newArrayList(new FileSinkState(this.transactionId, this.checkpointId));
- this.checkpointId = checkpointId;
- this.beginTransaction(checkpointId);
+ this.beginTransaction(checkpointId + 1);
return fileState;
}
@@ -303,4 +304,8 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
Matcher.quoteReplacement(textFileSinkConfig.getPath()));
return tmpPath.replaceAll(Constant.NON_PARTITION + Matcher.quoteReplacement(File.separator), "");
}
+
+ public long getCheckpointId() {
+ return this.checkpointId;
+ }
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index aecc80b0d..55ac378db 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -72,4 +72,6 @@ public interface WriteStrategy extends Transaction, Serializable {
* when a transaction is triggered, release resources
*/
void finishAndCloseFile();
+
+ long getCheckpointId();
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index 6fc5cee93..c7f8d5eaf 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -37,7 +37,6 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
@@ -62,56 +61,79 @@ public class ClusterFaultToleranceIT {
public static final String DYNAMIC_TEST_PARALLELISM = "dynamic_test_parallelism";
+ @SuppressWarnings("checkstyle:RegexpSingleline")
@Test
public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedException {
String testCaseName = "testBatchJobRunOkIn3Node";
String testClusterName = "ClusterFaultToleranceIT_testBatchJobRunOkIn3Node";
long testRowNumber = 1000;
- int testParallelism = 1;
- HazelcastInstanceImpl node1 =
- SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ int testParallelism = 6;
- HazelcastInstanceImpl node2 =
- SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName(testClusterName));
+ HazelcastInstanceImpl node1 = null;
+ HazelcastInstanceImpl node2 = null;
+ HazelcastInstanceImpl node3 = null;
+ SeaTunnelClient engineClient = null;
- HazelcastInstanceImpl node3 =
- SeaTunnelServerStarter.createHazelcastInstance(
+ try {
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(testClusterName));
- // waiting all node added to cluster
- Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3, node1.getCluster().getMembers().size()));
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
- // TODO Need FakeSource support parallel first
- Common.setDeployMode(DeployMode.CLIENT);
- ImmutablePair<String, String> testResources = createTestResources(testCaseName, JobMode.BATCH, testRowNumber, testParallelism);
- JobConfig jobConfig = new JobConfig();
- jobConfig.setName(testCaseName);
+ node3 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
- ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
- clientConfig.setClusterName(
- TestUtils.getClusterName(testClusterName));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- JobExecutionEnvironment jobExecutionEnv =
- engineClient.createExecutionContext(testResources.getRight(), jobConfig);
- ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ // waiting all node added to cluster
+ HazelcastInstanceImpl finalNode = node1;
+ Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
- CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
- return clientJobProxy.waitForJobComplete();
- });
+ Common.setDeployMode(DeployMode.CLIENT);
+ ImmutablePair<String, String> testResources =
+ createTestResources(testCaseName, JobMode.BATCH, testRowNumber, testParallelism);
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(testCaseName);
- Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertTrue(
- objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(
+ TestUtils.getClusterName(testClusterName));
+ engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+ ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
- Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+ CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
+ });
- node1.shutdown();
- node2.shutdown();
- node3.shutdown();
+ Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ Thread.sleep(2000);
+ System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ Assertions.assertTrue(
+ objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get()));
+ });
+
+ Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+ Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+ } finally {
+ if (engineClient != null) {
+ engineClient.shutdown();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+
+ if (node2 != null) {
+ node2.shutdown();
+ }
+
+ if (node3 != null) {
+ node3.shutdown();
+ }
+ }
}
/**
@@ -120,8 +142,8 @@ public class ClusterFaultToleranceIT {
*
* @param testCaseName testCaseName
* @param jobMode jobMode
- * @param rowNumber row.num per FakeSource parallelism
- * @param parallelism FakeSource parallelism
+ * @param rowNumber row.num per FakeSource parallelism
+ * @param parallelism FakeSource parallelism
* @return
*/
private ImmutablePair<String, String> createTestResources(@NonNull String testCaseName, @NonNull JobMode jobMode,
@@ -149,65 +171,445 @@ public class ClusterFaultToleranceIT {
return new ImmutablePair<>(targetDir, targetConfigFilePath);
}
+ @SuppressWarnings("checkstyle:RegexpSingleline")
@Test
- @Disabled("Disable because we can not support changeless row number in FakeSource Stream Job")
public void testStreamJobRunOkIn3Node() throws ExecutionException, InterruptedException {
String testCaseName = "testStreamJobRunOkIn3Node";
String testClusterName = "ClusterFaultToleranceIT_testStreamJobRunOkIn3Node";
long testRowNumber = 1000;
- int testParallelism = 1;
- HazelcastInstanceImpl node1 =
- SeaTunnelServerStarter.createHazelcastInstance(
+ int testParallelism = 6;
+ HazelcastInstanceImpl node1 = null;
+ HazelcastInstanceImpl node2 = null;
+ HazelcastInstanceImpl node3 = null;
+ SeaTunnelClient engineClient = null;
+
+ try {
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node3 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ // waiting all node added to cluster
+ HazelcastInstanceImpl finalNode = node1;
+ Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
+
+ Common.setDeployMode(DeployMode.CLIENT);
+ ImmutablePair<String, String> testResources =
+ createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(testCaseName);
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(
+ TestUtils.getClusterName(testClusterName));
+ engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+ ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
+ });
+
+ Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ Thread.sleep(2000);
+ System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+ testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ });
+
+ clientJobProxy.cancelJob();
+
+ Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+ objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
+ Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+ Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+ } finally {
+ if (engineClient != null) {
+ engineClient.shutdown();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+
+ if (node2 != null) {
+ node2.shutdown();
+ }
+
+ if (node3 != null) {
+ node3.shutdown();
+ }
+ }
+ }
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ @Test
+ public void testBatchJobRestoreIn3NodeWorkerDown() throws ExecutionException, InterruptedException {
+ String testCaseName = "testBatchJobRestoreIn3NodeWorkerDown";
+ String testClusterName = "ClusterFaultToleranceIT_testBatchJobRestoreIn3NodeWorkerDown";
+ long testRowNumber = 1000;
+ int testParallelism = 2;
+ HazelcastInstanceImpl node1 = null;
+ HazelcastInstanceImpl node2 = null;
+ HazelcastInstanceImpl node3 = null;
+ SeaTunnelClient engineClient = null;
+
+ try {
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node3 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ // waiting all node added to cluster
+ HazelcastInstanceImpl finalNode = node1;
+ Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
+
+ Common.setDeployMode(DeployMode.CLIENT);
+ ImmutablePair<String, String> testResources =
+ createTestResources(testCaseName, JobMode.BATCH, testRowNumber, testParallelism);
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(testCaseName);
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(
+ TestUtils.getClusterName(testClusterName));
+ engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+ ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
+ });
+
+ Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ // Wait some tasks commit finished
+ Thread.sleep(2000);
+ System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+ FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
+ });
+
+ // shutdown on worker node
+ node2.shutdown();
+
+ Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+ objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+
+ Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+ Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+ } finally {
+ if (engineClient != null) {
+ engineClient.shutdown();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+
+ if (node2 != null) {
+ node2.shutdown();
+ }
+
+ if (node3 != null) {
+ node3.shutdown();
+ }
+ }
+ }
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ @Test
+ public void testStreamJobRestoreIn3NodeWorkerDown() throws ExecutionException, InterruptedException {
+ String testCaseName = "testStreamJobRestoreIn3NodeWorkerDown";
+ String testClusterName = "ClusterFaultToleranceIT_testStreamJobRestoreIn3NodeWorkerDown";
+ long testRowNumber = 1000;
+ int testParallelism = 6;
+ HazelcastInstanceImpl node1 = null;
+ HazelcastInstanceImpl node2 = null;
+ HazelcastInstanceImpl node3 = null;
+ SeaTunnelClient engineClient = null;
+
+ try {
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(testClusterName));
- HazelcastInstanceImpl node2 =
- SeaTunnelServerStarter.createHazelcastInstance(
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(testClusterName));
- HazelcastInstanceImpl node3 =
- SeaTunnelServerStarter.createHazelcastInstance(
+ node3 = SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(testClusterName));
- // waiting all node added to cluster
- Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertEquals(3, node1.getCluster().getMembers().size()));
+ // waiting all node added to cluster
+ HazelcastInstanceImpl finalNode = node1;
+ Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
- // TODO Need FakeSource support parallel first
- Common.setDeployMode(DeployMode.CLIENT);
- ImmutablePair<String, String> testResources = createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
- JobConfig jobConfig = new JobConfig();
- jobConfig.setName(testCaseName);
+ Common.setDeployMode(DeployMode.CLIENT);
+ ImmutablePair<String, String> testResources =
+ createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(testCaseName);
- ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
- clientConfig.setClusterName(
- TestUtils.getClusterName(testClusterName));
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- JobExecutionEnvironment jobExecutionEnv =
- engineClient.createExecutionContext(testResources.getRight(), jobConfig);
- ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(
+ TestUtils.getClusterName(testClusterName));
+ engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+ ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
- return clientJobProxy.waitForJobComplete();
- });
+ CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
+ });
- Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> {
- log.info("File Lines ==================" + FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
- Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
- testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ // Wait some tasks commit finished, and we can get rows from the sink target dir
+ Thread.sleep(2000);
+ System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+ FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
+ });
+
+ Thread.sleep(5000);
+ // shutdown on worker node
+ node2.shutdown();
+
+ Awaitility.await().atMost(180000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ // Wait job write all rows in file
+ Thread.sleep(2000);
+ System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+ testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ });
+
+ // sleep 10s and expect the job don't write more rows.
+ Thread.sleep(10000);
+ clientJobProxy.cancelJob();
+
+ Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+ objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
+ // check the final rows
+ Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+ Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+ } finally {
+ if (engineClient != null) {
+ engineClient.shutdown();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+
+ if (node2 != null) {
+ node2.shutdown();
+ }
+
+ if (node3 != null) {
+ node3.shutdown();
+ }
+ }
+ }
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ @Test
+ public void testBatchJobRestoreIn3NodeMasterDown() throws ExecutionException, InterruptedException {
+ String testCaseName = "testBatchJobRestoreIn3NodeMasterDown";
+ String testClusterName = "ClusterFaultToleranceIT_testBatchJobRestoreIn3NodeMasterDown";
+ long testRowNumber = 1000;
+ int testParallelism = 6;
+ HazelcastInstanceImpl node1 = null;
+ HazelcastInstanceImpl node2 = null;
+ HazelcastInstanceImpl node3 = null;
+ SeaTunnelClient engineClient = null;
+
+ try {
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node3 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ // waiting all node added to cluster
+ HazelcastInstanceImpl finalNode = node1;
+ Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
+
+ Common.setDeployMode(DeployMode.CLIENT);
+ ImmutablePair<String, String> testResources =
+ createTestResources(testCaseName, JobMode.BATCH, testRowNumber, testParallelism);
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(testCaseName);
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(
+ TestUtils.getClusterName(testClusterName));
+ engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+ ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
});
- clientJobProxy.cancelJob();
+ Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ // Wait some tasks commit finished
+ Thread.sleep(2000);
+ System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+ FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
+ });
+
+ // shutdown master node
+ node1.shutdown();
+
+ Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+ objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+
+ Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+ Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+ } finally {
+ if (engineClient != null) {
+ engineClient.shutdown();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+
+ if (node2 != null) {
+ node2.shutdown();
+ }
+
+ if (node3 != null) {
+ node3.shutdown();
+ }
+ }
+ }
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ @Test
+ public void testStreamJobRestoreIn3NodeMasterDown() throws ExecutionException, InterruptedException {
+ String testCaseName = "testStreamJobRestoreIn3NodeMasterDown";
+ String testClusterName = "ClusterFaultToleranceIT_testStreamJobRestoreIn3NodeMasterDown";
+ long testRowNumber = 1000;
+ int testParallelism = 6;
+ HazelcastInstanceImpl node1 = null;
+ HazelcastInstanceImpl node2 = null;
+ HazelcastInstanceImpl node3 = null;
+ SeaTunnelClient engineClient = null;
+
+ try {
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node2 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ node3 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
- Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertTrue(
- objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+ // wait until all nodes have been added to the cluster
+ HazelcastInstanceImpl finalNode = node1;
+ Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(3, finalNode.getCluster().getMembers().size()));
- Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
- Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+ Common.setDeployMode(DeployMode.CLIENT);
+ ImmutablePair<String, String> testResources =
+ createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(testCaseName);
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(
+ TestUtils.getClusterName(testClusterName));
+ engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+ ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
+ });
- node1.shutdown();
- node2.shutdown();
- node3.shutdown();
+ Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ // Wait until some tasks have finished committing, so we can read rows from the sink target dir
+ Thread.sleep(2000);
+ System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+ FileUtils.getFileLineNumberFromDir(testResources.getLeft()) > 1);
+ });
+
+ // shutdown master node
+ node1.shutdown();
+
+ Awaitility.await().atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ // Wait until the job has written all rows to the file
+ Thread.sleep(2000);
+ System.out.println(FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+ testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ });
+
+ // sleep 10s and expect that the job doesn't write any more rows.
+ Thread.sleep(10000);
+ clientJobProxy.cancelJob();
+
+ Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+ objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
+ // check the final rows
+ Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+ Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+ } finally {
+ if (engineClient != null) {
+ engineClient.shutdown();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+
+ if (node2 != null) {
+ node2.shutdown();
+ }
+
+ if (node3 != null) {
+ node3.shutdown();
+ }
+ }
}
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
index 2789cbc3e..b066fd9c9 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
@@ -21,7 +21,7 @@
env {
# You can set flink configuration here
job.mode = "${dynamic_job_mode}" # dynamic_job_mode will be replace to the final file name before test run
- execution.checkpoint.interval = 5000
+ checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
@@ -29,6 +29,8 @@ source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
row.num = ${dynamic_test_row_num_per_parallelism}
+ split.num = 5
+ split.read-interval = 3000
map.size = 10
array.size = 10
bytes.length = 10
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index e3331a4e7..0f5d8fa4e 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -58,4 +58,10 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
SeaTunnelPrintMessageCodec::decodeResponse
);
}
+
+ public void shutdown() {
+ if (hazelcastClient != null) {
+ hazelcastClient.shutdown();
+ }
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
index e893988b0..c57413585 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelHazelcastClient.java
@@ -125,4 +125,10 @@ public class SeaTunnelHazelcastClient {
UUID masterUuid = hazelcastClient.getClientClusterService().getMasterMember().getUuid();
return requestAndGetCompletableFuture(masterUuid, request);
}
+
+ public void shutdown() {
+ if (hazelcastClient != null) {
+ hazelcastClient.shutdown();
+ }
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index cde0901ac..22be81e01 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -35,7 +35,7 @@ public class ServerConfigOptions {
public static final Option<Integer> CHECKPOINT_INTERVAL = Options.key("interval").intType().defaultValue(300000).withDescription("The interval (in milliseconds) between two consecutive checkpoints.");
- public static final Option<Integer> CHECKPOINT_TIMEOUT = Options.key("timeout").intType().defaultValue(300000).withDescription("The timeout (in milliseconds) for a checkpoint.");
+ public static final Option<Integer> CHECKPOINT_TIMEOUT = Options.key("timeout").intType().defaultValue(30000).withDescription("The timeout (in milliseconds) for a checkpoint.");
public static final Option<Integer> CHECKPOINT_MAX_CONCURRENT = Options.key("max-concurrent").intType().defaultValue(1).withDescription("The maximum number of concurrent checkpoints.");
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
index 21f4d544d..c23ddeeff 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast.yaml
@@ -27,11 +27,6 @@ hazelcast:
auto-increment: true
port-count: 100
port: 5801
- map:
- map-name-template:
- map-store:
- enabled: true
- initial-mode: EAGER
- class-name: org.apache.seatunnel.engine.server.persistence.FileMapStore
- properties:
- path: /tmp/file-store-map
\ No newline at end of file
+ properties:
+ hazelcast.invocation.max.retry.count: 20
+ hazelcast.tcp.join.port.try.count: 100
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 09e91eb92..51793a8f9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -64,7 +64,7 @@ public class CoordinatorService {
private volatile ResourceManager resourceManager;
/**
- * IMap key is jobId and value is a Tuple2.
+ * IMap key is jobId and value is {@link RunningJobInfo}.
* Tuple2 key is JobMaster init timestamp and value is the jobImmutableInformation which is sent by client when submit job
* <p>
* This IMap is used to recovery runningJobInfoIMap in JobMaster when a new master node active
@@ -195,7 +195,7 @@ public class CoordinatorService {
try {
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
} catch (Exception e) {
- throw new SeaTunnelEngineException(String.format("Job id %s init JobMaster failed", jobId));
+ throw new SeaTunnelEngineException(String.format("Job id %s init JobMaster failed", jobId), e);
}
String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
@@ -265,7 +265,7 @@ public class CoordinatorService {
} catch (Exception e) {
isActive = false;
logger.severe(ExceptionUtils.getMessage(e));
- throw new SeaTunnelEngineException("check new active master error, stop loop");
+ throw new SeaTunnelEngineException("check new active master error, stop loop", e);
}
}
@@ -279,7 +279,7 @@ public class CoordinatorService {
executorService.awaitTermination(20, TimeUnit.SECONDS);
runningJobMasterMap = new ConcurrentHashMap<>();
} catch (InterruptedException e) {
- throw new SeaTunnelEngineException("wait clean executor service error");
+ throw new SeaTunnelEngineException("wait clean executor service error", e);
}
if (resourceManager != null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 397f60434..231869318 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -203,6 +203,9 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
return nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress());
}, new RetryUtils.RetryMaterial(20, true,
exception -> exception instanceof NullPointerException && isRunning, 1000));
+ } catch (InterruptedException e) {
+ LOGGER.info("master node check interrupted");
+ return false;
} catch (Exception e) {
throw new SeaTunnelEngineException("cluster have no master node", e);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 447a7fe7c..5a9b49507 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -36,7 +36,7 @@ public class SeaTunnelServerStarter {
return ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
- new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+ new SeaTunnelNodeContext(seaTunnelConfig))).getOriginal();
}
public static HazelcastInstanceImpl createHazelcastInstance() {
@@ -44,6 +44,6 @@ public class SeaTunnelServerStarter {
return ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
- new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+ new SeaTunnelNodeContext(seaTunnelConfig))).getOriginal();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index fea05ddd4..3b44634b9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -61,6 +61,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
@@ -214,24 +215,30 @@ public class TaskExecutionService {
resultFuture.whenComplete(withTryCatch(logger, (r, s) -> {
logger.info(
String.format("Task %s complete with state %s", r.getTaskGroupLocation(), r.getExecutionState()));
- InvocationFuture<Object> invoke = null;
long sleepTime = 1000;
- while (isRunning && (invoke == null || !invoke.isDone())) {
- if (null != invoke) {
+ boolean notifyStateSuccess = false;
+ while (isRunning && !notifyStateSuccess) {
+ InvocationFuture<Object> invoke = nodeEngine.getOperationService().createInvocationBuilder(
+ SeaTunnelServer.SERVICE_NAME,
+ new NotifyTaskStatusOperation(taskGroup.getTaskGroupLocation(), r),
+ nodeEngine.getMasterAddress()).invoke();
+ try {
+ invoke.get();
+ notifyStateSuccess = true;
+ } catch (InterruptedException e) {
+ logger.severe(e);
+ Thread.interrupted();
+ } catch (ExecutionException e) {
+ logger.warning(ExceptionUtils.getMessage(e));
logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis",
taskGroup.getTaskGroupLocation(), sleepTime));
try {
- Thread.sleep(sleepTime += 1000);
- } catch (InterruptedException e) {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException ex) {
logger.severe(e);
Thread.interrupted();
}
}
- invoke = nodeEngine.getOperationService().createInvocationBuilder(
- SeaTunnelServer.SERVICE_NAME,
- new NotifyTaskStatusOperation(taskGroup.getTaskGroupLocation(), r),
- nodeEngine.getMasterAddress()).invoke();
- invoke.join();
}
}));
return new PassiveCompletableFuture<>(resultFuture);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 5ed069f76..dd82f64c2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -22,6 +22,7 @@ import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLET
import static org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan.COORDINATOR_INDEX;
import static org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
@@ -44,6 +45,7 @@ import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.Getter;
+import lombok.Setter;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,7 +110,7 @@ public class CheckpointCoordinator {
private final CheckpointConfig coordinatorConfig;
private int tolerableFailureCheckpoints;
- private final transient ScheduledExecutorService scheduler;
+ private transient ScheduledExecutorService scheduler;
private final AtomicLong latestTriggerTimestamp = new AtomicLong(0);
@@ -119,6 +121,9 @@ public class CheckpointCoordinator {
/** Flag marking the coordinator as shut down (not accepting any messages any more). */
private volatile boolean shutdown;
+ @Setter
+ private volatile boolean isAllTaskReady = false;
+
@SneakyThrows
public CheckpointCoordinator(CheckpointManager manager,
CheckpointStorage checkpointStorage,
@@ -208,6 +213,7 @@ public class CheckpointCoordinator {
return;
}
}
+ isAllTaskReady = true;
InvocationFuture<?>[] futures = notifyTaskStart();
CompletableFuture.allOf(futures).join();
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
@@ -229,7 +235,7 @@ public class CheckpointCoordinator {
synchronized (lock) {
final long currentTimestamp = Instant.now().toEpochMilli();
if (currentTimestamp - latestTriggerTimestamp.get() >= coordinatorConfig.getCheckpointInterval() &&
- pendingCounter.get() < coordinatorConfig.getMaxConcurrentCheckpoints()) {
+ pendingCounter.get() < coordinatorConfig.getMaxConcurrentCheckpoints() && isAllTaskReady) {
CompletableFuture<PendingCheckpoint> pendingCheckpoint = createPendingCheckpoint(currentTimestamp, CheckpointType.CHECKPOINT_TYPE);
startTriggerPendingCheckpoint(pendingCheckpoint);
pendingCounter.incrementAndGet();
@@ -261,22 +267,30 @@ public class CheckpointCoordinator {
private void startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint> pendingCompletableFuture) {
pendingCompletableFuture.thenAcceptAsync(pendingCheckpoint -> {
- LOG.debug("wait checkpoint completed: " + pendingCheckpoint);
+ LOG.info("wait checkpoint completed: " + pendingCheckpoint.getCheckpointId());
PassiveCompletableFuture<CompletedCheckpoint> completableFuture = pendingCheckpoint.getCompletableFuture();
completableFuture.thenAcceptAsync(this::completePendingCheckpoint);
if (COMPLETED_POINT_TYPE != pendingCheckpoint.getCheckpointType()) {
// Trigger the barrier and wait for all tasks to ACK
LOG.debug("trigger checkpoint barrier {}/{}/{}, {}", pendingCheckpoint.getJobId(), pendingCheckpoint.getPipelineId(), pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getCheckpointType());
- CompletableFuture.supplyAsync(() ->
+ CompletableFuture<InvocationFuture<?>[]> completableFutureArray = CompletableFuture.supplyAsync(() ->
new CheckpointBarrier(pendingCheckpoint.getCheckpointId(),
pendingCheckpoint.getCheckpointTimestamp(),
pendingCheckpoint.getCheckpointType()))
- .thenApplyAsync(this::triggerCheckpoint)
- .thenApplyAsync(invocationFutures -> CompletableFuture.allOf(invocationFutures).join());
+ .thenApplyAsync(this::triggerCheckpoint);
+
+ try {
+ CompletableFuture.allOf(completableFutureArray).get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ LOG.error(ExceptionUtils.getMessage(e));
+ return;
+ }
}
- LOG.debug("Start a scheduled task to prevent checkpoint timeouts");
+ LOG.info("Start a scheduled task to prevent checkpoint timeouts");
scheduler.schedule(() -> {
// If any task is not acked within the checkpoint timeout
if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && !pendingCheckpoint.isFullyAcknowledged()) {
@@ -361,19 +375,29 @@ public class CheckpointCoordinator {
}
protected void cleanPendingCheckpoint(CheckpointFailureReason failureReason) {
- pendingCheckpoints.values().forEach(pendingCheckpoint ->
- pendingCheckpoint.abortCheckpoint(failureReason, null)
- );
- // TODO: clear related future & scheduler task
- pendingCheckpoints.clear();
- scheduler.shutdown();
+ synchronized (lock) {
+ pendingCheckpoints.values().forEach(pendingCheckpoint ->
+ pendingCheckpoint.abortCheckpoint(failureReason, null)
+ );
+ // TODO: clear related future & scheduler task
+ pendingCheckpoints.clear();
+ scheduler.shutdownNow();
+ scheduler = Executors.newScheduledThreadPool(
+ 1, runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setDaemon(true);
+ thread.setName(String.format("checkpoint-coordinator-%s/%s", pipelineId, jobId));
+ return thread;
+ });
+ isAllTaskReady = false;
+ }
}
protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
final long checkpointId = ackOperation.getBarrier().getId();
final PendingCheckpoint pendingCheckpoint = pendingCheckpoints.get(checkpointId);
TaskLocation location = ackOperation.getTaskLocation();
- LOG.debug("task[{}]({}/{}) ack. {}", location.getTaskID(), location.getPipelineId(), location.getJobId(), ackOperation.getBarrier().toString());
+ LOG.info("task[{}]({}/{}) ack. {}", location.getTaskID(), location.getPipelineId(), location.getJobId(), ackOperation.getBarrier().toString());
if (ackOperation.getBarrier().getCheckpointType() == COMPLETED_POINT_TYPE) {
synchronized (lock) {
if (pendingCheckpoints.get(Barrier.PREPARE_CLOSE_BARRIER_ID) == null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
index 8d4276f5e..703e6e1d7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointFailureReason.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
public enum CheckpointFailureReason {
- TASK_FAILURE("Task has failed."),
+ PIPELINE_END("Pipeline turn to end state."),
CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown.");
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 972643551..e7ff7ae20 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -34,24 +34,22 @@ import org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOp
import org.apache.seatunnel.engine.server.checkpoint.operation.TaskReportStatusOperation;
import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
-import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.Task;
-import org.apache.seatunnel.engine.server.execution.TaskGroup;
-import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
+import com.hazelcast.cluster.Address;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.extern.slf4j.Slf4j;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -77,6 +75,7 @@ public class CheckpointManager {
private final CheckpointStorage checkpointStorage;
private final JobMaster jobMaster;
+
public CheckpointManager(long jobId,
NodeEngine nodeEngine,
JobMaster jobMaster,
@@ -85,9 +84,12 @@ public class CheckpointManager {
this.jobId = jobId;
this.nodeEngine = nodeEngine;
this.jobMaster = jobMaster;
- this.checkpointStorage = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class, checkpointConfig.getStorage().getStorage())
- .create(new HashMap<>());
- IMap<Integer, Long> checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
+ this.checkpointStorage =
+ FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), CheckpointStorageFactory.class,
+ checkpointConfig.getStorage().getStorage())
+ .create(new ConcurrentHashMap<>());
+ IMap<Integer, Long> checkpointIdMap =
+ nodeEngine.getHazelcastInstance().getMap(String.format("checkpoint-id-%d", jobId));
this.coordinatorMap = checkpointPlanMap.values().parallelStream()
.map(plan -> {
IMapCheckpointIDCounter idCounter = new IMapCheckpointIDCounter(plan.getPipelineId(), checkpointIdMap);
@@ -96,7 +98,9 @@ public class CheckpointManager {
// TODO: support savepoint
PipelineState pipelineState = null;
if (idCounter.get() != CheckpointIDCounter.INITIAL_CHECKPOINT_ID) {
- pipelineState = checkpointStorage.getCheckpoint(String.valueOf(jobId), String.valueOf(plan.getPipelineId()), String.valueOf(idCounter.get() - 1));
+ pipelineState =
+ checkpointStorage.getCheckpoint(String.valueOf(jobId), String.valueOf(plan.getPipelineId()),
+ String.valueOf(idCounter.get() - 1));
}
return new CheckpointCoordinator(this,
checkpointStorage,
@@ -133,6 +137,7 @@ public class CheckpointManager {
}
public void reportedPipelineRunning(int pipelineId) {
+ getCheckpointCoordinator(pipelineId).setAllTaskReady(true);
getCheckpointCoordinator(pipelineId).tryTriggerPendingCheckpoint();
}
@@ -158,25 +163,18 @@ public class CheckpointManager {
*/
public void reportedTask(TaskReportStatusOperation reportStatusOperation) {
// task address may change during restore.
- log.debug("reported task({}) status{}", reportStatusOperation.getLocation().getTaskID(), reportStatusOperation.getStatus());
+ log.debug("reported task({}) status{}", reportStatusOperation.getLocation().getTaskID(),
+ reportStatusOperation.getStatus());
getCheckpointCoordinator(reportStatusOperation.getLocation()).reportedTask(reportStatusOperation);
}
/**
* Called by the JobMaster.
- * <br> Listen to the {@link ExecutionState} of the {@link TaskGroup}, which is used to cancel the running {@link PendingCheckpoint} when the task group is abnormal.
+ * <br> Listen to the {@link PipelineStatus} of the {@link SubPlan}, which is used to cancel the running {@link PendingCheckpoint} when the SubPlan is abnormal.
*/
- public void listenTaskGroup(TaskGroupLocation groupLocation, ExecutionState executionState) {
- if (jobId != groupLocation.getJobId()) {
- return;
- }
- switch (executionState) {
- case FAILED:
- case CANCELED:
- getCheckpointCoordinator(groupLocation.getPipelineId()).cleanPendingCheckpoint(CheckpointFailureReason.TASK_FAILURE);
- return;
- default:
- }
+ public CompletableFuture<Void> listenPipelineRetry(int pipelineId, PipelineStatus pipelineStatus) {
+ getCheckpointCoordinator(pipelineId).cleanPendingCheckpoint(CheckpointFailureReason.PIPELINE_END);
+ return CompletableFuture.completedFuture(null);
}
/**
@@ -220,6 +218,9 @@ public class CheckpointManager {
}
protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
- return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
+ // Resolve the owning member's address once and reuse it; the original patch computed
+ // this local and then repeated the queryTaskGroupAddress(...) lookup in the return,
+ // leaving `address` unused and doing the (potentially remote) lookup twice.
+ Address address =
+ jobMaster.queryTaskGroupAddress(operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId());
+ return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, address);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
index 8f43ac260..a6861b210 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
@@ -17,9 +17,12 @@
package org.apache.seatunnel.engine.server.checkpoint;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.map.IMap;
import java.util.concurrent.CompletableFuture;
@@ -36,7 +39,10 @@ public class IMapCheckpointIDCounter implements CheckpointIDCounter {
@Override
public void start() throws Exception {
- checkpointIdMap.putIfAbsent(pipelineId, INITIAL_CHECKPOINT_ID);
+ RetryUtils.retryWithException(() -> {
+ return checkpointIdMap.putIfAbsent(pipelineId, INITIAL_CHECKPOINT_ID);
+ }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+ exception -> exception instanceof HazelcastInstanceNotActiveException, Constant.OPERATION_RETRY_SLEEP));
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index e086d02c9..6a48f5255 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -143,6 +143,9 @@ public class PhysicalPlan {
PassiveCompletableFuture<PipelineStatus> future = subPlan.initStateFuture();
future.thenAcceptAsync(pipelineState -> {
try {
+ // Notify the checkpoint manager when the pipeline ends, whether or not the pipeline will be restarted
+ jobMaster.getCheckpointManager()
+ .listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState()).join();
if (PipelineStatus.CANCELED.equals(pipelineState)) {
if (canRestorePipeline(subPlan)) {
subPlan.restorePipeline();
@@ -170,6 +173,8 @@ public class PhysicalPlan {
LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
}
+ notifyCheckpointManagerPipelineEnd(subPlan);
+
if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
if (failedPipelineNum.get() > 0) {
updateJobState(JobStatus.FAILING);
@@ -187,6 +192,15 @@ public class PhysicalPlan {
});
}
+ /**
+ * Only call this when the pipeline will never be restarted.
+ * @param subPlan the sub-plan whose terminal pipeline state is reported to the checkpoint manager
+ */
+ private void notifyCheckpointManagerPipelineEnd(@NonNull SubPlan subPlan) {
+ jobMaster.getCheckpointManager()
+ .listenPipeline(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState()).join();
+ }
+
private boolean canRestorePipeline(SubPlan subPlan) {
return needRestore && subPlan.getPipelineRestoreNum() < PIPELINE_MAX_RESTORE_NUM;
}
@@ -245,6 +259,9 @@ public class PhysicalPlan {
throw new IllegalStateException(message);
}
+ // notify checkpoint manager
+ jobMaster.getCheckpointManager().shutdown(endState);
+
LOGGER.info(String.format("%s end with state %s", getJobFullName(), endState));
// we must update runningJobStateTimestampsIMap first and then can update runningJobStateIMap
updateStateTimestamps(endState);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index a6f352505..38069a16d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -242,7 +242,6 @@ public class PhysicalVertex {
private boolean turnToEndState(@NonNull ExecutionState endState) {
synchronized (this) {
- jobMaster.getCheckpointManager().listenTaskGroup(taskGroupLocation, endState);
// consistency check
ExecutionState currentState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
if (currentState.isEndState()) {
@@ -327,7 +326,7 @@ public class PhysicalVertex {
private void noticeTaskExecutionServiceCancel() {
int i = 0;
// In order not to generate uncontrolled tasks, We will try again until the taskFuture is completed
- while (!taskFuture.isDone()) {
+ while (!taskFuture.isDone() && nodeEngine.getClusterService().getMember(getCurrentExecutionAddress()) != null) {
try {
i++;
LOGGER.info(String.format("send cancel %s operator to member %s", taskFullName, getCurrentExecutionAddress()));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index f299feb65..b68f9199e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -381,6 +381,8 @@ public class SubPlan {
restorePipeline();
} else if (PipelineStatus.CANCELING.equals(getPipelineState())) {
cancelPipelineTasks();
+ } else if (PipelineStatus.RUNNING.equals(getPipelineState())) {
+ jobMaster.getCheckpointManager().reportedPipelineRunning(this.getPipelineLocation().getPipelineId());
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 83a2af560..76cca50ad 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -20,11 +20,13 @@ package org.apache.seatunnel.engine.server.master;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
@@ -98,6 +100,8 @@ public class JobMaster extends Thread {
private final EngineConfig engineConfig;
+ private boolean isRunning = true;
+
public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService,
@@ -304,9 +308,18 @@ public class JobMaster extends Thread {
return ownedSlotProfilesIMap.get(pipelineLocation);
}
+ @SuppressWarnings("checkstyle:MagicNumber")
public void setOwnedSlotProfiles(@NonNull PipelineLocation pipelineLocation,
@NonNull Map<TaskGroupLocation, SlotProfile> pipelineOwnedSlotProfiles) {
ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles);
+ try {
+ RetryUtils.retryWithException(() -> {
+ return pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation));
+ }, new RetryUtils.RetryMaterial(20, true,
+ exception -> exception instanceof NullPointerException && isRunning, 1000));
+ } catch (Exception e) {
+ throw new SeaTunnelEngineException("Can not sync pipeline owned slot profiles with IMap", e);
+ }
}
public SlotProfile getOwnedSlotProfiles(@NonNull TaskGroupLocation taskGroupLocation) {
@@ -325,6 +338,7 @@ public class JobMaster extends Thread {
public void interrupt() {
try {
+ isRunning = false;
jobMasterCompleteFuture.cancel(true);
} finally {
super.interrupt();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index b84225793..2f11954c3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.internal.serialization.Data;
@@ -155,10 +156,12 @@ public class CoordinatorServiceTest {
// wait job restore
Thread.sleep(5000);
- // job will recovery running state
+ // pipeline will recover to the RUNNING state
await().atMost(200000, TimeUnit.MILLISECONDS)
.untilAsserted(
- () -> Assertions.assertEquals(JobStatus.RUNNING, server2.getCoordinatorService().getJobStatus(jobId)));
+ () -> Assertions.assertEquals(PipelineStatus.RUNNING,
+ server2.getCoordinatorService().getJobMaster(jobId).getPhysicalPlan().getPipelineList().get(0)
+ .getPipelineState()));
server2.getCoordinatorService().cancelJob(jobId);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index f3d95ed16..6b19101d6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -69,10 +69,9 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
planMap,
new CheckpointConfig());
Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
- CompletableFuture<Void> future = checkpointManager.listenPipeline(1, PipelineStatus.FINISHED);
- future.join();
+ checkpointManager.listenPipeline(1, PipelineStatus.FINISHED);
Assertions.assertNull(checkpointIdMap.get(1));
- future = checkpointManager.shutdown(JobStatus.FINISHED);
+ CompletableFuture<Void> future = checkpointManager.shutdown(JobStatus.FINISHED);
future.join();
Assertions.assertTrue(checkpointStorage.getAllCheckpoints(jobId + "").isEmpty());
}