You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/16 01:15:02 UTC
[incubator-seatunnel] branch dev updated: [hotfix][engine] Add master node switch test and fix bug (#3082)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 608be51bc [hotfix][engine] Add master node switch test and fix bug (#3082)
608be51bc is described below
commit 608be51bc4f367b15b25447587c754304a5262de
Author: Eric <ga...@gmail.com>
AuthorDate: Sun Oct 16 09:14:55 2022 +0800
[hotfix][engine] Add master node switch test and fix bug (#3082)
* Add master node switch test and fix bug
* Add withTryCatch to the whenComplete callback, which may throw an exception
---
.../apache/seatunnel/common/utils/FileUtils.java | 10 ++
.../file/sink/writer/TextWriteStrategy.java | 2 +
.../engine/e2e/ClusterFaultToleranceIT.java | 101 ++++++++++++++++++--
.../cluster_batch_fake_to_localfile_template.conf | 9 +-
.../engine/server/CoordinatorService.java | 105 +++++++++++++++------
.../seatunnel/engine/server/SeaTunnelServer.java | 97 +++++++++----------
.../engine/server/TaskExecutionService.java | 19 ++--
.../CheckpointCoordinatorConfiguration.java | 3 +-
.../engine/server/dag/physical/PhysicalPlan.java | 24 ++---
.../engine/server/dag/physical/PhysicalVertex.java | 27 +++---
.../engine/server/dag/physical/SubPlan.java | 50 +++++-----
.../seatunnel/engine/server/master/JobMaster.java | 27 ++++--
.../resourcemanager/AbstractResourceManager.java | 23 +++--
.../resourcemanager/ResourceRequestHandler.java | 14 +--
.../server/service/slot/DefaultSlotService.java | 50 +++++-----
.../engine/server/AbstractSeaTunnelServerTest.java | 19 +++-
.../engine/server/CoordinatorServiceTest.java | 65 ++++++++++++-
17 files changed, 447 insertions(+), 198 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
index 491ae2caa..a4738b6cc 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
@@ -121,6 +121,16 @@ public class FileUtils {
return getFileLineNumber(file.getPath());
}
+ /**
+ * create a dir, if the dir exists, clear the files and sub dirs in the dir.
+ * @param dirPath dirPath
+ */
+ public static void createNewDir(@NonNull String dirPath) {
+ deleteFile(dirPath);
+ File file = new File(dirPath);
+ file.mkdirs();
+ }
+
/**
* clear dir and the sub dir
*
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 4433775d5..e9579723d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -101,6 +101,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
}
needMoveFiles.put(key, getTargetLocation(key));
});
+ beingWrittenOutputStream.clear();
+ isFirstWrite.clear();
}
private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index a1b0b1a42..6fc5cee93 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -17,8 +17,11 @@
package org.apache.seatunnel.engine.e2e;
+import static com.google.common.base.Preconditions.checkArgument;
+
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
@@ -31,8 +34,10 @@ import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
@@ -46,14 +51,23 @@ import java.util.concurrent.TimeUnit;
/**
* Cluster fault tolerance test. Test the job recovery capability and data consistency assurance capability in case of cluster node failure
*/
+@Slf4j
public class ClusterFaultToleranceIT {
public static final String DYNAMIC_TEST_CASE_NAME = "dynamic_test_case_name";
+ public static final String DYNAMIC_JOB_MODE = "dynamic_job_mode";
+
+ public static final String DYNAMIC_TEST_ROW_NUM_PER_PARALLELISM = "dynamic_test_row_num_per_parallelism";
+
+ public static final String DYNAMIC_TEST_PARALLELISM = "dynamic_test_parallelism";
+
@Test
public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedException {
String testCaseName = "testBatchJobRunOkIn3Node";
String testClusterName = "ClusterFaultToleranceIT_testBatchJobRunOkIn3Node";
+ long testRowNumber = 1000;
+ int testParallelism = 1;
HazelcastInstanceImpl node1 =
SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName(testClusterName));
@@ -72,7 +86,7 @@ public class ClusterFaultToleranceIT {
// TODO Need FakeSource support parallel first
Common.setDeployMode(DeployMode.CLIENT);
- ImmutablePair<String, String> testResources = createTestResources(testCaseName);
+ ImmutablePair<String, String> testResources = createTestResources(testCaseName, JobMode.BATCH, testRowNumber, testParallelism);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(testCaseName);
@@ -88,12 +102,16 @@ public class ClusterFaultToleranceIT {
return clientJobProxy.waitForJobComplete();
});
- Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+ Awaitility.await().atMost(200000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(
objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
- Assertions.assertEquals(100, fileLineNumberFromDir);
+ Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+ node1.shutdown();
+ node2.shutdown();
+ node3.shutdown();
}
/**
@@ -101,17 +119,26 @@ public class ClusterFaultToleranceIT {
* It will delete the test sink target path before return the final job config file path
*
* @param testCaseName testCaseName
+ * @param jobMode jobMode
+ * @param rowNumber row.num per FakeSource parallelism
+ * @param parallelism FakeSource parallelism
* @return
*/
- private ImmutablePair<String, String> createTestResources(@NonNull String testCaseName) {
+ private ImmutablePair<String, String> createTestResources(@NonNull String testCaseName, @NonNull JobMode jobMode,
+ long rowNumber, int parallelism) {
+ checkArgument(rowNumber > 0, "rowNumber must greater than 0");
+ checkArgument(parallelism > 0, "parallelism must greater than 0");
Map<String, String> valueMap = new HashMap<>();
valueMap.put(DYNAMIC_TEST_CASE_NAME, testCaseName);
+ valueMap.put(DYNAMIC_JOB_MODE, jobMode.toString());
+ valueMap.put(DYNAMIC_TEST_ROW_NUM_PER_PARALLELISM, String.valueOf(rowNumber));
+ valueMap.put(DYNAMIC_TEST_PARALLELISM, String.valueOf(parallelism));
String targetDir = "/tmp/hive/warehouse/" + testCaseName;
- targetDir = targetDir.replaceAll("/", File.separator);
+ targetDir = targetDir.replace("/", File.separator);
// clear target dir before test
- FileUtils.deleteFile(targetDir);
+ FileUtils.createNewDir(targetDir);
String targetConfigFilePath =
File.separator + "tmp" + File.separator + "test_conf" + File.separator + testCaseName +
@@ -121,4 +148,66 @@ public class ClusterFaultToleranceIT {
return new ImmutablePair<>(targetDir, targetConfigFilePath);
}
+
+ @Test
+ @Disabled("Disable because we can not support changeless row number in FakeSource Stream Job")
+ public void testStreamJobRunOkIn3Node() throws ExecutionException, InterruptedException {
+ String testCaseName = "testStreamJobRunOkIn3Node";
+ String testClusterName = "ClusterFaultToleranceIT_testStreamJobRunOkIn3Node";
+ long testRowNumber = 1000;
+ int testParallelism = 1;
+ HazelcastInstanceImpl node1 =
+ SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ HazelcastInstanceImpl node2 =
+ SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ HazelcastInstanceImpl node3 =
+ SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(testClusterName));
+
+ // waiting all node added to cluster
+ Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(3, node1.getCluster().getMembers().size()));
+
+ // TODO Need FakeSource support parallel first
+ Common.setDeployMode(DeployMode.CLIENT);
+ ImmutablePair<String, String> testResources = createTestResources(testCaseName, JobMode.STREAMING, testRowNumber, testParallelism);
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(testCaseName);
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(
+ TestUtils.getClusterName(testClusterName));
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+ ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
+ });
+
+ Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ log.info("File Lines ==================" + FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ Assertions.assertTrue(JobStatus.RUNNING.equals(clientJobProxy.getJobStatus()) &&
+ testRowNumber * testParallelism == FileUtils.getFileLineNumberFromDir(testResources.getLeft()));
+ });
+
+ clientJobProxy.cancelJob();
+
+ Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+ objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
+ Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+ Assertions.assertEquals(testRowNumber * testParallelism, fileLineNumberFromDir);
+
+ node1.shutdown();
+ node2.shutdown();
+ node3.shutdown();
+ }
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
index 35b4e44c4..2789cbc3e 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
@@ -20,7 +20,7 @@
env {
# You can set flink configuration here
- job.mode = "BATCH"
+ job.mode = "${dynamic_job_mode}" # dynamic_job_mode will be replace to the final file name before test run
execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
@@ -28,12 +28,12 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
- row.num = 100
+ row.num = ${dynamic_test_row_num_per_parallelism}
map.size = 10
array.size = 10
bytes.length = 10
string.length = 10
- parallelism = 1
+ parallelism = ${dynamic_test_parallelism}
schema = {
fields {
c_map = "map<string, array<int>>"
@@ -81,9 +81,6 @@ sink {
path="/tmp/hive/warehouse/${dynamic_test_case_name}" # dynamic_test_case_name will be replace to the final file name before test run
field_delimiter="\t"
row_delimiter="\n"
- partition_by=["c_string"]
- partition_dir_expression="${k0}=${v0}"
- is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format="text"
filename_time_format="yyyy.MM.dd"
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 2b4ef575a..78ec4158d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.cluster.Address;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.MembershipServiceEvent;
@@ -50,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -111,15 +113,21 @@ public class CoordinatorService {
private volatile boolean isActive = false;
private final ExecutorService executorService;
- private final ScheduledExecutorService monitorService;
+
+ private final SeaTunnelServer seaTunnelServer;
+
+ private ScheduledExecutorService masterActiveListener;
@SuppressWarnings("checkstyle:MagicNumber")
- public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull ExecutorService executorService) {
+ public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull SeaTunnelServer seaTunnelServer) {
this.nodeEngine = nodeEngine;
this.logger = nodeEngine.getLogger(getClass());
- this.executorService = executorService;
- this.monitorService = Executors.newSingleThreadScheduledExecutor();
- monitorService.scheduleAtFixedRate(() -> checkNewActiveMaster(), 0, 100, TimeUnit.MILLISECONDS);
+ this.executorService =
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setNameFormat("seatunnel-coordinator-service-%d").build());
+ this.seaTunnelServer = seaTunnelServer;
+ masterActiveListener = Executors.newSingleThreadScheduledExecutor();
+ masterActiveListener.scheduleAtFixedRate(() -> checkNewActiveMaster(), 0, 100, TimeUnit.MILLISECONDS);
}
public JobMaster getJobMaster(Long jobId) {
@@ -146,8 +154,11 @@ public class CoordinatorService {
ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
List<CompletableFuture<Void>> collect = runningJobInfoIMap.entrySet().stream().map(entry -> {
- return CompletableFuture.runAsync(() -> restoreJobFromMasterActiveSwitch(entry.getKey(), entry.getValue()),
- executorService);
+ return CompletableFuture.runAsync(() -> {
+ logger.info(String.format("begin restore job (%s) from master active switch", entry.getKey()));
+ restoreJobFromMasterActiveSwitch(entry.getKey(), entry.getValue());
+ logger.info(String.format("restore job (%s) from master active switch finished", entry.getKey()));
+ }, executorService);
}).collect(Collectors.toList());
try {
@@ -170,7 +181,7 @@ public class CoordinatorService {
new JobMaster(runningJobInfo.getJobImmutableInformation(),
nodeEngine,
executorService,
- resourceManager,
+ getResourceManager(),
runningJobStateIMap,
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap);
@@ -181,12 +192,18 @@ public class CoordinatorService {
throw new SeaTunnelEngineException(String.format("Job id %s init JobMaster failed", jobId));
}
+ String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
if (jobStatus.isEndState()) {
+ logger.info(String.format(
+ "The restore %s is in an end state %s, store the job info to JobHistory and clear the job running time info",
+ jobFullName, jobStatus));
removeJobIMap(jobMaster);
return;
}
if (jobStatus.ordinal() < JobStatus.RUNNING.ordinal()) {
+ logger.info(
+ String.format("The restore %s is state %s, cancel job and submit it again.", jobFullName, jobStatus));
jobMaster.cancelJob();
jobMaster.getJobMasterCompleteFuture().join();
submitJob(jobId, runningJobInfo.getJobImmutableInformation()).join();
@@ -197,6 +214,7 @@ public class CoordinatorService {
jobMaster.markRestore();
if (JobStatus.CANCELLING.equals(jobStatus)) {
+ logger.info(String.format("The restore %s is in %s state, cancel the job", jobFullName, jobStatus));
CompletableFuture.runAsync(() -> {
try {
jobMaster.cancelJob();
@@ -211,6 +229,8 @@ public class CoordinatorService {
}
if (JobStatus.RUNNING.equals(jobStatus)) {
+ logger.info(String.format("The restore %s is in %s state, restore pipeline and take over this job running",
+ jobFullName, jobStatus));
CompletableFuture.runAsync(() -> {
try {
jobMaster.getPhysicalPlan().getPipelineList().forEach(SubPlan::restorePipelineState);
@@ -226,14 +246,20 @@ public class CoordinatorService {
}
private void checkNewActiveMaster() {
- if (!isActive && isMasterNode()) {
- logger.info("This node become a new active master node, begin init coordinator service");
- initCoordinatorService();
- isActive = true;
- } else if (isActive && !isMasterNode()) {
+ try {
+ if (!isActive && this.seaTunnelServer.isMasterNode()) {
+ logger.info("This node become a new active master node, begin init coordinator service");
+ initCoordinatorService();
+ isActive = true;
+ } else if (isActive && !this.seaTunnelServer.isMasterNode()) {
+ isActive = false;
+ logger.info("This node become leave active master node, begin clear coordinator service");
+ clearCoordinatorService();
+ }
+ } catch (Exception e) {
isActive = false;
- logger.info("This node become leave active master node, begin clear coordinator service");
- clearCoordinatorService();
+ logger.severe(ExceptionUtils.getMessage(e));
+ throw new SeaTunnelEngineException("check new active master error, stop loop");
}
}
@@ -255,15 +281,6 @@ public class CoordinatorService {
}
}
- public boolean isMasterNode() {
- Address masterAddress = nodeEngine.getMasterAddress();
- if (masterAddress == null) {
- return false;
- }
-
- return masterAddress.equals(nodeEngine.getThisAddress());
- }
-
/**
* Lazy load for resource manager
*/
@@ -387,10 +404,10 @@ public class CoordinatorService {
}
public void shutdown() {
- if (resourceManager != null) {
- resourceManager.close();
+ if (masterActiveListener != null) {
+ masterActiveListener.shutdownNow();
}
- monitorService.shutdown();
+ clearCoordinatorService();
}
/**
@@ -418,11 +435,12 @@ public class CoordinatorService {
ExecutionState executionState = physicalVertex.getExecutionState();
if (null != deployAddress && deployAddress.equals(lostAddress) &&
(executionState.equals(ExecutionState.DEPLOYING) ||
- executionState.equals(ExecutionState.RUNNING))) {
+ executionState.equals(ExecutionState.RUNNING) ||
+ executionState.equals(ExecutionState.CANCELING))) {
TaskGroupLocation taskGroupLocation = physicalVertex.getTaskGroupLocation();
physicalVertex.updateTaskExecutionState(
new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED,
- new SeaTunnelEngineException(
+ new JobException(
String.format("The taskGroup(%s) deployed node(%s) offline", taskGroupLocation,
lostAddress))));
}
@@ -433,4 +451,33 @@ public class CoordinatorService {
this.getResourceManager().memberRemoved(event);
this.failedTaskOnMemberRemoved(event);
}
+
+ public void printExecutionInfo() {
+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
+ int activeCount = threadPoolExecutor.getActiveCount();
+ int corePoolSize = threadPoolExecutor.getCorePoolSize();
+ int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
+ int poolSize = threadPoolExecutor.getPoolSize();
+ long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
+ long taskCount = threadPoolExecutor.getTaskCount();
+ StringBuffer sbf = new StringBuffer();
+ sbf.append("activeCount=")
+ .append(activeCount)
+ .append("\n")
+ .append("corePoolSize=")
+ .append(corePoolSize)
+ .append("\n")
+ .append("maximumPoolSize=")
+ .append(maximumPoolSize)
+ .append("\n")
+ .append("poolSize=")
+ .append(poolSize)
+ .append("\n")
+ .append("completedTaskCount=")
+ .append(completedTaskCount)
+ .append("\n")
+ .append("taskCount=")
+ .append(taskCount);
+ logger.info(sbf.toString());
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 90a87a770..269fd3ed5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server;
+import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
@@ -25,7 +26,6 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
import org.apache.seatunnel.engine.server.service.slot.SlotService;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.services.ManagedService;
import com.hazelcast.internal.services.MembershipAwareService;
@@ -41,10 +41,8 @@ import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
import lombok.NonNull;
import java.util.Properties;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class SeaTunnelServer implements ManagedService, MembershipAwareService, LiveOperationsTracker {
@@ -60,17 +58,14 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
private CoordinatorService coordinatorService;
private ScheduledExecutorService monitorService;
- private final ExecutorService executorService;
-
private final SeaTunnelConfig seaTunnelConfig;
+ private volatile boolean isRunning = true;
+
public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
this.logger = node.getLogger(getClass());
this.liveOperationRegistry = new LiveOperationRegistry();
this.seaTunnelConfig = seaTunnelConfig;
- this.executorService =
- Executors.newCachedThreadPool(new ThreadFactoryBuilder()
- .setNameFormat("seatunnel-server-executor-%d").build());
logger.info("SeaTunnel server start...");
}
@@ -100,7 +95,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
);
taskExecutionService.start();
getSlotService();
- coordinatorService = new CoordinatorService(nodeEngine, executorService);
+ coordinatorService = new CoordinatorService(nodeEngine, this);
monitorService = Executors.newSingleThreadScheduledExecutor();
monitorService.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, TimeUnit.SECONDS);
}
@@ -112,15 +107,19 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
@Override
public void shutdown(boolean terminate) {
+ isRunning = false;
+ if (taskExecutionService != null) {
+ taskExecutionService.shutdown();
+ }
+ if (monitorService != null) {
+ monitorService.shutdownNow();
+ }
if (slotService != null) {
slotService.close();
}
if (coordinatorService != null) {
coordinatorService.shutdown();
}
- executorService.shutdown();
- taskExecutionService.shutdown();
- monitorService.shutdown();
}
@Override
@@ -131,7 +130,7 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
@Override
public void memberRemoved(MembershipServiceEvent event) {
try {
- if (coordinatorService.isMasterNode()) {
+ if (isMasterNode()) {
this.getCoordinatorService().memberRemoved(event);
}
} catch (SeaTunnelEngineException e) {
@@ -159,19 +158,25 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
@SuppressWarnings("checkstyle:MagicNumber")
public CoordinatorService getCoordinatorService() {
int retryCount = 0;
- while (coordinatorService.isMasterNode() && !coordinatorService.isCoordinatorActive() && retryCount < 20) {
- try {
- logger.warning("Waiting this node become the active master node");
- Thread.sleep(1000);
- retryCount++;
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ if (isMasterNode()) {
+ // TODO the retry count and sleep time need configurable
+ while (!coordinatorService.isCoordinatorActive() && retryCount < 20 && isRunning) {
+ try {
+ logger.warning("This is master node, waiting the coordinator service init finished");
+ Thread.sleep(1000);
+ retryCount++;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
+ if (coordinatorService.isCoordinatorActive()) {
+ return coordinatorService;
+ }
+
+ throw new SeaTunnelEngineException("Can not get coordinator service from an active master node.");
+ } else {
+ throw new SeaTunnelEngineException("Please don't get coordinator service from an inactive master node");
}
- if (!coordinatorService.isCoordinatorActive()) {
- throw new SeaTunnelEngineException("Can not get coordinator service from an inactive master node.");
- }
- return coordinatorService;
}
public TaskExecutionService getTaskExecutionService() {
@@ -180,11 +185,13 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
/**
* return whether task is end
+ *
* @param taskGroupLocation taskGroupLocation
* @return
*/
public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) {
- IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
+ IMap<Object, Object> runningJobState =
+ nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
if (runningJobState == null) {
return false;
}
@@ -193,32 +200,20 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
return taskState == null ? false : ((ExecutionState) taskState).isEndState();
}
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public boolean isMasterNode() {
+ // must retry until the cluster have master node
+ try {
+ return RetryUtils.retryWithException(() -> {
+ return nodeEngine.getMasterAddress().equals(nodeEngine.getThisAddress());
+ }, new RetryUtils.RetryMaterial(20, true,
+ exception -> exception instanceof NullPointerException && isRunning, 1000));
+ } catch (Exception e) {
+ throw new SeaTunnelEngineException("cluster have no master node", e);
+ }
+ }
+
private void printExecutionInfo() {
- ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
- int activeCount = threadPoolExecutor.getActiveCount();
- int corePoolSize = threadPoolExecutor.getCorePoolSize();
- int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
- int poolSize = threadPoolExecutor.getPoolSize();
- long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
- long taskCount = threadPoolExecutor.getTaskCount();
- StringBuffer sbf = new StringBuffer();
- sbf.append("activeCount=")
- .append(activeCount)
- .append("\n")
- .append("corePoolSize=")
- .append(corePoolSize)
- .append("\n")
- .append("maximumPoolSize=")
- .append(maximumPoolSize)
- .append("\n")
- .append("poolSize=")
- .append(poolSize)
- .append("\n")
- .append("completedTaskCount=")
- .append(completedTaskCount)
- .append("\n")
- .append("taskCount=")
- .append(taskCount);
- logger.info(sbf.toString());
+ coordinatorService.printExecutionInfo();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 6f88f89dc..44d2002a9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -77,7 +77,7 @@ public class TaskExecutionService {
private final String hzInstanceName;
private final NodeEngineImpl nodeEngine;
private final ILogger logger;
- private volatile boolean isShutdown;
+ private volatile boolean isRunning = true;
private final LinkedBlockingDeque<TaskTracker> threadShareTaskQueue = new LinkedBlockingDeque<>();
private final ExecutorService executorService = newCachedThreadPool(new BlockingTaskThreadFactory());
private final RunBusWorkSupplier runBusWorkSupplier = new RunBusWorkSupplier(executorService, threadShareTaskQueue);
@@ -97,7 +97,7 @@ public class TaskExecutionService {
}
public void shutdown() {
- isShutdown = true;
+ isRunning = false;
executorService.shutdownNow();
}
@@ -211,10 +211,12 @@ public class TaskExecutionService {
logger.severe(ExceptionUtils.getMessage(t));
resultFuture.completeExceptionally(t);
}
- resultFuture.whenComplete((r, s) -> {
+ resultFuture.whenComplete(withTryCatch(logger, (r, s) -> {
+ logger.info(
+ String.format("Task %s complete with state %s", r.getTaskGroupLocation(), r.getExecutionState()));
InvocationFuture<Object> invoke = null;
long sleepTime = 1000;
- do {
+ while (isRunning && (invoke == null || !invoke.isDone())) {
if (null != invoke) {
logger.warning(String.format("notify the job of the task(%s) status failed, retry in %s millis",
taskGroup.getTaskGroupLocation(), sleepTime));
@@ -230,8 +232,8 @@ public class TaskExecutionService {
new NotifyTaskStatusOperation(taskGroup.getTaskGroupLocation(), r),
nodeEngine.getMasterAddress()).invoke();
invoke.join();
- } while (!invoke.isDone());
- });
+ }
+ }));
return new PassiveCompletableFuture<>(resultFuture);
}
@@ -270,7 +272,7 @@ public class TaskExecutionService {
ProgressState result;
do {
result = t.call();
- } while (!result.isDone() && !isShutdown &&
+ } while (!result.isDone() && isRunning &&
!tracker.taskGroupExecutionTracker.executionCompletedExceptionally());
} catch (Throwable e) {
logger.warning("Exception in " + t, e);
@@ -313,7 +315,7 @@ public class TaskExecutionService {
@SneakyThrows
@Override
public void run() {
- while (keep.get()) {
+ while (keep.get() && isRunning) {
TaskTracker taskTracker = null != exclusiveTaskTracker.get() ?
exclusiveTaskTracker.get() :
taskqueue.takeFirst();
@@ -431,6 +433,7 @@ public class TaskExecutionService {
}
void taskDone() {
+ logger.info("taskDone: " + taskGroup.getTaskGroupLocation());
if (completionLatch.decrementAndGet() == 0) {
TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupLocation();
executionContexts.remove(taskGroupLocation);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
index 91c1e3ce4..257bdbe6a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
@@ -101,7 +101,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
@SuppressWarnings("MagicNumber")
public static final class Builder {
- private long checkpointInterval = 300000;
+ // TODO 5000 is for test, we can update checkpointInterval to 300000 after we support it read from job config
+ private long checkpointInterval = 5000;
private long checkpointTimeout = 300000;
private int maxConcurrentCheckpoints = 1;
private int tolerableFailureCheckpoints = 0;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index aacd99297..556d15809 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -169,20 +169,20 @@ public class PhysicalPlan {
jobMaster.releasePipelineResource(subPlan);
LOGGER.severe("Pipeline Failed, Begin to cancel other pipelines in this job.");
}
- } catch (Throwable e) {
- // Because only cancelJob or releasePipelineResource can throw exception, so we only output log here
- LOGGER.severe(ExceptionUtils.getMessage(e));
- }
- if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
- if (failedPipelineNum.get() > 0) {
- updateJobState(JobStatus.FAILING);
- } else if (canceledPipelineNum.get() > 0) {
- turnToEndState(JobStatus.CANCELED);
- } else {
- turnToEndState(JobStatus.FINISHED);
+ if (finishedPipelineNum.incrementAndGet() == this.pipelineList.size()) {
+ if (failedPipelineNum.get() > 0) {
+ updateJobState(JobStatus.FAILING);
+ } else if (canceledPipelineNum.get() > 0) {
+ turnToEndState(JobStatus.CANCELED);
+ } else {
+ turnToEndState(JobStatus.FINISHED);
+ }
+ jobEndFuture.complete((JobStatus) runningJobStateIMap.get(jobId));
}
- jobEndFuture.complete((JobStatus) runningJobStateIMap.get(jobId));
+ } catch (Throwable e) {
+ // Because only cancelJob or releasePipelineResource can throw exception, so we only output log here
+ LOGGER.severe("Never come here ", e);
}
});
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index afcaf4a3c..a6f352505 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -101,8 +101,6 @@ public class PhysicalVertex {
private final NodeEngine nodeEngine;
- private Address currentExecutionAddress;
-
private TaskGroupImmutableInformation taskGroupImmutableInformation;
private JobMaster jobMaster;
@@ -150,14 +148,15 @@ public class PhysicalVertex {
this.nodeEngine = nodeEngine;
this.taskFullName =
String.format(
- "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)]",
+ "Job %s (%s), Pipeline: [(%d/%d)], task: [%s (%d/%d)], taskGroupLocation: [%s]",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
pipelineId,
totalPipelineNum,
taskGroup.getTaskGroupName(),
subTaskGroupIndex + 1,
- parallelism);
+ parallelism,
+ taskGroupLocation);
this.taskFuture = new CompletableFuture<>();
this.runningJobStateIMap = runningJobStateIMap;
@@ -167,6 +166,9 @@ public class PhysicalVertex {
public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
this.taskFuture = new CompletableFuture<>();
ExecutionState executionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation);
+ if (executionState != null) {
+ LOGGER.info(String.format("The task %s is in state %s when init state future", taskFullName, executionState));
+ }
// If the task state is CANCELING we need call noticeTaskExecutionServiceCancel().
if (ExecutionState.CANCELING.equals(executionState)) {
noticeTaskExecutionServiceCancel();
@@ -202,7 +204,6 @@ public class PhysicalVertex {
// This method must not throw an exception
public void deploy(@NonNull SlotProfile slotProfile) {
try {
- currentExecutionAddress = slotProfile.getWorker();
if (slotProfile.getWorker().equals(nodeEngine.getThisAddress())) {
deployOnLocal(slotProfile);
} else {
@@ -257,12 +258,11 @@ public class PhysicalVertex {
return false;
}
+ updateStateTimestamps(endState);
+ runningJobStateIMap.set(taskGroupLocation, endState);
LOGGER.info(String.format("%s turn to end state %s.",
taskFullName,
endState));
- updateStateTimestamps(endState);
-
- runningJobStateIMap.set(taskGroupLocation, endState);
return true;
}
}
@@ -296,14 +296,12 @@ public class PhysicalVertex {
// now do the actual state transition
if (current.equals(runningJobStateIMap.get(taskGroupLocation))) {
+ updateStateTimestamps(targetState);
+ runningJobStateIMap.set(taskGroupLocation, targetState);
LOGGER.info(String.format("%s turn from state %s to %s.",
taskFullName,
current,
targetState));
-
- updateStateTimestamps(targetState);
-
- runningJobStateIMap.set(taskGroupLocation, targetState);
return true;
} else {
return false;
@@ -332,9 +330,10 @@ public class PhysicalVertex {
while (!taskFuture.isDone()) {
try {
i++;
+ LOGGER.info(String.format("send cancel %s operator to member %s", taskFullName, getCurrentExecutionAddress()));
nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
new CancelTaskOperation(taskGroupLocation),
- currentExecutionAddress)
+ getCurrentExecutionAddress())
.invoke().get();
return;
} catch (Exception e) {
@@ -403,7 +402,7 @@ public class PhysicalVertex {
}
public Address getCurrentExecutionAddress() {
- return currentExecutionAddress;
+ return jobMaster.getOwnedSlotProfiles(taskGroupLocation).getWorker();
}
public TaskGroupLocation getTaskGroupLocation() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index d9672c6d2..752c6741f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -143,28 +143,34 @@ public class SubPlan {
private void addPhysicalVertexCallBack(PassiveCompletableFuture<TaskExecutionState> future) {
future.thenAcceptAsync(executionState -> {
- // We need not handle t, Because we will not return t from PhysicalVertex
- if (ExecutionState.CANCELED.equals(executionState.getExecutionState())) {
- canceledTaskNum.incrementAndGet();
- } else if (ExecutionState.FAILED.equals(executionState.getExecutionState())) {
- LOGGER.severe(String.format("Task Failed in %s, Begin to cancel other tasks in this pipeline.",
- this.getPipelineFullName()));
- failedTaskNum.incrementAndGet();
- cancelPipeline();
- }
+ try {
+ // We need not handle t, Because we will not return t from PhysicalVertex
+ if (ExecutionState.CANCELED.equals(executionState.getExecutionState())) {
+ canceledTaskNum.incrementAndGet();
+ } else if (ExecutionState.FAILED.equals(executionState.getExecutionState())) {
+ LOGGER.severe(String.format("Task %s Failed in %s, Begin to cancel other tasks in this pipeline.",
+ executionState.getTaskGroupLocation(),
+ this.getPipelineFullName()));
+ failedTaskNum.incrementAndGet();
+ cancelPipeline();
+ }
- if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
- if (failedTaskNum.get() > 0) {
- turnToEndState(PipelineState.FAILED);
- LOGGER.info(String.format("%s end with state FAILED", this.pipelineFullName));
- } else if (canceledTaskNum.get() > 0) {
- turnToEndState(PipelineState.CANCELED);
- LOGGER.info(String.format("%s end with state CANCELED", this.pipelineFullName));
- } else {
- turnToEndState(PipelineState.FINISHED);
- LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
+ if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
+ if (failedTaskNum.get() > 0) {
+ turnToEndState(PipelineState.FAILED);
+ LOGGER.info(String.format("%s end with state FAILED", this.pipelineFullName));
+ } else if (canceledTaskNum.get() > 0) {
+ turnToEndState(PipelineState.CANCELED);
+ LOGGER.info(String.format("%s end with state CANCELED", this.pipelineFullName));
+ } else {
+ turnToEndState(PipelineState.FINISHED);
+ LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
+ }
+ pipelineFuture.complete((PipelineState) runningJobStateIMap.get(pipelineLocation));
}
- pipelineFuture.complete((PipelineState) runningJobStateIMap.get(pipelineLocation));
+ } catch (Throwable e) {
+ LOGGER.severe(String.format("Never come here. handle %s %s error",
+ executionState.getTaskGroupLocation(), executionState.getExecutionState()), e);
}
});
}
@@ -330,7 +336,9 @@ public class SubPlan {
pipelineRestoreNum++;
LOGGER.info(String.format("Restore pipeline %s", pipelineFullName));
// We must ensure the scheduler complete and then can handle pipeline state change.
- jobMaster.getScheduleFuture().join();
+ if (jobMaster.getScheduleFuture() != null) {
+ jobMaster.getScheduleFuture().join();
+ }
if (reSchedulerPipelineFuture != null) {
reSchedulerPipelineFuture.join();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index cd593c112..b17bdc705 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.master;
+import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
@@ -88,7 +90,7 @@ public class JobMaster extends Thread {
private final IMap<Object, Object> runningJobStateTimestampsIMap;
- private CompletableFuture<Void> scheduleFuture = new CompletableFuture<>();
+ private CompletableFuture<Void> scheduleFuture;
private volatile boolean restore = false;
@@ -113,9 +115,10 @@ public class JobMaster extends Thread {
public void init(long initializationTimestamp) throws Exception {
jobImmutableInformation = nodeEngine.getSerializationService().toObject(
jobImmutableInformationData);
- LOGGER.info("Job [" + jobImmutableInformation.getJobId() + "] submit");
- LOGGER.info(
- "Job [" + jobImmutableInformation.getJobId() + "] jar urls " + jobImmutableInformation.getPluginJarsUrls());
+ LOGGER.info(String.format("Init JobMaster for Job %s (%s) ", jobImmutableInformation.getJobConfig().getName(),
+ jobImmutableInformation.getJobId()));
+ LOGGER.info(String.format("Job %s (%s) needed jar urls %s", jobImmutableInformation.getJobConfig().getName(),
+ jobImmutableInformation.getJobId(), jobImmutableInformation.getPluginJarsUrls()));
LogicalDag logicalDag;
if (!CollectionUtils.isEmpty(jobImmutableInformation.getPluginJarsUrls())) {
@@ -149,14 +152,14 @@ public class JobMaster extends Thread {
public void initStateFuture() {
jobMasterCompleteFuture = new CompletableFuture<>();
PassiveCompletableFuture<JobStatus> jobStatusFuture = physicalPlan.initStateFuture();
- jobStatusFuture.whenComplete((v, t) -> {
+ jobStatusFuture.whenComplete(withTryCatch(LOGGER, (v, t) -> {
// We need not handle t, Because we will not return t from physicalPlan
if (JobStatus.FAILING.equals(v)) {
cleanJob();
physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED);
}
jobMasterCompleteFuture.complete(physicalPlan.getJobStatus());
- });
+ }));
}
@SuppressWarnings("checkstyle:MagicNumber")
@@ -184,13 +187,17 @@ public class JobMaster extends Thread {
public void handleCheckpointTimeout(long pipelineId) {
this.physicalPlan.getPipelineList().forEach(pipeline -> {
if (pipeline.getPipelineLocation().getPipelineId() == pipelineId) {
- LOGGER.warning(String.format("%s checkpoint timeout, cancel the pipeline", pipeline.getPipelineFullName()));
+ LOGGER.warning(
+ String.format("%s checkpoint timeout, cancel the pipeline", pipeline.getPipelineFullName()));
pipeline.cancelPipeline();
}
});
}
public PassiveCompletableFuture<Void> reSchedulerPipeline(SubPlan subPlan) {
+ if (jobScheduler == null) {
+ jobScheduler = new PipelineBaseScheduler(physicalPlan, this);
+ }
return new PassiveCompletableFuture<>(jobScheduler.reSchedulerPipeline(subPlan));
}
@@ -279,6 +286,12 @@ public class JobMaster extends Thread {
ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles);
}
+ public SlotProfile getOwnedSlotProfiles(@NonNull TaskGroupLocation taskGroupLocation) {
+ return ownedSlotProfilesIMap.get(
+ new PipelineLocation(taskGroupLocation.getJobId(), taskGroupLocation.getPipelineId()))
+ .get(taskGroupLocation);
+ }
+
public CompletableFuture<Void> getScheduleFuture() {
return scheduleFuture;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
index aac24d732..e13c02807 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java
@@ -54,6 +54,8 @@ public abstract class AbstractResourceManager implements ResourceManager {
private final ExecutionMode mode = ExecutionMode.LOCAL;
+ private volatile boolean isRunning = true;
+
public AbstractResourceManager(NodeEngine nodeEngine) {
this.registerWorker = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RESOURCE_MANAGER_REGISTER_WORKER);
this.nodeEngine = nodeEngine;
@@ -66,14 +68,17 @@ public abstract class AbstractResourceManager implements ResourceManager {
private void checkRegisterWorkerStillAlive() {
if (!registerWorker.isEmpty()) {
- List<Address> aliveWorker = nodeEngine.getClusterService().getMembers().stream().map(Member::getAddress).collect(Collectors.toList());
- List<Address> dead = registerWorker.keySet().stream().filter(r -> !aliveWorker.contains(r)).collect(Collectors.toList());
+ List<Address> aliveWorker = nodeEngine.getClusterService().getMembers().stream().map(Member::getAddress)
+ .collect(Collectors.toList());
+ List<Address> dead =
+ registerWorker.keySet().stream().filter(r -> !aliveWorker.contains(r)).collect(Collectors.toList());
dead.forEach(registerWorker::remove);
}
}
@Override
- public CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile) throws NoEnoughResourceException {
+ public CompletableFuture<SlotProfile> applyResource(long jobId, ResourceProfile resourceProfile)
+ throws NoEnoughResourceException {
CompletableFuture<SlotProfile> completableFuture = new CompletableFuture<>();
applyResources(jobId, Collections.singletonList(resourceProfile)).whenComplete((profile, error) -> {
if (error != null) {
@@ -89,7 +94,7 @@ public abstract class AbstractResourceManager implements ResourceManager {
if (ExecutionMode.LOCAL.equals(mode)) {
// Local mode, should wait worker(master node) register.
try {
- while (registerWorker.isEmpty()) {
+ while (registerWorker.isEmpty() && isRunning) {
LOGGER.info("waiting current worker register to resource manager...");
Thread.sleep(DEFAULT_WORKER_CHECK_INTERVAL);
}
@@ -108,7 +113,8 @@ public abstract class AbstractResourceManager implements ResourceManager {
@Override
public CompletableFuture<List<SlotProfile>> applyResources(long jobId,
- List<ResourceProfile> resourceProfile) throws NoEnoughResourceException {
+ List<ResourceProfile> resourceProfile)
+ throws NoEnoughResourceException {
waitingWorkerRegister();
return new ResourceRequestHandler(jobId, resourceProfile, registerWorker, this).request();
}
@@ -123,11 +129,13 @@ public abstract class AbstractResourceManager implements ResourceManager {
* @param resourceProfiles the worker should have resource profile list
*/
protected void findNewWorker(List<ResourceProfile> resourceProfiles) {
- throw new UnsupportedOperationException("Unsupported operation to find new worker in " + this.getClass().getName());
+ throw new UnsupportedOperationException(
+ "Unsupported operation to find new worker in " + this.getClass().getName());
}
@Override
public void close() {
+ isRunning = false;
}
protected <E> InvocationFuture<E> sendToMember(Operation operation, Address address) {
@@ -162,7 +170,8 @@ public abstract class AbstractResourceManager implements ResourceManager {
@Override
public boolean slotActiveCheck(SlotProfile profile) {
- return registerWorker.values().stream().flatMap(workerProfile -> Arrays.stream(workerProfile.getAssignedSlots()))
+ return registerWorker.values().stream()
+ .flatMap(workerProfile -> Arrays.stream(workerProfile.getAssignedSlots()))
.anyMatch(s -> s.getSlotID() == profile.getSlotID());
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
index 6b6b21ff4..b45306645 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceRequestHandler.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.resourcemanager;
+import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+
import org.apache.seatunnel.engine.common.runtime.DeployType;
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
@@ -83,7 +85,7 @@ public class ResourceRequestHandler {
}
}
// all resource preCheck done, also had sent request to worker
- getAllOfFuture(allRequestFuture).whenComplete((unused, error) -> {
+ getAllOfFuture(allRequestFuture).whenComplete(withTryCatch(LOGGER, (unused, error) -> {
if (error != null) {
completeRequestWithException(error);
}
@@ -95,7 +97,7 @@ public class ResourceRequestHandler {
completeRequestWithException(new NoEnoughResourceException("can't apply resource request: " + resourceProfile.get(findNullIndexInResultSlotProfiles())));
}
}
- });
+ }));
return completableFuture;
}
@@ -129,7 +131,7 @@ public class ResourceRequestHandler {
private CompletableFuture<SlotAndWorkerProfile> singleResourceRequestToMember(int i, ResourceProfile r, WorkerProfile workerProfile) {
InvocationFuture<SlotAndWorkerProfile> future = resourceManager.sendToMember(new RequestSlotOperation(jobId, r), workerProfile.getAddress());
return future.whenComplete(
- (slotAndWorkerProfile, error) -> {
+ withTryCatch(LOGGER, (slotAndWorkerProfile, error) -> {
if (error != null) {
throw new RuntimeException(error);
} else {
@@ -137,7 +139,7 @@ public class ResourceRequestHandler {
addSlotToCacheMap(i, slotAndWorkerProfile.getSlotProfile());
}
}
- );
+ ));
}
private Optional<WorkerProfile> preCheckWorkerResource(ResourceProfile r) {
@@ -169,7 +171,7 @@ public class ResourceRequestHandler {
}
}
resourceManager.findNewWorker(needApplyResource);
- resourceManager.applyResources(jobId, needApplyResource).whenComplete((s, e) -> {
+ resourceManager.applyResources(jobId, needApplyResource).whenComplete(withTryCatch(LOGGER, (s, e) -> {
if (e != null) {
completeRequestWithException(e);
return;
@@ -177,7 +179,7 @@ public class ResourceRequestHandler {
for (int i = 0; i < s.size(); i++) {
addSlotToCacheMap(needApplyIndex.get(i), s.get(i));
}
- });
+ }));
}
private void releaseAllResourceInternal() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
index b2791ddf0..fc9a06d30 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/DefaultSlotService.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.engine.server.service.slot;
-import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.TaskExecutionService;
@@ -68,7 +67,8 @@ public class DefaultSlotService implements SlotService {
private final TaskExecutionService taskExecutionService;
private ConcurrentMap<Integer, SlotContext> contexts;
- public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService, boolean dynamicSlot, int slotNumber) {
+ public DefaultSlotService(NodeEngineImpl nodeEngine, TaskExecutionService taskExecutionService, boolean dynamicSlot,
+ int slotNumber) {
this.nodeEngine = nodeEngine;
this.dynamicSlot = dynamicSlot;
this.taskExecutionService = taskExecutionService;
@@ -92,14 +92,13 @@ public class DefaultSlotService implements SlotService {
unassignedResource.set(getNodeResource());
scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
- RetryUtils.retryWithException(() -> {
- LOGGER.fine("start send heartbeat to resource manager, this address: " + nodeEngine.getClusterService().getThisAddress());
- sendToMaster(new WorkerHeartbeatOperation(toWorkerProfile())).join();
- return null;
- }, new RetryUtils.RetryMaterial(HEARTBEAT_RETRY_TIME, true, e -> true, DEFAULT_HEARTBEAT_TIMEOUT));
+ LOGGER.fine("start send heartbeat to resource manager, this address: " +
+ nodeEngine.getClusterService().getThisAddress());
+ sendToMaster(new WorkerHeartbeatOperation(toWorkerProfile())).join();
} catch (Exception e) {
- LOGGER.severe(e);
- LOGGER.severe("failed send heartbeat to resource manager, will retry later. this address: " + nodeEngine.getClusterService().getThisAddress());
+ LOGGER.warning(e);
+ LOGGER.warning("failed send heartbeat to resource manager, will retry later. this address: " +
+ nodeEngine.getClusterService().getThisAddress());
}
}, 0, DEFAULT_HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
}
@@ -127,7 +126,8 @@ public class DefaultSlotService implements SlotService {
unassignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
unassignedSlots.remove(profile.getSlotID());
assignedSlots.put(profile.getSlotID(), profile);
- contexts.computeIfAbsent(profile.getSlotID(), p -> new SlotContext(profile.getSlotID(), taskExecutionService));
+ contexts.computeIfAbsent(profile.getSlotID(),
+ p -> new SlotContext(profile.getSlotID(), taskExecutionService));
}
return new SlotAndWorkerProfile(toWorkerProfile(), profile);
}
@@ -148,7 +148,7 @@ public class DefaultSlotService implements SlotService {
if (assignedSlots.get(profile.getSlotID()).getOwnerJobID() != jobId) {
throw new WrongTargetSlotException(String.format("The profile %s not belong with job %d",
- assignedSlots.get(profile.getSlotID()), jobId));
+ assignedSlots.get(profile.getSlotID()), jobId));
}
assignedResource.accumulateAndGet(profile.getResourceProfile(), ResourceProfile::subtract);
@@ -163,7 +163,9 @@ public class DefaultSlotService implements SlotService {
@Override
public void close() {
- scheduledExecutorService.shutdown();
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
+ }
}
private SlotProfile selectBestMatchSlot(ResourceProfile profile) {
@@ -176,14 +178,17 @@ public class DefaultSlotService implements SlotService {
}
} else {
Optional<SlotProfile> result = unassignedSlots.values().stream()
- .filter(slot -> slot.getResourceProfile().enoughThan(profile))
- .min((slot1, slot2) -> {
- if (slot1.getResourceProfile().getHeapMemory().getBytes() != slot2.getResourceProfile().getHeapMemory().getBytes()) {
- return slot1.getResourceProfile().getHeapMemory().getBytes() - slot2.getResourceProfile().getHeapMemory().getBytes() >= 0 ? 1 : -1;
- } else {
- return slot1.getResourceProfile().getCpu().getCore() - slot2.getResourceProfile().getCpu().getCore();
- }
- });
+ .filter(slot -> slot.getResourceProfile().enoughThan(profile))
+ .min((slot1, slot2) -> {
+ if (slot1.getResourceProfile().getHeapMemory().getBytes() !=
+ slot2.getResourceProfile().getHeapMemory().getBytes()) {
+ return slot1.getResourceProfile().getHeapMemory().getBytes() -
+ slot2.getResourceProfile().getHeapMemory().getBytes() >= 0 ? 1 : -1;
+ } else {
+ return slot1.getResourceProfile().getCpu().getCore() -
+ slot2.getResourceProfile().getCpu().getCore();
+ }
+ });
return result.orElse(null);
}
return null;
@@ -193,7 +198,7 @@ public class DefaultSlotService implements SlotService {
long maxMemory = Runtime.getRuntime().maxMemory();
for (int i = 0; i < slotNumber; i++) {
unassignedSlots.put(i, new SlotProfile(nodeEngine.getThisAddress(), i,
- new ResourceProfile(CPU.of(0), Memory.of(maxMemory / slotNumber))));
+ new ResourceProfile(CPU.of(0), Memory.of(maxMemory / slotNumber))));
}
}
@@ -211,7 +216,8 @@ public class DefaultSlotService implements SlotService {
}
public <E> InvocationFuture<E> sendToMaster(Operation operation) {
- InvocationBuilder invocationBuilder = nodeEngine.getOperationService().createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, nodeEngine.getMasterAddress());
+ InvocationBuilder invocationBuilder = nodeEngine.getOperationService()
+ .createInvocationBuilder(SeaTunnelServer.SERVICE_NAME, operation, nodeEngine.getMasterAddress());
return invocationBuilder.invoke();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index 403bbd128..af945ee23 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -17,13 +17,17 @@
package org.apache.seatunnel.engine.server;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
+@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractSeaTunnelServerTest {
@@ -36,7 +40,7 @@ public abstract class AbstractSeaTunnelServerTest {
protected static ILogger LOGGER;
@BeforeAll
- public void before() {
+ public void before() {
instance = SeaTunnelServerStarter.createHazelcastInstance(
TestUtils.getClusterName("AbstractSeaTunnelServerTest_" + System.currentTimeMillis()));
nodeEngine = instance.node.nodeEngine;
@@ -46,7 +50,16 @@ public abstract class AbstractSeaTunnelServerTest {
@AfterAll
public void after() {
- server.shutdown(true);
- instance.shutdown();
+ try {
+ if (server != null) {
+ server.shutdown(true);
+ }
+
+ if (instance != null) {
+ instance.shutdown();
+ }
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getMessage(e));
+ }
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index 1397206fe..b84225793 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -47,6 +47,7 @@ public class CoordinatorServiceTest {
SeaTunnelServer server1 = instance1.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
SeaTunnelServer server2 = instance2.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+ Assertions.assertTrue(server1.isMasterNode());
CoordinatorService coordinatorService1 = server1.getCoordinatorService();
Assertions.assertTrue(coordinatorService1.isCoordinatorActive());
@@ -62,14 +63,13 @@ public class CoordinatorServiceTest {
await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
try {
+ Assertions.assertTrue(server2.isMasterNode());
CoordinatorService coordinatorService = server2.getCoordinatorService();
- Assertions.assertTrue(coordinatorService.isMasterNode());
+ Assertions.assertTrue(coordinatorService.isCoordinatorActive());
} catch (SeaTunnelEngineException e) {
Assertions.assertTrue(false);
}
});
- CoordinatorService coordinatorService2 = server2.getCoordinatorService();
- Assertions.assertTrue(coordinatorService2.isCoordinatorActive());
instance2.shutdown();
}
@@ -107,10 +107,65 @@ public class CoordinatorServiceTest {
// because runningJobMasterMap is empty and we have no JobHistoryServer, so return finished.
Assertions.assertTrue(JobStatus.FINISHED.equals(coordinatorService.getJobStatus(jobId)));
+ coordinatorServiceTest.shutdown();
}
@Test
- public void testJobRestoreWhenMasterNodeSwitch() {
- // TODO wait CheckpointManager support restore from master node switch.
+ public void testJobRestoreWhenMasterNodeSwitch() throws InterruptedException {
+ HazelcastInstanceImpl instance1 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName("CoordinatorServiceTest_testJobRestoreWhenMasterNodeSwitch"));
+ HazelcastInstanceImpl instance2 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName("CoordinatorServiceTest_testJobRestoreWhenMasterNodeSwitch"));
+
+ SeaTunnelServer server1 = instance1.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+ SeaTunnelServer server2 = instance2.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+
+ CoordinatorService coordinatorService = server1.getCoordinatorService();
+ Assertions.assertTrue(coordinatorService.isCoordinatorActive());
+
+ Long jobId = instance1.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
+ LogicalDag testLogicalDag =
+ TestUtils.createTestLogicalPlan("stream_fakesource_to_file.conf", "testJobRestoreWhenMasterNodeSwitch",
+ jobId);
+
+ JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(jobId,
+ instance1.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(),
+ Collections.emptyList());
+
+ Data data = instance1.getSerializationService().toData(jobImmutableInformation);
+
+ coordinatorService.submitJob(jobId, data).join();
+
+ // wait for the job status to turn to RUNNING
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(JobStatus.RUNNING, coordinatorService.getJobStatus(jobId)));
+
+ // test master node shutdown
+ instance1.shutdown();
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ try {
+ Assertions.assertTrue(server2.isMasterNode());
+ Assertions.assertTrue(server2.getCoordinatorService().isCoordinatorActive());
+ } catch (SeaTunnelEngineException e) {
+ Assertions.assertTrue(false);
+ }
+ });
+
+ // wait for the job to be restored on the new master
+ Thread.sleep(5000);
+
+ // the job will recover to the RUNNING state
+ await().atMost(200000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> Assertions.assertEquals(JobStatus.RUNNING, server2.getCoordinatorService().getJobStatus(jobId)));
+
+ server2.getCoordinatorService().cancelJob(jobId);
+
+ // because runningJobMasterMap is empty and there is no JobHistoryServer, getJobStatus returns FINISHED.
+ await().atMost(200000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> Assertions.assertEquals(JobStatus.FINISHED, server2.getCoordinatorService().getJobStatus(jobId)));
+ instance2.shutdown();
}
}