You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/29 05:51:32 UTC
[incubator-seatunnel] branch st-engine updated: [Feature][ST-Engine] Add Job Cancel Feature (#2527)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 44bdc0c28 [Feature][ST-Engine] Add Job Cancel Feature (#2527)
44bdc0c28 is described below
commit 44bdc0c28444bcc251db31f5047463098018a2f4
Author: Eric <ga...@gmail.com>
AuthorDate: Mon Aug 29 13:51:26 2022 +0800
[Feature][ST-Engine] Add Job Cancel Feature (#2527)
* complete job cancel feature
* fix task state error
* fix license header
* fix NullpointException in operator
* update ci
* update ci
* fix review problems
* fix cancel error
---
.github/workflows/backend.yml | 6 +-
.github/workflows/engine_backend.yml | 2 +
generate_client_protocol.sh | 2 +-
.../core/starter/seatunnel/SeaTunnelStarter.java | 9 +-
seatunnel-e2e/seatunnel-engine-e2e/pom.xml | 4 +
.../org/apache/seatunnel/engine/e2e/TestUtils.java | 4 +
.../engine/e2e/engine/JobExecutionIT.java | 58 ++++++--
..._to_file.conf => batch_fakesource_to_file.conf} | 8 +-
....conf => batch_fakesource_to_file_complex.conf} | 8 +-
.../streaming_fakesource_to_file_complex.conf | 8 +-
seatunnel-engine/seatunnel-engine-client/pom.xml | 5 +
.../job/{JobProxy.java => ClientJobProxy.java} | 76 +++++++---
.../client/{ => job}/ConnectorInstanceLoader.java | 20 +--
.../seatunnel/engine/client/job/JobClient.java | 4 +-
.../engine/client/job/JobConfigParser.java | 61 ++++----
.../engine/client/job/JobExecutionEnvironment.java | 12 +-
.../engine/client/JobConfigParserTest.java | 4 +-
.../engine/client/LogicalDagGeneratorTest.java | 4 +-
.../engine/client/SeaTunnelClientTest.java | 18 ++-
.../apache/seatunnel/engine/client/TestUtils.java | 4 +
..._to_file.conf => batch_fakesource_to_file.conf} | 0
....conf => batch_fakesource_to_file_complex.conf} | 0
.../apache/seatunnel/engine/common/Constant.java | 4 +
.../seatunnel/engine/common/config/JobConfig.java | 6 +-
.../org/apache/seatunnel/engine/core/job/Job.java | 11 +-
.../protocol/codec/SeaTunnelCancelJobCodec.java | 82 +++++++++++
.../protocol/codec/SeaTunnelGetJobStatusCodec.java | 91 ++++++++++++
.../SeaTunnelEngine.yaml | 36 +++++
.../seatunnel/engine/server/SeaTunnelServer.java | 23 +++
.../engine/server/TaskExecutionService.java | 12 +-
.../engine/server/dag/physical/PhysicalPlan.java | 77 ++++++++--
.../engine/server/dag/physical/PhysicalVertex.java | 134 +++++++++++++++---
.../engine/server/dag/physical/SubPlan.java | 88 +++++++++---
.../seatunnel/engine/server/master/JobMaster.java | 10 +-
...yTaskOperation.java => CancelJobOperation.java} | 37 ++---
...skOperation.java => GetJobStatusOperation.java} | 41 ++++--
.../engine/server/protocol/task/CancelJobTask.java | 49 +++++++
.../server/protocol/task/GetJobStatusTask.java | 50 +++++++
.../task/SeaTunnelMessageTaskFactoryProvider.java | 6 +
.../server/scheduler/PipelineBaseScheduler.java | 156 ++++++++++++++-------
.../serializable/OperationDataSerializerHook.java | 11 +-
.../serializable/TaskDataSerializerHook.java | 10 ++
...uestOperation.java => CancelTaskOperation.java} | 47 +++----
.../{ => task}/operation/DeployTaskOperation.java | 7 +-
.../operation/source/AssignSplitOperation.java | 13 +-
.../operation/source/CloseRequestOperation.java | 12 +-
.../operation/source/RequestSplitOperation.java | 13 +-
.../source/SourceNoMoreElementOperation.java | 12 +-
.../seatunnel/engine/server/dag/TaskTest.java | 2 -
49 files changed, 1052 insertions(+), 305 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 0bc67c0a0..26e9b43eb 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -26,6 +26,8 @@ on:
- 'seatunnel-engine/**'
- 'seatunnel-engine/seatunnel-engine-e2e/**'
- 'seatunnel-examples/seatunnel-engine-examples/**'
+ - 'seatunnel-core/seatunnel-seatunnel-starter/**'
+ - 'generate_client_protocol.sh'
concurrency:
group: backend-${{ github.event.pull_request.number || github.ref }}
@@ -146,7 +148,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest', 'windows-latest' ]
- timeout-minutes: 50
+ timeout-minutes: 80
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
@@ -169,7 +171,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 50
+ timeout-minutes: 80
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
diff --git a/.github/workflows/engine_backend.yml b/.github/workflows/engine_backend.yml
index 5dcca74ad..a57f6d9ac 100644
--- a/.github/workflows/engine_backend.yml
+++ b/.github/workflows/engine_backend.yml
@@ -23,6 +23,8 @@ on:
- 'seatunnel-engine/**'
- 'seatunnel-engine/seatunnel-engine-e2e/**'
- 'seatunnel-examples/seatunnel-engine-examples/**'
+ - 'seatunnel-core/seatunnel-seatunnel-starter/**'
+ - 'generate_client_protocol.sh'
concurrency:
group: backend-${{ github.event.pull_request.number || github.ref }}
diff --git a/generate_client_protocol.sh b/generate_client_protocol.sh
index 2b26ecda3..7ad1d5b16 100755
--- a/generate_client_protocol.sh
+++ b/generate_client_protocol.sh
@@ -52,7 +52,7 @@ cd $PROTOCOL_DIRECTORY
$PIP3 install -r requirements.txt
$PYTHON generator.py -r $SEATUNNEL_ENGINE_HOME -p $SEATUNNEL_ENGINE_HOME/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition \
--o seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec \
+-o /tmp/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec \
-n org.apache.seatunnel.engine.core.protocol.codec --no-binary --no-id-check
rm -rf $PROTOCOL_DIRECTORY
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
index 90b12b48e..94335cca9 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
+++ b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
@@ -22,8 +22,8 @@ import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.client.job.JobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -39,16 +39,15 @@ public class SeaTunnelStarter {
Common.setDeployMode(DeployMode.CLIENT);
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_file");
- // TODO change jobConfig mode
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig);
- JobProxy jobProxy;
+ ClientJobProxy clientJobProxy;
try {
- jobProxy = jobExecutionEnv.execute();
- jobProxy.waitForJobComplete();
+ clientJobProxy = jobExecutionEnv.execute();
+ clientJobProxy.waitForJobComplete();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
index 292665a77..ea8a31f3a 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
@@ -66,5 +66,9 @@
<artifactId>seatunnel-engine-server</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
index 26e4defc9..786a34ce6 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
@@ -30,6 +30,10 @@ public class TestUtils {
return System.getProperty("user.dir") + "/src/test/resources" + confFile;
}
+ public static String getClusterName(String testClassName) {
+ return System.getProperty("user.name") + "_" + testClassName;
+ }
+
public static void initPluginDir() {
// copy connectors to project_root/connectors dir
System.setProperty("SEATUNNEL_HOME", System.getProperty("user.dir") +
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
index fa6679e3d..8b286c482 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
@@ -17,11 +17,14 @@
package org.apache.seatunnel.engine.e2e.engine;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.client.job.JobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
@@ -39,7 +42,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
public class JobExecutionIT {
private static final Logger LOGGER = LoggerFactory.getLogger(JobExecutionIT.class);
@@ -47,6 +51,7 @@ public class JobExecutionIT {
@BeforeClass
public static void beforeClass() throws Exception {
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("JobExecutionIT"));
HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
Thread.currentThread().getName(),
new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
@@ -55,6 +60,7 @@ public class JobExecutionIT {
@Test
public void testSayHello() {
SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
+ seaTunnelClientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801"));
SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
@@ -67,20 +73,56 @@ public class JobExecutionIT {
public void testExecuteJob() {
TestUtils.initPluginDir();
Common.setDeployMode(DeployMode.CLIENT);
- String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+ String filePath = TestUtils.getResource("/batch_fakesource_to_file.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("fake_to_file");
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
+
+ try {
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> assertEquals(JobStatus.FINISHED, clientJobProxy.waitForJobComplete()));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void cancelJobTest() {
+ TestUtils.initPluginDir();
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath = TestUtils.getResource("/streaming_fakesource_to_file_complex.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_file");
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
- JobProxy jobProxy = null;
try {
- jobProxy = jobExecutionEnv.execute();
- JobStatus jobStatus = jobProxy.waitForJobComplete();
- Assert.assertEquals(JobStatus.FINISHED, jobStatus);
- } catch (ExecutionException | InterruptedException e) {
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ JobStatus jobStatus1 = clientJobProxy.getJobStatus();
+ Assert.assertFalse(jobStatus1.isEndState());
+ ClientJobProxy finalClientJobProxy = clientJobProxy;
+ CompletableFuture<Object> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ JobStatus jobStatus = finalClientJobProxy.waitForJobComplete();
+ Assert.assertEquals(JobStatus.CANCELED, jobStatus);
+ return null;
+ });
+ Thread.sleep(1000);
+ clientJobProxy.cancelJob();
+
+ await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ Assert.assertTrue(objectCompletableFuture.isDone());
+ });
+
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
similarity index 72%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf
rename to seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
index c6f0f7bbf..1e49074ba 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
@@ -21,7 +21,7 @@
env {
# You can set flink configuration here
execution.parallelism = 1
- job.mode = "STREAMING"
+ job.mode = "BATCH"
execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
@@ -34,13 +34,9 @@ source {
parallelism = 3
}
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}
transform {
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}
sink {
@@ -60,6 +56,4 @@ sink {
}
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file_complex.conf
similarity index 74%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf
rename to seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file_complex.conf
index dc73c2f09..d946739b6 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -21,7 +21,7 @@
env {
# You can set flink configuration here
execution.parallelism = 1
- job.mode = "STREAMING"
+ job.mode = "BATCH"
execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
@@ -40,13 +40,9 @@ source {
parallelism = 3
}
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}
transform {
- # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}
sink {
@@ -66,6 +62,4 @@ sink {
source_table_name="fake"
}
- # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
- # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/streaming_fakesource_to_file_complex.conf
similarity index 96%
copy from seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
copy to seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/streaming_fakesource_to_file_complex.conf
index 258c32352..0a52e7a0d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -21,7 +21,7 @@
env {
# You can set flink configuration here
execution.parallelism = 1
- job.mode = "BATCH"
+ job.mode = "STREAMING"
execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
@@ -31,14 +31,15 @@ source {
FakeSource {
result_table_name = "fake"
field_name = "name,age",
- parallelism = 3
+ parallelism = 1
}
FakeSource {
result_table_name = "fake"
field_name = "name,age",
- parallelism = 3
+ parallelism = 1
}
+
}
transform {
@@ -60,4 +61,5 @@ sink {
save_mode="error",
source_table_name="fake"
}
+
}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml b/seatunnel-engine/seatunnel-engine-client/pom.xml
index 3bfecac59..91297bf8c 100644
--- a/seatunnel-engine/seatunnel-engine-client/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -74,5 +74,10 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
similarity index 59%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
rename to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index 0ac8ac789..f25ee6998 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -18,11 +18,15 @@
package org.apache.seatunnel.engine.client.job;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.Job;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
@@ -31,17 +35,16 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
-import java.util.concurrent.ExecutionException;
-
-public class JobProxy implements Job {
- private static final ILogger LOGGER = Logger.getLogger(JobProxy.class);
+public class ClientJobProxy implements Job {
+ private static final ILogger LOGGER = Logger.getLogger(ClientJobProxy.class);
private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
private JobImmutableInformation jobImmutableInformation;
- public JobProxy(@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient,
- @NonNull JobImmutableInformation jobImmutableInformation) {
+ public ClientJobProxy(@NonNull SeaTunnelHazelcastClient seaTunnelHazelcastClient,
+ @NonNull JobImmutableInformation jobImmutableInformation) {
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobImmutableInformation = jobImmutableInformation;
+ submitJob();
}
@Override
@@ -49,37 +52,66 @@ public class JobProxy implements Job {
return jobImmutableInformation.getJobId();
}
- @Override
- public void submitJob() throws ExecutionException, InterruptedException {
+ private void submitJob() {
ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(jobImmutableInformation.getJobId(),
seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation));
PassiveCompletableFuture<Void> submitJobFuture =
seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
- submitJobFuture.get();
+ submitJobFuture.join();
}
- @Override
+ /**
+ * This method will block even the Job turn to a EndState
+ *
+ * @return
+ */
public JobStatus waitForJobComplete() {
JobStatus jobStatus = null;
- PassiveCompletableFuture<JobStatus> jobFuture =
- seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
- SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
- response -> {
- return JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)];
- });
try {
- jobStatus = jobFuture.get();
- LOGGER.info(String.format("Job %s (%s) end with state %s",
- jobImmutableInformation.getJobId(),
- jobImmutableInformation.getJobConfig().getName(),
- jobStatus));
- } catch (InterruptedException | ExecutionException e) {
+ jobStatus = RetryUtils.retryWithException(() -> {
+ PassiveCompletableFuture<JobStatus> jobFuture = doWaitForJobComplete();
+ return jobFuture.get();
+ }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+ exception -> exception instanceof RuntimeException, Constant.OPERATION_RETRY_SLEEP));
+ } catch (Exception e) {
LOGGER.info(String.format("Job %s (%s) end with unknown state, and throw Exception: %s",
jobImmutableInformation.getJobId(),
jobImmutableInformation.getJobConfig().getName(),
ExceptionUtils.getMessage(e)));
throw new RuntimeException(e);
}
+ LOGGER.info(String.format("Job %s (%s) end with state %s",
+ jobImmutableInformation.getJobConfig().getName(),
+ jobImmutableInformation.getJobId(),
+ jobStatus));
return jobStatus;
}
+
+ @Override
+ public PassiveCompletableFuture<JobStatus> doWaitForJobComplete() {
+ PassiveCompletableFuture<JobStatus> jobFuture =
+ seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
+ SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
+ response -> {
+ return JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)];
+ });
+ return jobFuture;
+ }
+
+ @Override
+ public void cancelJob() {
+ PassiveCompletableFuture<Void> cancelFuture = seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
+ SeaTunnelCancelJobCodec.encodeRequest(jobImmutableInformation.getJobId()));
+
+ cancelFuture.join();
+ }
+
+ @Override
+ public JobStatus getJobStatus() {
+ int jobStatusOrdinal = seaTunnelHazelcastClient.requestOnMasterAndDecodeResponse(
+ SeaTunnelGetJobStatusCodec.encodeRequest(jobImmutableInformation.getJobId()),
+ response -> SeaTunnelGetJobStatusCodec.decodeResponse(response));
+ return JobStatus.values()[jobStatusOrdinal];
+ }
+
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
similarity index 84%
rename from seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
rename to seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
index d4e8f703d..c62ca2871 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/ConnectorInstanceLoader.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorInstanceLoader.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client;
+package org.apache.seatunnel.engine.client.job;
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -46,7 +46,8 @@ public class ConnectorInstanceLoader {
throw new IllegalStateException("Utility class");
}
- public static ImmutablePair<SeaTunnelSource, Set<URL>> loadSourceInstance(Config sourceConfig) {
+ public static ImmutablePair<SeaTunnelSource, Set<URL>> loadSourceInstance(Config sourceConfig,
+ SeaTunnelContext seaTunnelContext) {
SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
@@ -57,8 +58,8 @@ public class ConnectorInstanceLoader {
SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSource.prepare(sourceConfig);
- seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
- if (SeaTunnelContext.getContext().getJobMode() == JobMode.BATCH
+ seaTunnelSource.setSeaTunnelContext(seaTunnelContext);
+ if (seaTunnelContext.getJobMode() == JobMode.BATCH
&& seaTunnelSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
throw new UnsupportedOperationException(
String.format("'%s' source don't support off-line job.", seaTunnelSource.getPluginName()));
@@ -67,7 +68,7 @@ public class ConnectorInstanceLoader {
}
public static ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, Set<URL>> loadSinkInstance(
- Config sinkConfig) {
+ Config sinkConfig, SeaTunnelContext seaTunnelContext) {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
@@ -78,11 +79,12 @@ public class ConnectorInstanceLoader {
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setTypeInfo(null);
- seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
+ seaTunnelSink.setSeaTunnelContext(seaTunnelContext);
return new ImmutablePair<>(seaTunnelSink, new HashSet<>(pluginJarPaths));
}
- public static ImmutablePair<SeaTunnelTransform<?>, Set<URL>> loadTransformInstance(Config transformConfig) {
+ public static ImmutablePair<SeaTunnelTransform<?>, Set<URL>> loadTransformInstance(Config transformConfig,
+ SeaTunnelContext seaTunnelContext) {
SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery();
PluginIdentifier pluginIdentifier = PluginIdentifier.of(
CollectionConstants.SEATUNNEL_PLUGIN,
@@ -91,7 +93,9 @@ public class ConnectorInstanceLoader {
List<URL> pluginJarPaths = transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier));
SeaTunnelTransform<?> seaTunnelTransform =
- transformPluginDiscovery.createPluginInstance(pluginIdentifier);
+ transformPluginDiscovery.createPluginInstance(pluginIdentifier);
+ seaTunnelTransform.prepare(transformConfig);
+ seaTunnelTransform.setSeaTunnelContext(seaTunnelContext);
return new ImmutablePair<>(seaTunnelTransform, new HashSet<>(pluginJarPaths));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 964a7891e..bbcfea8ad 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -34,7 +34,7 @@ public class JobClient {
return hazelcastClient.getHazelcastInstance().getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME).newId();
}
- public JobProxy createJobProxy(@NonNull JobImmutableInformation jobImmutableInformation) {
- return new JobProxy(hazelcastClient, jobImmutableInformation);
+ public ClientJobProxy createJobProxy(@NonNull JobImmutableInformation jobImmutableInformation) {
+ return new ClientJobProxy(hazelcastClient, jobImmutableInformation);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
index 31cfa9a16..964bc5593 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.client.job;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -26,8 +27,8 @@ import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.engine.client.ConnectorInstanceLoader;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -77,8 +78,8 @@ public class JobConfigParser {
private JobConfig jobConfig;
public JobConfigParser(@NonNull String jobDefineFilePath,
- @NonNull IdGenerator idGenerator,
- @NonNull JobConfig jobConfig) {
+ @NonNull IdGenerator idGenerator,
+ @NonNull JobConfig jobConfig) {
this.jobDefineFilePath = jobDefineFilePath;
this.idGenerator = idGenerator;
this.jobConfig = jobConfig;
@@ -107,8 +108,14 @@ public class JobConfigParser {
return new ImmutablePair<>(actions, jarUrlsSet);
}
- private void jobConfigAnalyze(Config envConfigs) {
- // TODO Resolve env configuration and set jobConfig
+ private void jobConfigAnalyze(@NonNull Config envConfigs) {
+ SeaTunnelContext context = SeaTunnelContext.getContext();
+ if (envConfigs.hasPath("job.mode")) {
+ context.setJobMode(envConfigs.getEnum(JobMode.class, "job.mode"));
+ } else {
+ context.setJobMode(JobMode.BATCH);
+ }
+ jobConfig.setSeaTunnelContext(context);
}
/**
@@ -126,7 +133,7 @@ public class JobConfigParser {
for (Config config : sinkConfigs) {
ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, Set<URL>>
sinkListImmutablePair =
- ConnectorInstanceLoader.loadSinkInstance(config);
+ ConnectorInstanceLoader.loadSinkInstance(config, jobConfig.getSeaTunnelContext());
SinkAction sinkAction =
createSinkAction(idGenerator.getNextId(), sinkListImmutablePair.getLeft().getPluginName(),
@@ -157,7 +164,7 @@ public class JobConfigParser {
List<Config> sourceConfigList = sourceResultTableNameMap.get(sourceTableName);
if (CollectionUtils.isEmpty(sourceConfigList)) {
throw new JobDefineCheckException(action.getName()
- + " source table name [" + sourceTableName + "] can not be found");
+ + " source table name [" + sourceTableName + "] can not be found");
}
// If a transform have more than one upstream action, the parallelism of this transform is the sum of the parallelism
@@ -166,13 +173,13 @@ public class JobConfigParser {
AtomicInteger totalParallelism = new AtomicInteger();
for (Config sourceConfig : sourceConfigList) {
ImmutablePair<SeaTunnelSource, Set<URL>> seaTunnelSourceListImmutablePair =
- ConnectorInstanceLoader.loadSourceInstance(sourceConfig);
+ ConnectorInstanceLoader.loadSourceInstance(sourceConfig, jobConfig.getSeaTunnelContext());
dataType = seaTunnelSourceListImmutablePair.getLeft().getProducedType();
SourceAction sourceAction = createSourceAction(
- idGenerator.getNextId(),
- sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
- seaTunnelSourceListImmutablePair.getLeft(),
- seaTunnelSourceListImmutablePair.getRight());
+ idGenerator.getNextId(),
+ sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
+ seaTunnelSourceListImmutablePair.getLeft(),
+ seaTunnelSourceListImmutablePair.getRight());
int sourceParallelism = getSourceParallelism(sourceConfig);
sourceAction.setParallelism(sourceParallelism);
@@ -193,16 +200,16 @@ public class JobConfigParser {
SeaTunnelDataType<?> dataTypeResult = null;
for (Config config : transformConfigList) {
ImmutablePair<SeaTunnelTransform<?>, Set<URL>> transformListImmutablePair =
- ConnectorInstanceLoader.loadTransformInstance(config);
+ ConnectorInstanceLoader.loadTransformInstance(config, jobConfig.getSeaTunnelContext());
TransformAction transformAction = createTransformAction(
- idGenerator.getNextId(),
- transformListImmutablePair.getLeft().getPluginName(),
- transformListImmutablePair.getLeft(),
- transformListImmutablePair.getRight());
+ idGenerator.getNextId(),
+ transformListImmutablePair.getLeft().getPluginName(),
+ transformListImmutablePair.getLeft(),
+ transformListImmutablePair.getRight());
action.addUpstream(transformAction);
SeaTunnelDataType dataType = transformAnalyze(config.getString(Plugin.SOURCE_TABLE_NAME),
- transformAction);
+ transformAction);
transformListImmutablePair.getLeft().setTypeInfo(dataType);
dataTypeResult = transformListImmutablePair.getLeft().getProducedType();
totalParallelism.set(totalParallelism.get() + transformAction.getParallelism());
@@ -257,32 +264,32 @@ public class JobConfigParser {
List<? extends Config> transformConfigs,
List<? extends Config> sinkConfigs) {
ImmutablePair<SeaTunnelSource, Set<URL>> pair =
- ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0));
+ ConnectorInstanceLoader.loadSourceInstance(sourceConfigs.get(0), jobConfig.getSeaTunnelContext());
SourceAction sourceAction =
createSourceAction(idGenerator.getNextId(), pair.getLeft().getPluginName(), pair.getLeft(),
pair.getRight());
sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
SeaTunnelDataType dataType = sourceAction.getSource().getProducedType();
ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, Set<URL>>
- sinkListImmutablePair = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0));
+ sinkListImmutablePair = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0), jobConfig.getSeaTunnelContext());
Action sinkUpstreamAction = sourceAction;
if (!CollectionUtils.isEmpty(transformConfigs)) {
ImmutablePair<SeaTunnelTransform<?>, Set<URL>> transformListImmutablePair =
- ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0));
+ ConnectorInstanceLoader.loadTransformInstance(transformConfigs.get(0), jobConfig.getSeaTunnelContext());
transformListImmutablePair.getLeft().setTypeInfo(dataType);
dataType = transformListImmutablePair.getLeft().getProducedType();
TransformAction transformAction = createTransformAction(
- idGenerator.getNextId(),
- transformListImmutablePair.getLeft().getPluginName(),
- Lists.newArrayList(sourceAction),
- transformListImmutablePair.getLeft(),
- transformListImmutablePair.getRight());
+ idGenerator.getNextId(),
+ transformListImmutablePair.getLeft().getPluginName(),
+ Lists.newArrayList(sourceAction),
+ transformListImmutablePair.getLeft(),
+ transformListImmutablePair.getRight());
initTransformParallelism(transformConfigs, sourceAction, transformListImmutablePair.getLeft(),
- transformAction);
+ transformAction);
sinkUpstreamAction = transformAction;
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 5c83874e7..7176867a2 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.engine.client.job;
-import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -76,22 +75,15 @@ public class JobExecutionEnvironment {
return actions;
}
- public JobProxy execute() throws ExecutionException, InterruptedException {
+ public ClientJobProxy execute() throws ExecutionException, InterruptedException {
JobClient jobClient = new JobClient(seaTunnelHazelcastClient);
- initSeaTunnelContext();
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
jobClient.getNewJobId(),
seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
jobConfig,
jarUrls);
- JobProxy jobProxy = jobClient.createJobProxy(jobImmutableInformation);
- jobProxy.submitJob();
- return jobProxy;
- }
-
- private void initSeaTunnelContext() {
- SeaTunnelContext.getContext().setJobMode(jobConfig.getMode());
+ return jobClient.createJobProxy(jobImmutableInformation);
}
public LogicalDag getLogicalDag() {
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index 919b492d5..39231a6df 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -41,7 +41,7 @@ public class JobConfigParserTest {
@Test
public void testSimpleJobParse() {
Common.setDeployMode(DeployMode.CLIENT);
- String filePath = TestUtils.getResource("/fakesource_to_file.conf");
+ String filePath = TestUtils.getResource("/batch_fakesource_to_file.conf");
JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), new JobConfig());
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
@@ -59,7 +59,7 @@ public class JobConfigParserTest {
@Test
public void testComplexJobParse() {
Common.setDeployMode(DeployMode.CLIENT);
- String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+ String filePath = TestUtils.getResource("/batch_fakesource_to_file_complex.conf");
JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), new JobConfig());
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 1984bbef5..1c0da5503 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.engine.client.job.JobConfigParser;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -43,9 +42,8 @@ public class LogicalDagGeneratorTest {
@Test
public void testLogicalGenerator() {
Common.setDeployMode(DeployMode.CLIENT);
- String filePath = TestUtils.getResource("/fakesource_to_file_complex.conf");
+ String filePath = TestUtils.getResource("/batch_fakesource_to_file_complex.conf");
JobConfig jobConfig = new JobConfig();
- jobConfig.setMode(JobMode.BATCH);
jobConfig.setName("fake_to_file");
IdGenerator idGenerator = new IdGenerator();
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 5ba9787db..a4745687c 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -17,11 +17,13 @@
package org.apache.seatunnel.engine.client;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.client.job.JobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
@@ -39,6 +41,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
@SuppressWarnings("checkstyle:MagicNumber")
@RunWith(JUnit4.class)
@@ -49,6 +52,7 @@ public class SeaTunnelClientTest {
@Before
public void beforeClass() throws Exception {
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
instance = HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
Thread.currentThread().getName(),
new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
@@ -57,6 +61,7 @@ public class SeaTunnelClientTest {
@Test
public void testSayHello() {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
String msg = "Hello world";
@@ -69,18 +74,17 @@ public class SeaTunnelClientTest {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/client_test.conf");
JobConfig jobConfig = new JobConfig();
- jobConfig.setMode(JobMode.BATCH);
jobConfig.setName("fake_to_file");
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
- JobProxy jobProxy = null;
try {
- jobProxy = jobExecutionEnv.execute();
- JobStatus jobStatus = jobProxy.waitForJobComplete();
- Assert.assertEquals(JobStatus.FINISHED, jobStatus);
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> assertEquals(JobStatus.FINISHED, clientJobProxy.waitForJobComplete()));
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
index c392fdaf5..4a16a6c6a 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/TestUtils.java
@@ -21,4 +21,8 @@ public class TestUtils {
public static String getResource(String confFile) {
return System.getProperty("user.dir") + "/src/test/resources" + confFile;
}
+
+ public static String getClusterName(String testClassName) {
+ return System.getProperty("user.name") + "_" + testClassName;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
similarity index 100%
rename from seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file.conf
rename to seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
similarity index 100%
rename from seatunnel-engine/seatunnel-engine-client/src/test/resources/fakesource_to_file_complex.conf
rename to seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index 9c016d91f..6aab140cd 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -35,4 +35,8 @@ public class Constant {
public static final String HAZELCAST_SEATUNNEL_CONF_FILE_PREFIX = "seatunnel";
public static final String HAZELCAST_SEATUNNEL_DEFAULT_YAML = "seatunnel-default.yaml";
+
+ public static final int OPERATION_RETRY_TIME = 5;
+
+ public static final int OPERATION_RETRY_SLEEP = 2000;
}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
index 6bb41c469..2b31a1115 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.engine.common.config;
-import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.engine.common.serializeable.ConfigDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
@@ -30,7 +30,7 @@ import java.io.IOException;
@Data
public class JobConfig implements IdentifiedDataSerializable {
private String name;
- private JobMode mode;
+ private SeaTunnelContext seaTunnelContext;
@Override
public int getFactoryId() {
@@ -45,10 +45,12 @@ public class JobConfig implements IdentifiedDataSerializable {
@Override
public void writeData(ObjectDataOutput out) throws IOException {
out.writeString(name);
+ out.writeObject(seaTunnelContext);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
this.name = in.readString();
+ this.seaTunnelContext = in.readObject();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index fa1065518..517d5b1f2 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -17,12 +17,19 @@
package org.apache.seatunnel.engine.core.job;
-import java.util.concurrent.ExecutionException;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+/**
+ * Job interface define the Running job apis
+ */
public interface Job {
long getJobId();
- void submitJob() throws ExecutionException, InterruptedException;
+ PassiveCompletableFuture<JobStatus> doWaitForJobComplete();
+
+ void cancelJob();
+
+ JobStatus getJobStatus();
JobStatus waitForJobComplete();
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
new file mode 100644
index 000000000..0c48d17f6
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelCancelJobCodec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import static com.hazelcast.client.impl.protocol.ClientMessage.ForwardFrameIterator;
+import static com.hazelcast.client.impl.protocol.ClientMessage.Frame;
+import static com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+
+/*
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+
+@Generated("b8660c8a07cf0fd33e4191f33a26b46e")
+public final class SeaTunnelCancelJobCodec {
+ //hex: 0xDE0400
+ public static final int REQUEST_MESSAGE_TYPE = 14550016;
+ //hex: 0xDE0401
+ public static final int RESPONSE_MESSAGE_TYPE = 14550017;
+ private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+ private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+ private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+ private SeaTunnelCancelJobCodec() {
+ }
+
+ public static ClientMessage encodeRequest(long jobId) {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ clientMessage.setRetryable(true);
+ clientMessage.setOperationName("SeaTunnel.CancelJob");
+ Frame initialFrame = new Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
+ encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+ encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
+ clientMessage.add(initialFrame);
+ return clientMessage;
+ }
+
+ public static long decodeRequest(ClientMessage clientMessage) {
+ ForwardFrameIterator iterator = clientMessage.frameIterator();
+ Frame initialFrame = iterator.next();
+ return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+ }
+
+ public static ClientMessage encodeResponse() {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ Frame initialFrame = new Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
+ clientMessage.add(initialFrame);
+
+ return clientMessage;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStatusCodec.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStatusCodec.java
new file mode 100644
index 000000000..f6002a6c7
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetJobStatusCodec.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import static com.hazelcast.client.impl.protocol.ClientMessage.ForwardFrameIterator;
+import static com.hazelcast.client.impl.protocol.ClientMessage.Frame;
+import static com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
+import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+
+/*
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+
+@Generated("069a370867d61e85d3d51ea5453d880a")
+public final class SeaTunnelGetJobStatusCodec {
+ //hex: 0xDE0500
+ public static final int REQUEST_MESSAGE_TYPE = 14550272;
+ //hex: 0xDE0501
+ public static final int RESPONSE_MESSAGE_TYPE = 14550273;
+ private static final int REQUEST_JOB_ID_FIELD_OFFSET = PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+ private static final int REQUEST_INITIAL_FRAME_SIZE = REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+ private static final int RESPONSE_JOB_STATUS_FIELD_OFFSET = RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+ private static final int RESPONSE_INITIAL_FRAME_SIZE = RESPONSE_JOB_STATUS_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+
+ private SeaTunnelGetJobStatusCodec() {
+ }
+
+ public static ClientMessage encodeRequest(long jobId) {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ clientMessage.setRetryable(true);
+ clientMessage.setOperationName("SeaTunnel.GetJobStatus");
+ Frame initialFrame = new Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
+ encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+ encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
+ clientMessage.add(initialFrame);
+ return clientMessage;
+ }
+
+ public static long decodeRequest(ClientMessage clientMessage) {
+ ForwardFrameIterator iterator = clientMessage.frameIterator();
+ Frame initialFrame = iterator.next();
+ return decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
+ }
+
+ public static ClientMessage encodeResponse(int jobStatus) {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ Frame initialFrame = new Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
+ encodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET, jobStatus);
+ clientMessage.add(initialFrame);
+
+ return clientMessage;
+ }
+
+ public static int decodeResponse(ClientMessage clientMessage) {
+ ForwardFrameIterator iterator = clientMessage.frameIterator();
+ Frame initialFrame = iterator.next();
+ return decodeInt(initialFrame.content, RESPONSE_JOB_STATUS_FIELD_OFFSET);
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 19f616d29..39cb3c386 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -80,3 +80,39 @@ methods:
nullable: false
since: 2.0
doc: ''
+
+ - id: 4
+ name: cancelJob
+ since: 2.0
+ doc: ''
+ request:
+ retryable: true
+ partitionIdentifier: -1
+ params:
+ - name: jobId
+ type: long
+ nullable: false
+ since: 2.0
+ doc: ''
+ response: {}
+
+ - id: 5
+ name: getJobStatus
+ since: 2.0
+ doc: ''
+ request:
+ retryable: true
+ partitionIdentifier: -1
+ params:
+ - name: jobId
+ type: long
+ nullable: false
+ since: 2.0
+ doc: ''
+ response:
+ params:
+ - name: jobStatus
+ type: int
+ nullable: false
+ since: 2.0
+ doc: ''
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 019a97797..38eb6205f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -167,4 +167,27 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
return runningJobMaster.getJobMasterCompleteFuture();
}
}
+
+ public PassiveCompletableFuture<Void> cancelJob(long jodId) {
+ JobMaster runningJobMaster = runningJobMasterMap.get(jodId);
+ if (runningJobMaster == null) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.complete(null);
+ return new PassiveCompletableFuture<>(future);
+ } else {
+ return new PassiveCompletableFuture<>(CompletableFuture.supplyAsync(() -> {
+ runningJobMaster.cancelJob();
+ return null;
+ }));
+ }
+ }
+
+ public JobStatus getJobStatus(long jobId) {
+ JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
+ if (runningJobMaster == null) {
+ // TODO Get Job Status from JobHistoryStorage
+ return JobStatus.FINISHED;
+ }
+ return runningJobMaster.getJobStatus();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index df9651768..1bd2b4b9e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -25,7 +25,6 @@ import static java.util.stream.Collectors.partitioningBy;
import static java.util.stream.Collectors.toList;
import org.apache.seatunnel.common.utils.ExceptionUtils;
-import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.loader.SeatunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
@@ -183,11 +182,18 @@ public class TaskExecutionService {
return new PassiveCompletableFuture<>(resultFuture);
}
+ /**
+ * JobMaster call this method to cancel a task, and then {@link TaskExecutionService} cancel this task and send the
+ * {@link TaskExecutionState} to JobMaster.
+ *
+ * @param taskId TaskGroup.getId()
+ */
public void cancelTaskGroup(long taskId) {
+ logger.info(String.format("Task (%s) need cancel.", taskId));
if (cancellationFutures.containsKey(taskId)) {
cancellationFutures.get(taskId).cancel(false);
} else {
- throw new SeaTunnelEngineException(String.format("taskId : %s is not exist", taskId));
+ logger.warning(String.format("need cancel taskId : %s is not exist", taskId));
}
}
@@ -378,7 +384,7 @@ public class TaskExecutionService {
if (ex == null) {
future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FINISHED, null));
} else if (isCancel.get()) {
- future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.CANCELED, ex));
+ future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.CANCELED, null));
} else {
future.complete(new TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, ex));
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index b769f76be..dc79fceec 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.dag.physical;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -32,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
public class PhysicalPlan {
@@ -72,6 +74,8 @@ public class PhysicalPlan {
private final ExecutorService executorService;
+ private final String jobFullName;
+
public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
@NonNull ExecutorService executorService,
@NonNull JobImmutableInformation jobImmutableInformation,
@@ -90,6 +94,7 @@ public class PhysicalPlan {
if (pipelineList.isEmpty()) {
throw new UnknownPhysicalPlanException("The physical plan didn't have any can execute pipeline");
}
+ this.jobFullName = String.format("Job %s (%s)", jobImmutableInformation.getJobConfig().getName(), jobImmutableInformation.getJobId());
Arrays.stream(this.waitForCompleteBySubPlan).forEach(x -> {
x.whenComplete((v, t) -> {
// We need not handle t, Because we will not return t from Pipeline
@@ -109,9 +114,9 @@ public class PhysicalPlan {
if (failedPipelineNum.get() > 0) {
updateJobState(JobStatus.FAILING);
} else if (canceledPipelineNum.get() > 0) {
- updateJobState(JobStatus.CANCELED);
+ turnToEndState(JobStatus.CANCELED);
} else {
- updateJobState(JobStatus.FINISHED);
+ turnToEndState(JobStatus.FINISHED);
}
jobEndFuture.complete(jobStatus.get());
}
@@ -119,25 +124,64 @@ public class PhysicalPlan {
});
}
- public PassiveCompletableFuture<Void> cancelJob() {
- CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
- // TODO Implement cancel pipeline in job.
- return null;
- });
+ public void cancelJob() {
+ if (!updateJobState(JobStatus.CREATED, JobStatus.CANCELED)) {
+ // may be running, failing, failed, canceling , canceled, finished
+ if (updateJobState(JobStatus.RUNNING, JobStatus.CANCELLING)) {
+ cancelRunningJob();
+ } else {
+ LOGGER.info(String.format("%s in a non cancellable state: %s, skip cancel", jobFullName, jobStatus.get()));
+ }
+ }
+ }
- cancelFuture.complete(null);
- return new PassiveCompletableFuture<>(cancelFuture);
+ private void cancelRunningJob() {
+ List<CompletableFuture<Void>> collect = pipelineList.stream().map(pipeline -> {
+ if (!pipeline.getPipelineState().get().isEndState() &&
+ !PipelineState.CANCELING.equals(pipeline.getPipelineState().get())) {
+ CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
+ pipeline.cancelPipeline();
+ return null;
+ });
+ return future;
+ }
+ return null;
+ }).filter(x -> x != null).collect(Collectors.toList());
+
+ try {
+ CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
+ collect.toArray(new CompletableFuture[collect.size()]));
+ voidCompletableFuture.get();
+ } catch (Exception e) {
+ LOGGER.severe(
+ String.format("%s cancel error with exception: %s", jobFullName, ExceptionUtils.getMessage(e)));
+ }
}
public List<SubPlan> getPipelineList() {
return pipelineList;
}
- public void turnToRunning() {
- if (!updateJobState(JobStatus.CREATED, JobStatus.RUNNING)) {
- throw new IllegalStateException(
- "Job may only be scheduled from state " + JobStatus.CREATED);
+ public boolean turnToRunning() {
+ return updateJobState(JobStatus.CREATED, JobStatus.RUNNING);
+ }
+
+ private void turnToEndState(@NonNull JobStatus endState) {
+ // consistency check
+ if (jobStatus.get().isEndState()) {
+ String message = "Job is trying to leave terminal state " + jobStatus.get();
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
}
+
+ if (!endState.isEndState()) {
+ String message = "Need a end state, not " + endState;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ jobStatus.set(endState);
+ stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
}
public boolean updateJobState(@NonNull JobStatus targetState) {
@@ -153,8 +197,7 @@ public class PhysicalPlan {
}
// now do the actual state transition
- if (jobStatus.get() == current) {
- jobStatus.set(targetState);
+ if (jobStatus.compareAndSet(current, targetState)) {
LOGGER.info(String.format("Job %s (%s) turn from state %s to %s.",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
@@ -179,4 +222,8 @@ public class PhysicalPlan {
public JobStatus getJobStatus() {
return jobStatus.get();
}
+
+ public String getJobFullName() {
+ return jobFullName;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index 84f9c25db..d63c87859 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -19,14 +19,16 @@ package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionVertex;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
-import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
+import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
+import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
import com.hazelcast.cluster.Address;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
@@ -100,6 +102,10 @@ public class PhysicalVertex {
private final NodeEngine nodeEngine;
+ private Address currentExecutionAddress;
+
+ private TaskGroupImmutableInformation taskGroupImmutableInformation;
+
public PhysicalVertex(long physicalVertexId,
int subTaskGroupIndex,
@NonNull ExecutorService executorService,
@@ -145,25 +151,56 @@ public class PhysicalVertex {
@SuppressWarnings("checkstyle:MagicNumber")
// This method must not throw an exception
public void deploy(@NonNull Address address) {
- TaskGroupImmutableInformation taskGroupImmutableInformation =
- new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
- nodeEngine.getSerializationService().toData(this.taskGroup),
- this.pluginJarsUrls);
+ currentExecutionAddress = address;
+ taskGroupImmutableInformation = new TaskGroupImmutableInformation(flakeIdGenerator.newId(),
+ nodeEngine.getSerializationService().toData(this.taskGroup),
+ this.pluginJarsUrls);
try {
- waitForCompleteByExecutionService = new PassiveCompletableFuture<>(
- nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
- new DeployTaskOperation(nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
- address)
- .invoke());
- updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
+ if (ExecutionState.DEPLOYING.equals(executionState.get())) {
+ waitForCompleteByExecutionService = new PassiveCompletableFuture<>(
+ nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+ new DeployTaskOperation(
+ nodeEngine.getSerializationService().toData(taskGroupImmutableInformation)),
+ currentExecutionAddress)
+ .invoke());
+
+ // may be canceling
+ if (!updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
+ // If we found the task state turned to CANCELING after deployed to TaskExecutionService. We need
+ // notice the TaskExecutionService to cancel this task.
+ noticeTaskExecutionServiceCancel();
+ if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
+ turnToEndState(ExecutionState.CANCELED);
+ taskFuture.complete(
+ new TaskExecutionState(this.physicalVertexId, ExecutionState.CANCELED, null));
+ } else {
+ turnToEndState(ExecutionState.FAILED);
+ taskFuture.complete(new TaskExecutionState(this.physicalVertexId, ExecutionState.FAILED,
+ new JobException(String.format("%s turn to a unexpected state: %s, make it Failed",
+ this.getTaskFullName(), executionState.get()))));
+ }
+ }
+ } else if (ExecutionState.CANCELING.equals(this.getExecutionState().get())) {
+ turnToEndState(ExecutionState.CANCELED);
+ taskFuture.complete(new TaskExecutionState(this.physicalVertexId, executionState.get(), null));
+ } else {
+ turnToEndState(ExecutionState.FAILED);
+ taskFuture.complete(new TaskExecutionState(this.physicalVertexId, executionState.get(),
+ new JobException(String.format("%s turn to a unexpected state"))));
+ }
+
} catch (Throwable th) {
LOGGER.severe(String.format("%s deploy error with Exception: %s",
this.taskFullName,
ExceptionUtils.getMessage(th)));
- updateTaskState(ExecutionState.DEPLOYING, ExecutionState.FAILED);
+ turnToEndState(ExecutionState.FAILED);
taskFuture.complete(
- new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED, th));
+ new TaskExecutionState(this.physicalVertexId, ExecutionState.FAILED, th));
+ }
+
+ if (waitForCompleteByExecutionService == null) {
+ return;
}
waitForCompleteByExecutionService.whenComplete((v, t) -> {
@@ -174,7 +211,7 @@ public class PhysicalVertex {
new TaskExecutionState(taskGroupImmutableInformation.getExecutionId(), ExecutionState.FAILED,
t));
} else {
- updateTaskState(executionState.get(), v.getExecutionState());
+ turnToEndState(v.getExecutionState());
if (v.getThrowable() != null) {
LOGGER.severe(String.format("%s end with state %s and Exception: %s",
this.taskFullName,
@@ -190,7 +227,7 @@ public class PhysicalVertex {
} catch (Throwable th) {
LOGGER.severe(
String.format("%s end with Exception: %s", this.taskFullName, ExceptionUtils.getMessage(th)));
- updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED);
+ turnToEndState(ExecutionState.FAILED);
v = new TaskExecutionState(v.getTaskExecutionId(), ExecutionState.FAILED, th);
taskFuture.complete(v);
}
@@ -201,6 +238,27 @@ public class PhysicalVertex {
return physicalVertexId;
}
+ private void turnToEndState(@NonNull ExecutionState endState) {
+ // consistency check
+ if (executionState.get().isEndState()) {
+ String message = "Task is trying to leave terminal state " + executionState.get();
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ if (!endState.isEndState()) {
+ String message = "Need a end state, not " + endState;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ LOGGER.info(String.format("%s turn to end state %s.",
+ taskFullName,
+ endState));
+ executionState.set(endState);
+ stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
+ }
+
public boolean updateTaskState(@NonNull ExecutionState current, @NonNull ExecutionState targetState) {
// consistency check
if (current.isEndState()) {
@@ -228,8 +286,7 @@ public class PhysicalVertex {
}
// now do the actual state transition
- if (executionState.get() == current) {
- executionState.set(targetState);
+ if (executionState.compareAndSet(current, targetState)) {
LOGGER.info(String.format("%s turn from state %s to %s.",
taskFullName,
current,
@@ -245,4 +302,47 @@ public class PhysicalVertex {
public TaskGroupDefaultImpl getTaskGroup() {
return taskGroup;
}
+
+ public void cancel() {
+ if (updateTaskState(ExecutionState.CREATED, ExecutionState.CANCELED) ||
+ updateTaskState(ExecutionState.SCHEDULED, ExecutionState.CANCELED)) {
+ taskFuture.complete(new TaskExecutionState(this.physicalVertexId, ExecutionState.CANCELED, null));
+ } else if (updateTaskState(ExecutionState.DEPLOYING, ExecutionState.CANCELING)) {
+ // do nothing, because even if task is deployed to TaskExecutionService, we can do the cancel in deploy method
+ } else if (updateTaskState(ExecutionState.RUNNING, ExecutionState.CANCELING)) {
+ noticeTaskExecutionServiceCancel();
+ }
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void noticeTaskExecutionServiceCancel() {
+ int i = 0;
+ // In order not to generate uncontrolled tasks, We will try again until the taskFuture is completed
+ while (!taskFuture.isDone()) {
+ try {
+ i++;
+ nodeEngine.getOperationService().createInvocationBuilder(Constant.SEATUNNEL_SERVICE_NAME,
+ new CancelTaskOperation(taskGroup.getId()),
+ currentExecutionAddress)
+ .invoke().get();
+ return;
+ } catch (Exception e) {
+ LOGGER.warning(String.format("%s cancel failed with Exception: %s, retry %s", this.getTaskFullName(),
+ ExceptionUtils.getMessage(e), i));
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ }
+
+ public AtomicReference<ExecutionState> getExecutionState() {
+ return executionState;
+ }
+
+ public String getTaskFullName() {
+ return taskFullName;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index 950a63e95..fbccceef8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.dag.physical;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineState;
@@ -32,6 +33,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
public class SubPlan {
private static final ILogger LOGGER = Logger.getLogger(SubPlan.class);
@@ -122,13 +124,13 @@ public class SubPlan {
if (finishedTaskNum.incrementAndGet() == (physicalVertexList.size() + coordinatorVertexList.size())) {
if (failedTaskNum.get() > 0) {
- updatePipelineState(PipelineState.FAILED);
+ turnToEndState(PipelineState.FAILED);
LOGGER.info(String.format("%s end with state FAILED", this.pipelineFullName));
} else if (canceledTaskNum.get() > 0) {
- updatePipelineState(PipelineState.CANCELED);
+ turnToEndState(PipelineState.CANCELED);
LOGGER.info(String.format("%s end with state CANCELED", this.pipelineFullName));
} else {
- updatePipelineState(PipelineState.FINISHED);
+ turnToEndState(PipelineState.FINISHED);
LOGGER.info(String.format("%s end with state FINISHED", this.pipelineFullName));
}
pipelineFuture.complete(pipelineState.get());
@@ -137,8 +139,22 @@ public class SubPlan {
});
}
- public boolean updatePipelineState(@NonNull PipelineState targetState) {
- return updatePipelineState(pipelineState.get(), targetState);
+ private void turnToEndState(@NonNull PipelineState endState) {
+ // consistency check
+ if (pipelineState.get().isEndState()) {
+ String message = "Pipeline is trying to leave terminal state " + pipelineState.get();
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ if (!endState.isEndState()) {
+ String message = "Need a end state, not " + endState;
+ LOGGER.severe(message);
+ throw new IllegalStateException(message);
+ }
+
+ pipelineState.set(endState);
+ stateTimestamps[endState.ordinal()] = System.currentTimeMillis();
}
public boolean updatePipelineState(@NonNull PipelineState current, @NonNull PipelineState targetState) {
@@ -168,8 +184,7 @@ public class SubPlan {
}
// now do the actual state transition
- if (pipelineState.get() == current) {
- pipelineState.set(targetState);
+ if (pipelineState.compareAndSet(current, targetState)) {
LOGGER.info(String.format("%s turn from state %s to %s.",
pipelineFullName,
current,
@@ -182,20 +197,57 @@ public class SubPlan {
}
}
- public PassiveCompletableFuture<Void> cancelPipeline() {
- CompletableFuture<Void> cancelFuture = CompletableFuture.supplyAsync(() -> {
- // TODO Implement cancel tasks in pipeline.
- return null;
- });
+ public void cancelPipeline() {
+ if (!updatePipelineState(PipelineState.CREATED, PipelineState.CANCELED) &&
+ !updatePipelineState(PipelineState.SCHEDULED, PipelineState.CANCELED)) {
+ // may be deploying, running, failed, canceling , canceled, finished
+ if (updatePipelineState(PipelineState.DEPLOYING, PipelineState.CANCELING) ||
+ updatePipelineState(PipelineState.RUNNING, PipelineState.CANCELING)) {
+ cancelRunningPipeline();
+ } else {
+ LOGGER.info(
+ String.format("%s in a non cancellable state: %s, skip cancel", pipelineFullName, pipelineState.get()));
+ }
+ } else {
+ pipelineFuture.complete(PipelineState.CANCELED);
+ }
+ }
+
+ private void cancelRunningPipeline() {
+ List<CompletableFuture<Void>> coordinatorCancelList =
+ coordinatorVertexList.stream().map(coordinator -> cancelTask(coordinator)).filter(x -> x != null)
+ .collect(Collectors.toList());
+
+ List<CompletableFuture<Void>> taskCancelList =
+ physicalVertexList.stream().map(task -> cancelTask(task)).filter(x -> x != null)
+ .collect(Collectors.toList());
+
+ try {
+ coordinatorCancelList.addAll(taskCancelList);
+ CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
+ coordinatorCancelList.toArray(new CompletableFuture[coordinatorCancelList.size()]));
+ voidCompletableFuture.get();
+ } catch (Exception e) {
+ LOGGER.severe(
+ String.format("%s cancel error with exception: %s", pipelineFullName, ExceptionUtils.getMessage(e)));
+ }
+ }
- cancelFuture.complete(null);
- return new PassiveCompletableFuture<>(cancelFuture);
+ private CompletableFuture<Void> cancelTask(@NonNull PhysicalVertex task) {
+ if (!task.getExecutionState().get().isEndState() &&
+ !ExecutionState.CANCELING.equals(task.getExecutionState().get())) {
+ CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
+ task.cancel();
+ return null;
+ });
+ return future;
+ }
+ return null;
}
public void failedWithNoEnoughResource() {
LOGGER.severe(String.format("%s failed with have no enough resource to run.", this.getPipelineFullName()));
- updatePipelineState(PipelineState.SCHEDULED, PipelineState.FAILED);
- pipelineFuture.complete(PipelineState.FAILED);
+ cancelPipeline();
}
public int getPipelineIndex() {
@@ -213,4 +265,8 @@ public class SubPlan {
public String getPipelineFullName() {
return pipelineFullName;
}
+
+ public AtomicReference<PipelineState> getPipelineState() {
+ return pipelineState;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 3a7895389..1383797f9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -134,7 +134,11 @@ public class JobMaster implements Runnable {
}
public void cleanJob() {
- // TODO clean something
+ // TODO Add some job clean operation
+ }
+
+ public void cancelJob() {
+ this.physicalPlan.cancelJob();
}
public ResourceManager getResourceManager() {
@@ -152,4 +156,8 @@ public class JobMaster implements Runnable {
public JobImmutableInformation getJobImmutableInformation() {
return jobImmutableInformation;
}
+
+ public JobStatus getJobStatus() {
+ return physicalPlan.getJobStatus();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
similarity index 53%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
index 4c48060b4..86c020cd4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CancelJobOperation.java
@@ -21,44 +21,23 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
-import com.hazelcast.internal.nio.IOUtil;
-import com.hazelcast.internal.serialization.Data;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import lombok.NonNull;
-
-import java.io.IOException;
-
-public class DeployTaskOperation extends AsyncOperation {
- private Data taskImmutableInformation;
-
- public DeployTaskOperation() {
+public class CancelJobOperation extends AbstractJobAsyncOperation {
+ public CancelJobOperation() {
+ super();
}
- public DeployTaskOperation(@NonNull Data taskImmutableInformation) {
- this.taskImmutableInformation = taskImmutableInformation;
+ public CancelJobOperation(long jobId) {
+ super(jobId);
}
@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
- SeaTunnelServer server = getService();
- return server.getTaskExecutionService().deployTask(taskImmutableInformation);
+ SeaTunnelServer service = getService();
+ return service.cancelJob(jobId);
}
@Override
public int getClassId() {
- return OperationDataSerializerHook.DEPLOY_TASK_OPERATOR;
- }
-
- @Override
- protected void writeInternal(ObjectDataOutput out) throws IOException {
- super.writeInternal(out);
- IOUtil.writeData(out, taskImmutableInformation);
- }
-
- @Override
- protected void readInternal(ObjectDataInput in) throws IOException {
- super.readInternal(in);
- taskImmutableInformation = IOUtil.readData(in);
+ return OperationDataSerializerHook.CANCEL_JOB_OPERATOR;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
similarity index 61%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
index 4c48060b4..9d6d4cd14 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobStatusOperation.java
@@ -17,48 +17,59 @@
package org.apache.seatunnel.engine.server.operation;
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
-import com.hazelcast.internal.nio.IOUtil;
-import com.hazelcast.internal.serialization.Data;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
-import lombok.NonNull;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;
+import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-public class DeployTaskOperation extends AsyncOperation {
- private Data taskImmutableInformation;
+public class GetJobStatusOperation extends Operation implements IdentifiedDataSerializable, AllowedDuringPassiveState {
+ private long jobId;
- public DeployTaskOperation() {
+ private int response;
+
+ public GetJobStatusOperation() {
}
- public DeployTaskOperation(@NonNull Data taskImmutableInformation) {
- this.taskImmutableInformation = taskImmutableInformation;
+ public GetJobStatusOperation(long jobId) {
+ this.jobId = jobId;
}
@Override
- protected PassiveCompletableFuture<?> doRun() throws Exception {
- SeaTunnelServer server = getService();
- return server.getTaskExecutionService().deployTask(taskImmutableInformation);
+ public final int getFactoryId() {
+ return OperationDataSerializerHook.FACTORY_ID;
}
@Override
public int getClassId() {
- return OperationDataSerializerHook.DEPLOY_TASK_OPERATOR;
+ return OperationDataSerializerHook.GET_JOB_STATUS_OPERATOR;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- IOUtil.writeData(out, taskImmutableInformation);
+ out.writeLong(jobId);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- taskImmutableInformation = IOUtil.readData(in);
+ jobId = in.readLong();
+ }
+
+ @Override
+ public void run() {
+ SeaTunnelServer service = getService();
+ response = service.getJobStatus(jobId).ordinal();
+ }
+
+ @Override
+ public Object getResponse() {
+ return response;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
new file mode 100644
index 000000000..043de6eb3
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/CancelJobTask.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+public class CancelJobTask extends AbstractSeaTunnelMessageTask<Long, Void> {
+ protected CancelJobTask(ClientMessage clientMessage, Node node, Connection connection) {
+ super(clientMessage, node, connection,
+ SeaTunnelCancelJobCodec::decodeRequest,
+ x -> SeaTunnelCancelJobCodec.encodeResponse());
+ }
+
+ @Override
+ protected Operation prepareOperation() {
+ return new CancelJobOperation(parameters);
+ }
+
+ @Override
+ public String getMethodName() {
+ return "cancelJob";
+ }
+
+ @Override
+ public Object[] getParameters() {
+ return new Object[0];
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStatusTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStatusTask.java
new file mode 100644
index 000000000..4202c804e
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobStatusTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
+import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+public class GetJobStatusTask extends AbstractSeaTunnelMessageTask<Long, Integer> {
+
+ protected GetJobStatusTask(ClientMessage clientMessage, Node node, Connection connection) {
+ super(clientMessage, node, connection,
+ SeaTunnelGetJobStatusCodec::decodeRequest,
+ SeaTunnelGetJobStatusCodec::encodeResponse);
+ }
+
+ @Override
+ protected Operation prepareOperation() {
+ return new GetJobStatusOperation(parameters);
+ }
+
+ @Override
+ public String getMethodName() {
+ return "getJobStatus";
+ }
+
+ @Override
+ public Object[] getParameters() {
+ return new Object[0];
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index 9876f489e..902e42cb6 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.protocol.task;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;
@@ -49,5 +51,9 @@ public class SeaTunnelMessageTaskFactoryProvider implements MessageTaskFactoryPr
(clientMessage, connection) -> new SubmitJobTask(clientMessage, node, connection));
factories.put(SeaTunnelWaitForJobCompleteCodec.REQUEST_MESSAGE_TYPE,
(clientMessage, connection) -> new WaitForJobCompleteTask(clientMessage, node, connection));
+ factories.put(SeaTunnelCancelJobCodec.REQUEST_MESSAGE_TYPE,
+ (clientMessage, connection) -> new CancelJobTask(clientMessage, node, connection));
+ factories.put(SeaTunnelGetJobStatusCodec.REQUEST_MESSAGE_TYPE,
+ (clientMessage, connection) -> new GetJobStatusTask(clientMessage, node, connection));
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
index 5d608a62a..8d42737d2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java
@@ -17,9 +17,12 @@
package org.apache.seatunnel.engine.server.scheduler;
+import org.apache.seatunnel.engine.common.exception.JobException;
import org.apache.seatunnel.engine.common.exception.JobNoEnoughResourceException;
+import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineState;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
+import org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex;
import org.apache.seatunnel.engine.server.dag.physical.SubPlan;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.master.JobMaster;
@@ -47,34 +50,43 @@ public class PipelineBaseScheduler implements JobScheduler {
@Override
public void startScheduling() {
- physicalPlan.turnToRunning();
- physicalPlan.getPipelineList().forEach(pipeline -> {
- pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED);
- if (applyResourceForPipeline(pipeline)) {
+ if (physicalPlan.turnToRunning()) {
+ List<CompletableFuture<Object>> collect = physicalPlan.getPipelineList().stream().map(pipeline -> {
+ if (!pipeline.updatePipelineState(PipelineState.CREATED, PipelineState.SCHEDULED)) {
+ handlePipelineStateUpdateError(pipeline, PipelineState.SCHEDULED);
+ return null;
+ }
+ if (!applyResourceForPipeline(pipeline)) {
+ return null;
+ }
// deploy pipeline
- deployPipeline(pipeline);
- } else {
- pipeline.failedWithNoEnoughResource();
+ return CompletableFuture.supplyAsync(() -> {
+ deployPipeline(pipeline);
+ return null;
+ });
+ }).filter(x -> x != null).collect(Collectors.toList());
+ try {
+ CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
+ collect.toArray(new CompletableFuture[collect.size()]));
+ voidCompletableFuture.get();
+ } catch (Exception e) {
+ // cancel pipeline and throw an exception
+ physicalPlan.cancelJob();
+ throw new RuntimeException(e);
}
- });
+ } else if (!JobStatus.CANCELED.equals(physicalPlan.getJobStatus())) {
+ throw new JobException(String.format("%s turn to a unexpected state: %s", physicalPlan.getJobFullName(),
+ physicalPlan.getJobStatus()));
+ }
}
private boolean applyResourceForPipeline(@NonNull SubPlan subPlan) {
try {
// apply resource for coordinators
- subPlan.getCoordinatorVertexList().forEach(coordinator -> {
- // TODO If there is no enough resources for tasks, we need add some wait profile
- coordinator.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
- resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
- coordinator.getTaskGroup().getId());
- });
+ subPlan.getCoordinatorVertexList().forEach(coordinator -> applyResourceForTask(coordinator));
// apply resource for other tasks
- subPlan.getPhysicalVertexList().forEach(task -> {
- task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
- resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
- task.getTaskGroup().getId());
- });
+ subPlan.getPhysicalVertexList().forEach(task -> applyResourceForTask(task));
} catch (JobNoEnoughResourceException e) {
LOGGER.severe(e);
return false;
@@ -83,39 +95,85 @@ public class PipelineBaseScheduler implements JobScheduler {
return true;
}
- private void deployPipeline(@NonNull SubPlan pipeline) {
- pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING);
- List<CompletableFuture> deployCoordinatorFuture =
- pipeline.getCoordinatorVertexList().stream().map(coordinator -> {
- if (coordinator.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
- // deploy is a time-consuming operation, so we do it async
- return CompletableFuture.supplyAsync(() -> {
- coordinator.deploy(
- resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
- coordinator.getTaskGroup().getId()));
- return null;
- });
- }
+ private void applyResourceForTask(PhysicalVertex task) {
+ try {
+ // TODO If there is no enough resources for tasks, we need add some wait profile
+ if (task.updateTaskState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
+ resourceManager.applyForResource(physicalPlan.getJobImmutableInformation().getJobId(),
+ task.getTaskGroup().getId());
+ } else {
+ handleTaskStateUpdateError(task, ExecutionState.SCHEDULED);
+ }
+ } catch (JobNoEnoughResourceException e) {
+ LOGGER.severe(e);
+ }
+ }
+
+ private CompletableFuture<Void> deployTask(PhysicalVertex task) {
+ if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
+ // deploy is a time-consuming operation, so we do it async
+ return CompletableFuture.supplyAsync(() -> {
+ task.deploy(
+ resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
+ task.getTaskGroup().getId()));
return null;
- }).filter(x -> x != null).collect(Collectors.toList());
+ });
+ } else {
+ handleTaskStateUpdateError(task, ExecutionState.DEPLOYING);
+ }
+ return null;
+ }
+
+ private void deployPipeline(@NonNull SubPlan pipeline) {
+ if (pipeline.updatePipelineState(PipelineState.SCHEDULED, PipelineState.DEPLOYING)) {
+ List<CompletableFuture> deployCoordinatorFuture =
+ pipeline.getCoordinatorVertexList().stream().map(task -> deployTask(task)).filter(x -> x != null)
+ .collect(Collectors.toList());
- List<CompletableFuture> deployTaskFuture =
- pipeline.getPhysicalVertexList().stream().map(task -> {
- if (task.updateTaskState(ExecutionState.SCHEDULED, ExecutionState.DEPLOYING)) {
- return CompletableFuture.supplyAsync(() -> {
- task.deploy(
- resourceManager.getAppliedResource(physicalPlan.getJobImmutableInformation().getJobId(),
- task.getTaskGroup().getId()));
- return null;
- });
+ List<CompletableFuture> deployTaskFuture =
+ pipeline.getPhysicalVertexList().stream().map(task -> deployTask(task)).filter(x -> x != null)
+ .collect(Collectors.toList());
+
+ try {
+ deployCoordinatorFuture.addAll(deployTaskFuture);
+ CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
+ deployCoordinatorFuture.toArray(new CompletableFuture[deployCoordinatorFuture.size()]));
+ voidCompletableFuture.get();
+ if (!pipeline.updatePipelineState(PipelineState.DEPLOYING, PipelineState.RUNNING)) {
+ LOGGER.info(
+ String.format("%s turn to state %s, skip the running state.", pipeline.getPipelineFullName(),
+ pipeline.getPipelineState().get()));
}
- return null;
- }).filter(x -> x != null).collect(Collectors.toList());
+ } catch (Exception e) {
+ // cancel pipeline and throw an exception
+ pipeline.cancelPipeline();
+ throw new RuntimeException(e);
+ }
+ } else {
+ handlePipelineStateUpdateError(pipeline, PipelineState.DEPLOYING);
+ }
+ }
- deployCoordinatorFuture.addAll(deployTaskFuture);
- CompletableFuture.allOf(deployCoordinatorFuture.toArray(new CompletableFuture[deployCoordinatorFuture.size()]))
- .whenComplete((v, t) -> {
- pipeline.updatePipelineState(PipelineState.DEPLOYING, PipelineState.RUNNING);
- });
+ private void handlePipelineStateUpdateError(SubPlan pipeline, PipelineState targetState) {
+ if (PipelineState.CANCELING.equals(pipeline.getPipelineState().get()) ||
+ PipelineState.CANCELED.equals(pipeline.getPipelineState().get())) {
+ // may be canceled
+ LOGGER.info(String.format("%s turn to state %s, skip %s this pipeline.", pipeline.getPipelineFullName(),
+ pipeline.getPipelineState().get(), targetState));
+ } else {
+ throw new JobException(
+ String.format("%s turn to a unexpected state: %s, stop scheduler job", pipeline.getPipelineFullName(),
+ pipeline.getPipelineState().get()));
+ }
+ }
+
+ private void handleTaskStateUpdateError(PhysicalVertex task, ExecutionState targetState) {
+ if (ExecutionState.CANCELING.equals(task.getExecutionState().get()) ||
+ ExecutionState.CANCELED.equals(task.getExecutionState().get())) {
+ LOGGER.info(String.format("%s be canceled, skip %s this task.", task.getTaskFullName(), targetState));
+ } else {
+ throw new JobException(String.format("%s turn to a unexpected state: %s, stop scheduler job.",
+ task.getTaskFullName(), task.getExecutionState().get()));
+ }
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 2a40f871b..4410966b1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -18,14 +18,16 @@
package org.apache.seatunnel.engine.server.serializable;
import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
import org.apache.seatunnel.engine.server.operation.CheckpointAckOperation;
import org.apache.seatunnel.engine.server.operation.CheckpointFinishedOperation;
import org.apache.seatunnel.engine.server.operation.CheckpointTriggerOperation;
-import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
+import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
import org.apache.seatunnel.engine.server.operation.TaskCompletedOperation;
import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
+import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -42,6 +44,7 @@ import com.hazelcast.spi.annotation.PrivateApi;
public final class OperationDataSerializerHook implements DataSerializerHook {
public static final int PRINT_MESSAGE_OPERATOR = 0;
public static final int SUBMIT_OPERATOR = 1;
+
public static final int DEPLOY_TASK_OPERATOR = 2;
public static final int TASK_COMPLETED_OPERATOR = 3;
@@ -52,6 +55,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
public static final int CHECKPOINT_ACK_OPERATOR = 6;
public static final int CHECKPOINT_FINISHED_OPERATOR = 7;
+ public static final int CANCEL_JOB_OPERATOR = 8;
+ public static final int GET_JOB_STATUS_OPERATOR = 9;
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -85,6 +90,10 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
return new WaitForJobCompleteOperation();
case CHECKPOINT_TRIGGER_OPERATOR:
return new CheckpointTriggerOperation();
+ case CANCEL_JOB_OPERATOR:
+ return new CancelJobOperation();
+ case GET_JOB_STATUS_OPERATOR:
+ return new GetJobStatusOperation();
case CHECKPOINT_ACK_OPERATOR:
return new CheckpointAckOperation();
case CHECKPOINT_FINISHED_OPERATOR:
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 059325a36..71c8f2c99 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConsta
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.Progress;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
+import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
+import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkUnregisterOperation;
import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
@@ -56,6 +58,10 @@ public class TaskDataSerializerHook implements DataSerializerHook {
public static final int CLOSE_REQUEST_TYPE = 10;
+ public static final int DEPLOY_TASK_OPERATOR = 11;
+
+ public static final int CANCEL_TASK_OPERATOR = 12;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -96,6 +102,10 @@ public class TaskDataSerializerHook implements DataSerializerHook {
return new Progress();
case CLOSE_REQUEST_TYPE:
return new CloseRequestOperation();
+ case DEPLOY_TASK_OPERATOR:
+ return new DeployTaskOperation();
+ case CANCEL_TASK_OPERATOR:
+ return new CancelTaskOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
similarity index 67%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
index ee3783352..37f45c17b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/CancelTaskOperation.java
@@ -15,12 +15,10 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.operation.source;
+package org.apache.seatunnel.engine.server.task.operation;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
@@ -29,24 +27,35 @@ import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-public class CloseRequestOperation extends Operation implements IdentifiedDataSerializable {
+/**
+ * This operation is only to notice the {@link org.apache.seatunnel.engine.server.TaskExecutionService} to cancel the
+ * task. After the final task is cancelled, the {@link org.apache.seatunnel.engine.server.TaskExecutionService} will
+ * notified JobMaster
+ */
+public class CancelTaskOperation extends Operation implements IdentifiedDataSerializable {
+ private long taskGroupId;
- private TaskLocation readerLocation;
+ public CancelTaskOperation() {
+ }
- public CloseRequestOperation() {
+ public CancelTaskOperation(long taskGroupId) {
+ this.taskGroupId = taskGroupId;
}
- public CloseRequestOperation(TaskLocation readerLocation) {
- this.readerLocation = readerLocation;
+ @Override
+ public int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return TaskDataSerializerHook.CANCEL_TASK_OPERATOR;
}
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
- SourceSeaTunnelTask<?, ?> task =
- server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupID())
- .getTaskGroup().getTask(readerLocation.getTaskID());
- task.close();
+ server.getTaskExecutionService().cancelTaskGroup(taskGroupId);
}
@Override
@@ -57,22 +66,12 @@ public class CloseRequestOperation extends Operation implements IdentifiedDataSe
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- readerLocation.writeData(out);
+ out.writeLong(taskGroupId);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- readerLocation.readData(in);
- }
-
- @Override
- public int getFactoryId() {
- return TaskDataSerializerHook.FACTORY_ID;
- }
-
- @Override
- public int getClassId() {
- return TaskDataSerializerHook.CLOSE_REQUEST_TYPE;
+ taskGroupId = in.readLong();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
similarity index 88%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
index 4c48060b4..1667caf4d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/DeployTaskOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeployTaskOperation.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.operation;
+package org.apache.seatunnel.engine.server.task.operation;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
-import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+import org.apache.seatunnel.engine.server.operation.AsyncOperation;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.serialization.Data;
@@ -47,7 +48,7 @@ public class DeployTaskOperation extends AsyncOperation {
@Override
public int getClassId() {
- return OperationDataSerializerHook.DEPLOY_TASK_OPERATOR;
+ return TaskDataSerializerHook.DEPLOY_TASK_OPERATOR;
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 47a7e01d9..8fba46f6f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -47,9 +49,14 @@ public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
- SourceSeaTunnelTask<?, SplitT> task =
- server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupID()).getTaskGroup().getTask(taskID.getTaskID());
- task.receivedSourceSplit(splits);
+ RetryUtils.retryWithException(() -> {
+ SourceSeaTunnelTask<?, SplitT> task =
+ server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupID()).getTaskGroup()
+ .getTask(taskID.getTaskID());
+ task.receivedSourceSplit(splits);
+ return null;
+ }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+ exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
index ee3783352..18537e1f4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/CloseRequestOperation.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.task.operation.source;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -43,10 +45,14 @@ public class CloseRequestOperation extends Operation implements IdentifiedDataSe
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
- SourceSeaTunnelTask<?, ?> task =
+ RetryUtils.retryWithException(() -> {
+ SourceSeaTunnelTask<?, ?> task =
server.getTaskExecutionService().getExecutionContext(readerLocation.getTaskGroupID())
- .getTaskGroup().getTask(readerLocation.getTaskID());
- task.close();
+ .getTaskGroup().getTask(readerLocation.getTaskID());
+ task.close();
+ return null;
+ }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+ exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index 67272c6d9..1d6edefd5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.task.operation.source;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -46,10 +48,15 @@ public class RequestSplitOperation extends Operation implements IdentifiedDataSe
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
- SourceSplitEnumeratorTask<?> task =
+
+ RetryUtils.retryWithException(() -> {
+ SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID())
- .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
- task.requestSplit(taskID.getTaskID());
+ .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
+ task.requestSplit(taskID.getTaskID());
+ return null;
+ }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+ exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index b8cabcc16..626ff1911 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.task.operation.source;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -45,10 +47,14 @@ public class SourceNoMoreElementOperation extends Operation implements Identifie
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
- SourceSplitEnumeratorTask<?> task =
+ RetryUtils.retryWithException(() -> {
+ SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getExecutionContext(enumeratorTaskID.getTaskGroupID())
- .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
- task.readerFinished(currentTaskID.getTaskID());
+ .getTaskGroup().getTask(enumeratorTaskID.getTaskID());
+ task.readerFinished(currentTaskID.getTaskID());
+ return null;
+ }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
+ exception -> exception instanceof NullPointerException, Constant.OPERATION_RETRY_SLEEP));
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 5c51a3365..a36bc4871 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -77,7 +77,6 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
JobConfig config = new JobConfig();
config.setName("test");
- config.setMode(JobMode.BATCH);
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
@@ -115,7 +114,6 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
JobConfig config = new JobConfig();
config.setName("test");
- config.setMode(JobMode.BATCH);
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());