You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/03/14 02:44:17 UTC
[incubator-seatunnel] branch dev updated: [Hotfix][Zeta] Fix Default Cluster Not Working In Config File (#3770)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 980f0ceb1 [Hotfix][Zeta] Fix Default Cluster Not Working In Config File (#3770)
980f0ceb1 is described below
commit 980f0ceb1ec70974d0a72b73686587c22a6d28f4
Author: Hisoka <fa...@qq.com>
AuthorDate: Tue Mar 14 10:44:11 2023 +0800
[Hotfix][Zeta] Fix Default Cluster Not Working In Config File (#3770)
* [Improve] [Engine] Fix Default Cluster Not Working In Config File
* [Fix] [Zeta] Fix Default Cluster Not Working In Config File
* [Fix] [Zeta] Fix Default Cluster Not Working In Config File
* [Fix] [Zeta] Fix Default Cluster Not Working In Config File
* [Fix] [Zeta] Fix JobMasterTest CI Error
* [Fix] [Zeta] Fix JobMasterTest CI Error
---
.../starter/seatunnel/args/ClientCommandArgs.java | 2 +-
.../starter/seatunnel/args/ServerCommandArgs.java | 2 +-
.../seatunnel/command/ClientExecuteCommand.java | 17 ++++++++++---
.../seatunnel/command/ServerExecuteCommand.java | 6 ++++-
.../engine/common/config/SeaTunnelConfig.java | 1 -
.../engine/server/master/JobMasterTest.java | 28 ++++++++++++----------
6 files changed, 36 insertions(+), 20 deletions(-)
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 964ff5380..15e670acc 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -61,7 +61,7 @@ public class ClientCommandArgs extends AbstractCommandArgs {
@Parameter(
names = {"-cn", "--cluster"},
description = "The name of cluster")
- private String clusterName = "seatunnel_default_cluster";
+ private String clusterName;
@Parameter(
names = {"-j", "--job-id"},
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
index a83b1e645..35e8e3e87 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
@@ -31,7 +31,7 @@ public class ServerCommandArgs extends CommandArgs {
@Parameter(
names = {"-cn", "--cluster"},
description = "The name of cluster")
- private String clusterName = "seatunnel_default_cluster";
+ private String clusterName;
@Override
public Command<?> buildCommand() {
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index f094c5c7b..9e5cd3bd9 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -28,12 +28,15 @@ import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+import org.apache.commons.lang3.StringUtils;
+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
@@ -76,12 +79,20 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
try {
String clusterName = clientCommandArgs.getClusterName();
if (clientCommandArgs.getMasterType().equals(MasterType.LOCAL)) {
- clusterName = creatRandomClusterName(clusterName);
+ clusterName =
+ creatRandomClusterName(
+ StringUtils.isNotEmpty(clusterName)
+ ? clusterName
+ : Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
instance = createServerInLocal(clusterName);
}
- seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
+ if (StringUtils.isNotEmpty(clusterName)) {
+ seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
+ }
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
- clientConfig.setClusterName(clusterName);
+ if (StringUtils.isNotEmpty(clusterName)) {
+ clientConfig.setClusterName(clusterName);
+ }
engineClient = new SeaTunnelClient(clientConfig);
if (clientCommandArgs.isListJob()) {
String jobStatus = engineClient.getJobClient().listJobStatus(true);
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java
index f65d7fc20..aead1f8a9 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+import org.apache.commons.lang3.StringUtils;
+
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
/** This command is used to execute the SeaTunnel engine job by SeaTunnel API. */
@@ -37,7 +39,9 @@ public class ServerExecuteCommand implements Command<ServerCommandArgs> {
@Override
public void execute() {
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
- seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName());
+ if (StringUtils.isNotEmpty(serverCommandArgs.getClusterName())) {
+ seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName());
+ }
HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
Thread.currentThread().getName(),
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
index 499440045..dfbfd2c66 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/SeaTunnelConfig.java
@@ -46,7 +46,6 @@ public class SeaTunnelConfig {
.getJoin()
.getMulticastConfig()
.setMulticastPort(Constant.DEFAULT_SEATUNNEL_MULTICAST_PORT);
- hazelcastConfig.setClusterName(Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
hazelcastConfig
.getHotRestartPersistenceConfig()
.setBaseDir(new File(seatunnelHome(), "recovery").getAbsoluteFile());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
index 7e25f0c1c..efdc9e0f7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java
@@ -133,33 +133,35 @@ public class JobMasterTest extends AbstractSeaTunnelServerTest {
.untilAsserted(
() -> Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));
- // call checkpoint timeout
- jobMaster.handleCheckpointError(1);
-
// Because handleCheckpointTimeout is an async method, so we need sleep 5s to waiting job
// status become running again
Thread.sleep(5000);
- // test job still run
- await().atMost(120000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> Assertions.assertEquals(JobStatus.RUNNING, jobMaster.getJobStatus()));
+ jobMaster.neverNeedRestore();
+ // call checkpoint timeout
+ jobMaster.handleCheckpointError(1);
PassiveCompletableFuture<JobResult> jobMasterCompleteFuture =
jobMaster.getJobMasterCompleteFuture();
- // cancel job
- jobMaster.cancelJob();
// test job turn to complete
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
+ // Why equals CANCELED or FAILED? because handleCheckpointError
+ // should call by CheckpointCoordinator,
+ // before do this, CheckpointCoordinator should be failed. Anyway,
+ // use handleCheckpointError not good to test checkpoint timeout.
Assertions.assertTrue(
jobMasterCompleteFuture.isDone()
- && JobStatus.CANCELED.equals(
- jobMasterCompleteFuture
- .get()
- .getStatus())));
+ && (JobStatus.CANCELED.equals(
+ jobMasterCompleteFuture
+ .get()
+ .getStatus())
+ || JobStatus.FAILED.equals(
+ jobMasterCompleteFuture
+ .get()
+ .getStatus()))));
testIMapRemovedAfterJobComplete(jobMaster);
}