You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/09 05:39:50 UTC
[incubator-seatunnel] branch dev updated: [hotfix][ST-Engine] Fix Serializable error (#2997)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a84379259 [hotfix][ST-Engine] Fix Serializable error (#2997)
a84379259 is described below
commit a84379259a7ad60ffcf6c5801296b07867c0cc5d
Author: Eric <ga...@gmail.com>
AuthorDate: Sun Oct 9 13:39:45 2022 +0800
[hotfix][ST-Engine] Fix Serializable error (#2997)
* Fix ResourceProfile and Checkpoint DataSerializable error
* fix CheckpointManager NPT
* run initPluginDir before all test case
* fix check style
* improve test code
* remove try catch in test
* Rollback unnecessary modifications
* Rollback unnecessary modifications
* fix ci error
* retry ci
* retry ci
* revert ignore update
* fix check style
* fix ci error
* Update seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
Co-authored-by: Hisoka <fa...@qq.com>
* fix review problem
* fix ci error
Co-authored-by: Hisoka <fa...@qq.com>
---
.../apache/seatunnel/common/utils/FileUtils.java | 83 +++++++++++++++++++
.../connector-seatunnel-e2e-base/pom.xml | 20 ++++-
.../engine/e2e/ClusterFaultToleranceIT.java | 95 ++++++++++++++++++++++
.../seatunnel/engine/e2e/JobExecutionIT.java | 80 +++++++-----------
.../org/apache/seatunnel/engine/e2e/TestUtils.java | 85 +++++++------------
.../test/resources/batch_fakesource_to_file.conf | 52 +++++++++---
.../batch_fakesource_to_file_complex.conf | 92 ++++++++++++++++++---
... cluster_batch_fake_to_localfile_template.conf} | 53 +++++++++---
.../streaming_fakesource_to_file_complex.conf | 94 +++++++++++++++++----
.../src/main/resources/hazelcast-client.yaml | 14 ++++
.../engine/server/SeaTunnelServerStarter.java | 21 ++++-
.../engine/server/TaskExecutionService.java | 2 +-
.../operation/NotifyTaskRestoreOperation.java | 2 +-
.../operation/TaskAcknowledgeOperation.java | 4 +-
.../operation/TaskReportStatusOperation.java | 2 +
.../resourcemanager/resource/SlotProfile.java | 18 ++--
.../resourcemanager/worker/WorkerProfile.java | 15 ++--
.../serializable/ResourceDataSerializerHook.java | 11 ++-
.../serializable/TaskDataSerializerHook.java | 5 ++
.../server/service/slot/SlotAndWorkerProfile.java | 38 ++++++++-
.../operation/GetTaskGroupAddressOperation.java | 6 +-
.../operation/NotifyTaskStatusOperation.java | 41 +++++++++-
.../checkpoint/CloseRequestOperation.java | 4 +-
.../task/operation/sink/SinkRegisterOperation.java | 8 +-
.../operation/source/AssignSplitOperation.java | 4 +-
.../operation/source/RequestSplitOperation.java | 4 +-
.../operation/source/SourceRegisterOperation.java | 4 +-
.../engine/server/AbstractSeaTunnelServerTest.java | 5 +-
.../engine/server/CoordinatorServiceTest.java | 14 ++--
.../apache/seatunnel/engine/server/TestUtils.java | 14 ----
30 files changed, 661 insertions(+), 229 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
new file mode 100644
index 000000000..ae1dd1cf1
--- /dev/null
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import org.apache.seatunnel.common.ExceptionUtil;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+@Slf4j
+public class FileUtils {
+
+ public static String readFileToStr(Path path) {
+ try {
+ byte[] bytes = Files.readAllBytes(path);
+ return new String(bytes);
+ } catch (IOException e) {
+ log.error(ExceptionUtil.getMessage(e));
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void writeStringToFile(String filePath, String str) {
+ PrintStream ps = null;
+ try {
+ File file = new File(filePath);
+ ps = new PrintStream(new FileOutputStream(file));
+ ps.println(str);
+ } catch (FileNotFoundException e) {
+ log.error(ExceptionUtil.getMessage(e));
+ throw new RuntimeException(e);
+ } finally {
+ if (ps != null) {
+ ps.close();
+ }
+ }
+ }
+
+ public static void createParentFile(File file) {
+ File parentFile = file.getParentFile();
+ if (null != parentFile && !parentFile.exists()) {
+ parentFile.mkdirs();
+ createParentFile(parentFile);
+ }
+ }
+
+ /**
+ * create a new file, delete the old one if it is exists.
+ * @param filePath filePath
+ */
+ public static void createNewFile(String filePath) {
+ File file = new File(filePath);
+ if (file.exists()) {
+ file.delete();
+ }
+
+ if (!file.getParentFile().exists()) {
+ createParentFile(file);
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index ca51d8d6a..29077000c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -33,10 +33,28 @@
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-console</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-local</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
- <scope>compile</scope>
+ <scope>test</scope>
<exclusions>
<exclusion>
<artifactId>avro</artifactId>
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
new file mode 100644
index 000000000..76785bb9c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cluster fault tolerance test. Test the job recovery capability and data consistency assurance capability in case of cluster node failure
+ */
+public class ClusterFaultToleranceIT {
+
+ @Test
+ public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedException {
+ HazelcastInstanceImpl node1 =
+ SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+
+ HazelcastInstanceImpl node2 =
+ SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+
+ HazelcastInstanceImpl node3 =
+ SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+
+ // waiting all node added to cluster
+ Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertEquals(3, node1.getCluster().getMembers().size()));
+
+ // TODO Need FakeSource support parallel first
+ Common.setDeployMode(DeployMode.CLIENT);
+ String targetJobConfigNamePrefix = "cluster_batch_fake_to_localfile_fine";
+ Map<String, String> valueMap = new HashMap<>();
+ valueMap.put("target_test_job_config_file_name", targetJobConfigNamePrefix);
+
+ String targetConfigFilePath =
+ File.separator + "tmp" + File.separator + "test_conf" + File.separator + targetJobConfigNamePrefix +
+ ".conf";
+ TestUtils.createTestConfigFileFromTemplate("cluster_batch_fake_to_localfile_template.conf", valueMap,
+ targetConfigFilePath);
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("test_cluster_fault_worker_batch_job");
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(
+ TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(targetConfigFilePath, jobConfig);
+ ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
+ });
+
+ Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+ objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index ada5c4fac..c483c61b5 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -24,21 +24,16 @@ import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
-import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
-import com.google.common.collect.Lists;
import com.hazelcast.client.config.ClientConfig;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -46,19 +41,14 @@ import java.util.concurrent.TimeUnit;
public class JobExecutionIT {
@BeforeAll
public static void beforeClass() throws Exception {
- SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
- seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("JobExecutionIT"));
- HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
- Thread.currentThread().getName(),
- new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
+ SeaTunnelServerStarter.createHazelcastInstance(TestUtils.getClusterName("JobExecutionIT"));
}
@Test
public void testSayHello() {
- SeaTunnelClientConfig seaTunnelClientConfig = new SeaTunnelClientConfig();
- seaTunnelClientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
- seaTunnelClientConfig.getNetworkConfig().setAddresses(Lists.newArrayList("localhost:5801"));
- SeaTunnelClient engineClient = new SeaTunnelClient(seaTunnelClientConfig);
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
String msg = "Hello world";
String s = engineClient.printMessageToMaster(msg);
@@ -66,10 +56,9 @@ public class JobExecutionIT {
}
@Test
- public void testExecuteJob() throws IOException {
- TestUtils.initPluginDir();
+ public void testExecuteJob() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
- String filePath = TestUtils.getResource("/batch_fakesource_to_file.conf");
+ String filePath = TestUtils.getResource("batch_fakesource_to_file.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_file");
@@ -78,27 +67,22 @@ public class JobExecutionIT {
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
- try {
- final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
- return clientJobProxy.waitForJobComplete();
- });
+ CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ return clientJobProxy.waitForJobComplete();
+ });
- Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertTrue(
- objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+ Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+ objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
}
@Test
- public void cancelJobTest() throws IOException {
- TestUtils.initPluginDir();
+ public void cancelJobTest() throws Exception {
Common.setDeployMode(DeployMode.CLIENT);
- String filePath = TestUtils.getResource("/streaming_fakesource_to_file_complex.conf");
+ String filePath = TestUtils.getResource("streaming_fakesource_to_file_complex.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_file");
@@ -107,23 +91,19 @@ public class JobExecutionIT {
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig);
- try {
- final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- JobStatus jobStatus1 = clientJobProxy.getJobStatus();
- Assertions.assertFalse(jobStatus1.isEndState());
- ClientJobProxy finalClientJobProxy = clientJobProxy;
- CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
- return finalClientJobProxy.waitForJobComplete();
- });
- Thread.sleep(1000);
- clientJobProxy.cancelJob();
-
- Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> Assertions.assertTrue(
- objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ JobStatus jobStatus1 = clientJobProxy.getJobStatus();
+ Assertions.assertFalse(jobStatus1.isEndState());
+ ClientJobProxy finalClientJobProxy = clientJobProxy;
+ CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
+ return finalClientJobProxy.waitForJobComplete();
+ });
+ Thread.sleep(1000);
+ clientJobProxy.cancelJob();
+
+ Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> Assertions.assertTrue(
+ objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
+
}
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
index e115b5af8..b9e13e3bc 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
@@ -17,71 +17,42 @@
package org.apache.seatunnel.engine.e2e;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.common.utils.VariablesSubstitute;
+
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitOption;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.Map;
+@Slf4j
public class TestUtils {
public static String getResource(String confFile) {
- return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+ return System.getProperty("user.dir") + File.separator + "src" + File.separator + "test" + File.separator +
+ "resources" + File.separator + confFile;
}
- public static String getClusterName(String testClassName) {
- return System.getProperty("user.name") + "_" + testClassName;
+ /**
+ * For reduce the config files num, we can define a job config template and then create new job config file base on it.
+ *
+ * @param templateFile The basic job configuration file, which often contains some content that needs to be replaced
+ * at runtime, generates a new final job configuration file for testing after replacement
+ * @param valueMap replace kv
+ * @param targetFilePath The new config file path
+ */
+ public static void createTestConfigFileFromTemplate(@NonNull String templateFile,
+ @NonNull Map<String, String> valueMap,
+ @NonNull String targetFilePath) {
+ String templateFilePath = getResource(templateFile);
+ String confContent = FileUtils.readFileToStr(Paths.get(templateFilePath));
+ String targetConfContent = VariablesSubstitute.substitute(confContent, valueMap);
+ FileUtils.createNewFile(targetFilePath);
+ FileUtils.writeStringToFile(targetFilePath, targetConfContent);
}
- @SuppressWarnings("checkstyle:MagicNumber")
- public static void initPluginDir() throws IOException {
- // TODO change connector get method
- // copy connectors to project_root/connectors dir
- System.setProperty("SEATUNNEL_HOME", System.getProperty("user.dir") +
- String.format("%s..%s..%s..%s", File.separator, File.separator, File.separator, File.separator));
- File seatunnelRootDir = new File(System.getProperty("SEATUNNEL_HOME"));
-
- File connectorDir = new File(seatunnelRootDir +
- File.separator +
- "connectors/seatunnel");
-
- if (connectorDir.exists()) {
- connectorDir.delete();
- }
-
- connectorDir.mkdirs();
-
- File connectorDistDir = new File(
- seatunnelRootDir +
- File.separator +
- "seatunnel-connectors-v2");
- try (Stream<Path> paths = Files.walk(connectorDistDir.toPath(), 6, FileVisitOption.FOLLOW_LINKS)) {
- paths.filter(path -> path.getFileName().toFile().getName().startsWith("connector-") && path.getFileName().toFile().getName().endsWith(".jar") && !path.getFileName().toString().contains("javadoc"))
- .collect(Collectors.toSet()).forEach(file -> {
- Path copied = Paths.get(connectorDir + File.separator + file.toFile().getName());
- Path originalPath = file.toAbsolutePath();
- try {
- Files.copy(originalPath, copied, StandardCopyOption.REPLACE_EXISTING);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- }
-
- Path targetPluginMappingFile = Paths.get(seatunnelRootDir +
- File.separator +
- "connectors" +
- File.separator +
- "plugin-mapping.properties");
-
- Path sourcePluginMappingFile = Paths.get(seatunnelRootDir + File.separator + "plugin-mapping.properties");
- try {
- Files.copy(sourcePluginMappingFile, targetPluginMappingFile, StandardCopyOption.REPLACE_EXISTING);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ public static String getClusterName(String testClassName) {
+ return System.getProperty("user.name") + "_" + testClassName;
}
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
index 8c78ced4c..4431bf67c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
@@ -20,7 +20,6 @@
env {
# You can set flink configuration here
- execution.parallelism = 1
job.mode = "BATCH"
execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
@@ -29,16 +28,50 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ parallelism = 1
result_table_name = "fake"
- schema {
+ schema = {
fields {
- name = "string"
- age = "int"
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
}
}
- parallelism = 1
}
-
}
transform {
@@ -46,19 +79,16 @@ transform {
sink {
LocalFile {
- path="/tmp/hive/warehouse/test2"
+ path="/tmp/hive/warehouse/test1"
field_delimiter="\t"
row_delimiter="\n"
- partition_by=["age"]
+ partition_by=["c_string"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format="text"
- sink_columns=["name","age"]
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error"
-
}
-
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index ddc450819..8d8afe530 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -20,7 +20,6 @@
env {
# You can set flink configuration here
- execution.parallelism = 1
job.mode = "BATCH"
execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
@@ -29,27 +28,96 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ parallelism = 1
result_table_name = "fake"
- schema {
+ schema = {
fields {
- name = "string"
- age = "int"
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
}
}
- parallelism = 1
}
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
result_table_name = "fake"
- schema {
+ schema = {
fields {
- name = "string"
- age = "int"
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
}
}
- parallelism = 1
}
-
}
transform {
@@ -60,16 +128,14 @@ sink {
path="/tmp/hive/warehouse/test2"
field_delimiter="\t"
row_delimiter="\n"
- partition_by=["age"]
+ partition_by=["c_string"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format="text"
- sink_columns=["name","age"]
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error",
source_table_name="fake"
}
-
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
similarity index 55%
copy from seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
index 8c78ced4c..bd843c6dd 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
@@ -20,7 +20,6 @@
env {
# You can set flink configuration here
- execution.parallelism = 1
job.mode = "BATCH"
execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
@@ -29,16 +28,49 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
- result_table_name = "fake"
- schema {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ parallelism = 1
+ schema = {
fields {
- name = "string"
- age = "int"
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
}
}
- parallelism = 1
}
-
}
transform {
@@ -46,19 +78,16 @@ transform {
sink {
LocalFile {
- path="/tmp/hive/warehouse/test2"
+ path="/tmp/hive/warehouse/${target_test_job_config_file_name}" # target_test_job_config_file_name will be replace to the final file name before test run
field_delimiter="\t"
row_delimiter="\n"
- partition_by=["age"]
+ partition_by=["c_string"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format="text"
- sink_columns=["name","age"]
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error"
-
}
-
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
index d454ac791..f2c8e5fee 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -20,7 +20,6 @@
env {
# You can set flink configuration here
- execution.parallelism = 1
job.mode = "STREAMING"
execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
@@ -29,27 +28,96 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
result_table_name = "fake"
- schema {
+ parallelism = 1
+ schema = {
fields {
- name = "string"
- age = "int"
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
}
}
- parallelism = 1
}
FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
result_table_name = "fake"
- schema {
+ parallelism = 1
+ schema = {
fields {
- name = "string"
- age = "int"
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
}
}
- parallelism = 1
}
-
}
transform {
@@ -57,19 +125,17 @@ transform {
sink {
LocalFile {
- path="/tmp/hive/warehouse/test2"
+ path="/tmp/hive/warehouse/test3"
field_delimiter="\t"
row_delimiter="\n"
- partition_by=["age"]
+ partition_by=["c_string"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format="text"
- sink_columns=["name","age"]
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error",
source_table_name="fake"
}
-
}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
index a5fe3bc42..9552c382e 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/hazelcast-client.yaml
@@ -21,3 +21,17 @@ hazelcast-client:
network:
cluster-members:
- localhost:5801
+ - localhost:5802
+ - localhost:5803
+ - localhost:5804
+ - localhost:5805
+ - localhost:5806
+ - localhost:5807
+ - localhost:5808
+ - localhost:5809
+ - localhost:5810
+ - localhost:5811
+ - localhost:5812
+ - localhost:5813
+ - localhost:5814
+ - localhost:5815
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
index 0104c1e55..447a7fe7c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServerStarter.java
@@ -21,12 +21,29 @@ import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
public class SeaTunnelServerStarter {
public static void main(String[] args) {
+ createHazelcastInstance();
+ }
+
+ public static HazelcastInstanceImpl createHazelcastInstance(String clusterName) {
+ SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
+ return ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(
+ seaTunnelConfig.getHazelcastConfig(),
+ HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
+ new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
+ }
+
+ public static HazelcastInstanceImpl createHazelcastInstance() {
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
- HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
- Thread.currentThread().getName(), new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
+ return ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(
+ seaTunnelConfig.getHazelcastConfig(),
+ HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
+ new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index d2d8b67d6..6f88f89dc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -38,8 +38,8 @@ import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
-import org.apache.seatunnel.engine.server.operation.NotifyTaskStatusOperation;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
+import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
import com.google.common.collect.Lists;
import com.hazelcast.internal.serialization.Data;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
index 9bed053ff..ec282e037 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
@@ -71,7 +71,7 @@ public class NotifyTaskRestoreOperation extends TaskOperation {
int size = in.readInt();
this.restoredState = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- restoredState.add(in.readObject(ActionSubtaskState.class));
+ restoredState.add(in.readObject());
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskAcknowledgeOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskAcknowledgeOperation.java
index 64e609048..5a5e16c54 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskAcknowledgeOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskAcknowledgeOperation.java
@@ -65,8 +65,8 @@ public class TaskAcknowledgeOperation extends Operation implements IdentifiedDat
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
- taskLocation = in.readObject(TaskLocation.class);
- barrier = in.readObject(CheckpointBarrier.class);
+ taskLocation = in.readObject();
+ barrier = in.readObject();
states = in.readObject();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
index e515387c4..d5ea0d078 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/TaskReportStatusOperation.java
@@ -53,11 +53,13 @@ public class TaskReportStatusOperation extends Operation implements IdentifiedDa
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
out.writeObject(location);
+ out.writeObject(status);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
location = in.readObject(TaskLocation.class);
+ status = in.readObject();
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
index 9e4794dbf..b7a53de85 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/resource/SlotProfile.java
@@ -31,7 +31,7 @@ import java.io.IOException;
*/
public class SlotProfile implements IdentifiedDataSerializable {
- private final Address worker;
+ private Address worker;
private int slotID;
@@ -83,12 +83,12 @@ public class SlotProfile implements IdentifiedDataSerializable {
@Override
public String toString() {
return "SlotProfile{" +
- "worker=" + worker +
- ", slotID=" + slotID +
- ", ownerJobID=" + ownerJobID +
- ", assigned=" + assigned +
- ", resourceProfile=" + resourceProfile +
- '}';
+ "worker=" + worker +
+ ", slotID=" + slotID +
+ ", ownerJobID=" + ownerJobID +
+ ", assigned=" + assigned +
+ ", resourceProfile=" + resourceProfile +
+ '}';
}
@Override
@@ -103,7 +103,7 @@ public class SlotProfile implements IdentifiedDataSerializable {
@Override
public void writeData(ObjectDataOutput out) throws IOException {
- worker.writeData(out);
+ out.writeObject(worker);
out.writeInt(slotID);
out.writeLong(ownerJobID);
out.writeBoolean(assigned);
@@ -112,7 +112,7 @@ public class SlotProfile implements IdentifiedDataSerializable {
@Override
public void readData(ObjectDataInput in) throws IOException {
- worker.readData(in);
+ worker = in.readObject();
slotID = in.readInt();
ownerJobID = in.readLong();
assigned = in.readBoolean();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
index fb83a7dfd..e8a73d338 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/worker/WorkerProfile.java
@@ -66,36 +66,33 @@ public class WorkerProfile implements IdentifiedDataSerializable {
@Override
public void writeData(ObjectDataOutput out) throws IOException {
- address.writeData(out);
+ out.writeObject(address);
out.writeObject(profile);
out.writeObject(unassignedResource);
out.writeInt(assignedSlots.length);
for (SlotProfile assignedSlot : assignedSlots) {
- assignedSlot.writeData(out);
+ out.writeObject(assignedSlot);
}
out.writeInt(unassignedSlots.length);
for (SlotProfile unassignedSlot : unassignedSlots) {
- unassignedSlot.writeData(out);
+ out.writeObject(unassignedSlot);
}
- out.writeObject(unassignedSlots);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
- address.readData(in);
+ address = in.readObject();
profile = in.readObject();
unassignedResource = in.readObject();
int assignedSlotsLength = in.readInt();
assignedSlots = new SlotProfile[assignedSlotsLength];
for (int i = 0; i < assignedSlots.length; i++) {
- assignedSlots[i] = new SlotProfile();
- assignedSlots[i].readData(in);
+ assignedSlots[i] = in.readObject();
}
int unassignedSlotsLength = in.readInt();
unassignedSlots = new SlotProfile[unassignedSlotsLength];
for (int i = 0; i < unassignedSlots.length; i++) {
- unassignedSlots[i] = new SlotProfile();
- unassignedSlots[i].readData(in);
+ unassignedSlots[i] = in.readObject();
}
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
index 4ec2e16a5..28710284c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ResourceDataSerializerHook.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.server.resourcemanager.opeartion.ResetResourc
import org.apache.seatunnel.engine.server.resourcemanager.opeartion.WorkerHeartbeatOperation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -45,9 +46,11 @@ public class ResourceDataSerializerHook implements DataSerializerHook {
public static final int SLOT_PROFILE_TYPE = 6;
+ public static final int SLOT_AND_WORKER_PROFILE = 7;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
- SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
- SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID
+ SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY,
+ SeaTunnelFactoryIdConstant.SEATUNNEL_RESOURCE_DATA_SERIALIZER_FACTORY_ID
);
@Override
@@ -75,8 +78,10 @@ public class ResourceDataSerializerHook implements DataSerializerHook {
return new ResetResourceOperation();
case WORKER_PROFILE_TYPE:
return new WorkerProfile();
- case SLOT_PROFILE_TYPE:
+ case SLOT_PROFILE_TYPE:
return new SlotProfile();
+ case SLOT_AND_WORKER_PROFILE:
+ return new SlotAndWorkerProfile();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 828de27ae..19c2d7114 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
+import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
import org.apache.seatunnel.engine.server.task.operation.checkpoint.CloseRequestOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitOperation;
import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
@@ -66,6 +67,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
public static final int RESTORED_SPLIT_OPERATOR = 14;
+ public static final int NOTIFY_TASK_STATUS_OPERATOR = 15;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY_ID
@@ -114,6 +117,8 @@ public class TaskDataSerializerHook implements DataSerializerHook {
return new GetTaskGroupAddressOperation();
case RESTORED_SPLIT_OPERATOR:
return new RestoredSplitOperation();
+ case NOTIFY_TASK_STATUS_OPERATOR:
+ return new NotifyTaskStatusOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
index 2523e79d2..402b79656 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/slot/SlotAndWorkerProfile.java
@@ -19,14 +19,22 @@ package org.apache.seatunnel.engine.server.service.slot;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
+import org.apache.seatunnel.engine.server.serializable.ResourceDataSerializerHook;
-import java.io.Serializable;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
-public class SlotAndWorkerProfile implements Serializable {
+import java.io.IOException;
- private final WorkerProfile workerProfile;
+public class SlotAndWorkerProfile implements IdentifiedDataSerializable {
- private final SlotProfile slotProfile;
+ private WorkerProfile workerProfile;
+
+ private SlotProfile slotProfile;
+
+ public SlotAndWorkerProfile() {
+ }
public SlotAndWorkerProfile(WorkerProfile workerProfile, SlotProfile slotProfile) {
this.workerProfile = workerProfile;
@@ -43,4 +51,26 @@ public class SlotAndWorkerProfile implements Serializable {
public SlotProfile getSlotProfile() {
return slotProfile;
}
+
+ @Override
+ public int getFactoryId() {
+ return ResourceDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return ResourceDataSerializerHook.SLOT_AND_WORKER_PROFILE;
+ }
+
+ @Override
+ public void writeData(ObjectDataOutput out) throws IOException {
+ out.writeObject(workerProfile);
+ out.writeObject(slotProfile);
+ }
+
+ @Override
+ public void readData(ObjectDataInput in) throws IOException {
+ workerProfile = in.readObject();
+ slotProfile = in.readObject();
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
index 6a788c3e1..7f69773b1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetTaskGroupAddressOperation.java
@@ -66,13 +66,13 @@ public class GetTaskGroupAddressOperation extends Operation implements Identifie
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- taskLocation.writeData(out);
+ out.writeObject(taskLocation);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- taskLocation.readData(in);
+ taskLocation = in.readObject();
}
@Override
@@ -82,6 +82,6 @@ public class GetTaskGroupAddressOperation extends Operation implements Identifie
@Override
public int getClassId() {
- return TaskDataSerializerHook.CLOSE_REQUEST_TYPE;
+ return TaskDataSerializerHook.GET_TASKGROUP_ADDRESS_TYPE;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/NotifyTaskStatusOperation.java
similarity index 56%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/NotifyTaskStatusOperation.java
index 501dc468d..85216cb74 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/NotifyTaskStatusOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/NotifyTaskStatusOperation.java
@@ -15,18 +15,27 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.operation;
+package org.apache.seatunnel.engine.server.task.operation;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.impl.operationservice.Operation;
-public class NotifyTaskStatusOperation extends Operation {
+import java.io.IOException;
- private final TaskGroupLocation taskGroupLocation;
- private final TaskExecutionState taskExecutionState;
+public class NotifyTaskStatusOperation extends Operation implements IdentifiedDataSerializable {
+
+ private TaskGroupLocation taskGroupLocation;
+ private TaskExecutionState taskExecutionState;
+
+ public NotifyTaskStatusOperation() {
+ }
public NotifyTaskStatusOperation(TaskGroupLocation taskGroupLocation, TaskExecutionState taskExecutionState) {
super();
@@ -34,6 +43,30 @@ public class NotifyTaskStatusOperation extends Operation {
this.taskExecutionState = taskExecutionState;
}
+ @Override
+ public final int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return TaskDataSerializerHook.NOTIFY_TASK_STATUS_OPERATOR;
+ }
+
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ super.writeInternal(out);
+ out.writeObject(taskGroupLocation);
+ out.writeObject(taskExecutionState);
+ }
+
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ super.readInternal(in);
+ taskGroupLocation = in.readObject();
+ taskExecutionState = in.readObject();
+ }
+
@Override
public void run() throws Exception {
SeaTunnelServer server = getService();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
index 859403b66..fd46082ab 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
@@ -62,13 +62,13 @@ public class CloseRequestOperation extends Operation implements IdentifiedDataSe
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- readerLocation.writeData(out);
+ out.writeObject(readerLocation);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- readerLocation.readData(in);
+ readerLocation = in.readObject();
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
index 1a326571d..77352339b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkRegisterOperation.java
@@ -68,15 +68,15 @@ public class SinkRegisterOperation extends Operation implements IdentifiedDataSe
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- writerTaskID.writeData(out);
- committerTaskID.writeData(out);
+ out.writeObject(writerTaskID);
+ out.writeObject(committerTaskID);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- writerTaskID.readData(in);
- committerTaskID.readData(in);
+ writerTaskID = in.readObject();
+ committerTaskID = in.readObject();
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 3eb20042b..7f42e4aed 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -65,13 +65,13 @@ public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
out.writeByteArray(splits);
- taskID.writeData(out);
+ out.writeObject(taskID);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
splits = in.readByteArray();
- taskID.readData(in);
+ taskID = in.readObject();
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index 13536215d..db7803c1d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -66,8 +66,8 @@ public class RequestSplitOperation extends Operation implements IdentifiedDataSe
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
- taskID.writeData(out);
- enumeratorTaskID.writeData(out);
+ out.writeObject(taskID);
+ out.writeObject(enumeratorTaskID);
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 005cda41e..fef29aa57 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -80,8 +80,8 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
- readerTaskID.readData(in);
- enumeratorTaskID.readData(in);
+ readerTaskID = in.readObject();
+ enumeratorTaskID = in.readObject();
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index 1d1e0105b..403bbd128 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -36,8 +36,9 @@ public abstract class AbstractSeaTunnelServerTest {
protected static ILogger LOGGER;
@BeforeAll
- public void before() {
- instance = TestUtils.createHazelcastInstance("AbstractSeaTunnelServerTest" + "_" + System.currentTimeMillis());
+ public void before() {
+ instance = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName("AbstractSeaTunnelServerTest_" + System.currentTimeMillis()));
nodeEngine = instance.node.nodeEngine;
server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
index 3de0be829..1397206fe 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java
@@ -39,8 +39,10 @@ import java.util.concurrent.TimeUnit;
public class CoordinatorServiceTest {
@Test
public void testMasterNodeActive() {
- HazelcastInstanceImpl instance1 = TestUtils.createHazelcastInstance("CoordinatorServiceTest_testMasterNodeActive");
- HazelcastInstanceImpl instance2 = TestUtils.createHazelcastInstance("CoordinatorServiceTest_testMasterNodeActive");
+ HazelcastInstanceImpl instance1 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName("CoordinatorServiceTest_testMasterNodeActive"));
+ HazelcastInstanceImpl instance2 = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName("CoordinatorServiceTest_testMasterNodeActive"));
SeaTunnelServer server1 = instance1.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
SeaTunnelServer server2 = instance2.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
@@ -57,7 +59,7 @@ public class CoordinatorServiceTest {
// shutdown instance1
instance1.shutdown();
- await().atMost(10000, TimeUnit.MILLISECONDS)
+ await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
try {
CoordinatorService coordinatorService = server2.getCoordinatorService();
@@ -75,7 +77,8 @@ public class CoordinatorServiceTest {
@Test
public void testClearCoordinatorService()
throws MalformedURLException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
- HazelcastInstanceImpl coordinatorServiceTest = TestUtils.createHazelcastInstance("CoordinatorServiceTest_testClearCoordinatorService");
+ HazelcastInstanceImpl coordinatorServiceTest = SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName("CoordinatorServiceTest_testClearCoordinatorService"));
SeaTunnelServer server1 = coordinatorServiceTest.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
CoordinatorService coordinatorService = server1.getCoordinatorService();
Assertions.assertTrue(coordinatorService.isCoordinatorActive());
@@ -85,7 +88,8 @@ public class CoordinatorServiceTest {
TestUtils.createTestLogicalPlan("stream_fakesource_to_file.conf", "test_clear_coordinator_service", jobId);
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(jobId,
- coordinatorServiceTest.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(), Collections.emptyList());
+ coordinatorServiceTest.getSerializationService().toData(testLogicalDag), testLogicalDag.getJobConfig(),
+ Collections.emptyList());
Data data = coordinatorServiceTest.getSerializationService().toData(jobImmutableInformation);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index 213cc725e..c9f802489 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -22,9 +22,7 @@ import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
-import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
@@ -36,9 +34,6 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.parse.JobConfigParser;
import com.google.common.collect.Sets;
-import com.hazelcast.instance.impl.HazelcastInstanceFactory;
-import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.net.MalformedURLException;
@@ -82,15 +77,6 @@ public class TestUtils {
return System.getProperty("user.name") + "_" + testClassName;
}
- public static HazelcastInstanceImpl createHazelcastInstance(String clusterName) {
- SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
- seaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(clusterName));
- return ((HazelcastInstanceProxy) HazelcastInstanceFactory.newHazelcastInstance(
- seaTunnelConfig.getHazelcastConfig(),
- HazelcastInstanceFactory.createInstanceName(seaTunnelConfig.getHazelcastConfig()),
- new SeaTunnelNodeContext(new SeaTunnelConfig()))).getOriginal();
- }
-
public static LogicalDag createTestLogicalPlan(String jobConfigFile, String jobName, Long jobId) {
Common.setDeployMode(DeployMode.CLIENT);
JobContext jobContext = new JobContext(jobId);