You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/04/27 09:02:32 UTC
[incubator-seatunnel] branch dev updated: Fix Zeta UT error (#4664)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5430ca962 Fix Zeta UT error (#4664)
5430ca962 is described below
commit 5430ca9621ffdd7f7ec183235eed29764ecd6205
Author: Eric <ga...@gmail.com>
AuthorDate: Thu Apr 27 17:02:26 2023 +0800
Fix Zeta UT error (#4664)
---
.../engine/client/SeaTunnelClientTest.java | 223 ++++++++++++---------
.../src/test/resources/hazelcast.yaml | 8 +-
2 files changed, 140 insertions(+), 91 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 554a1e782..a15dad5e3 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
@@ -35,10 +36,8 @@ import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
@@ -63,7 +62,6 @@ public class SeaTunnelClientTest {
private static SeaTunnelConfig SEATUNNEL_CONFIG = ConfigProvider.locateAndGetSeaTunnelConfig();
private static HazelcastInstance INSTANCE;
- private static SeaTunnelClient CLIENT;
@BeforeAll
public static void beforeClass() throws Exception {
@@ -77,17 +75,17 @@ public class SeaTunnelClientTest {
new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
}
- @BeforeEach
- void setUp() {
+ private SeaTunnelClient createSeaTunnelClient() {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
- CLIENT = new SeaTunnelClient(clientConfig);
+ return new SeaTunnelClient(clientConfig);
}
@Test
public void testSayHello() {
String msg = "Hello world";
- String s = CLIENT.printMessageToMaster(msg);
+ SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+ String s = seaTunnelClient.printMessageToMaster(msg);
Assertions.assertEquals(msg, s);
}
@@ -96,12 +94,13 @@ public class SeaTunnelClientTest {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/client_test.conf");
JobConfig jobConfig = new JobConfig();
- jobConfig.setName("fake_to_file");
+ jobConfig.setName("testExecuteJob");
- JobExecutionEnvironment jobExecutionEnv =
- CLIENT.createExecutionContext(filePath, jobConfig);
+ SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
try {
+ JobExecutionEnvironment jobExecutionEnv =
+ seaTunnelClient.createExecutionContext(filePath, jobConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(
@@ -119,6 +118,8 @@ public class SeaTunnelClientTest {
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
+ } finally {
+ seaTunnelClient.close();
}
}
@@ -127,12 +128,14 @@ public class SeaTunnelClientTest {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/client_test.conf");
JobConfig jobConfig = new JobConfig();
- jobConfig.setName("fake_to_console");
+ jobConfig.setName("testGetJobState");
- JobExecutionEnvironment jobExecutionEnv =
- CLIENT.createExecutionContext(filePath, jobConfig);
+ SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+ JobClient jobClient = seaTunnelClient.getJobClient();
try {
+ JobExecutionEnvironment jobExecutionEnv =
+ seaTunnelClient.createExecutionContext(filePath, jobConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(
@@ -145,19 +148,24 @@ public class SeaTunnelClientTest {
.untilAsserted(
() ->
Assertions.assertTrue(
- CLIENT.getJobDetailStatus(jobId).contains("RUNNING")
- && CLIENT.listJobStatus().contains("RUNNING")));
+ jobClient.getJobDetailStatus(jobId).contains("RUNNING")
+ && jobClient
+ .listJobStatus(true)
+ .contains("RUNNING")));
await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
- CLIENT.getJobDetailStatus(jobId).contains("FINISHED")
- && CLIENT.listJobStatus()
+ jobClient.getJobDetailStatus(jobId).contains("FINISHED")
+ && jobClient
+ .listJobStatus(true)
.contains("FINISHED")));
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
+ } finally {
+ seaTunnelClient.close();
}
}
@@ -166,12 +174,15 @@ public class SeaTunnelClientTest {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/client_test.conf");
JobConfig jobConfig = new JobConfig();
- jobConfig.setName("fake_to_console");
+ jobConfig.setName("testGetJobMetrics");
- JobExecutionEnvironment jobExecutionEnv =
- CLIENT.createExecutionContext(filePath, jobConfig);
+ SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+ JobClient jobClient = seaTunnelClient.getJobClient();
try {
+ JobExecutionEnvironment jobExecutionEnv =
+ seaTunnelClient.createExecutionContext(filePath, jobConfig);
+
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(
@@ -184,11 +195,12 @@ public class SeaTunnelClientTest {
.untilAsserted(
() ->
Assertions.assertTrue(
- CLIENT.getJobDetailStatus(jobId).contains("FINISHED")
- && CLIENT.listJobStatus()
+ jobClient.getJobDetailStatus(jobId).contains("FINISHED")
+ && jobClient
+ .listJobStatus(true)
.contains("FINISHED")));
- String jobMetrics = CLIENT.getJobMetrics(jobId);
+ String jobMetrics = jobClient.getJobMetrics(jobId);
Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT));
Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS));
@@ -197,6 +209,8 @@ public class SeaTunnelClientTest {
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
+ } finally {
+ seaTunnelClient.close();
}
}
@@ -205,24 +219,36 @@ public class SeaTunnelClientTest {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
JobConfig jobConfig = new JobConfig();
- jobConfig.setName("streaming_fake_to_console");
+ jobConfig.setName("testCancelJob");
- JobExecutionEnvironment jobExecutionEnv =
- CLIENT.createExecutionContext(filePath, jobConfig);
+ SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+ JobClient jobClient = seaTunnelClient.getJobClient();
+ try {
+ JobExecutionEnvironment jobExecutionEnv =
+ seaTunnelClient.createExecutionContext(filePath, jobConfig);
- final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- long jobId = clientJobProxy.getJobId();
+ long jobId = clientJobProxy.getJobId();
- await().atMost(30000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ "RUNNING", jobClient.getJobStatus(jobId)));
- CLIENT.cancelJob(jobId);
+ jobClient.cancelJob(jobId);
- await().atMost(30000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> Assertions.assertEquals("CANCELED", CLIENT.getJobStatus(jobId)));
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ "CANCELED", jobClient.getJobStatus(jobId)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ seaTunnelClient.close();
+ }
}
@Test
@@ -232,31 +258,36 @@ public class SeaTunnelClientTest {
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_console");
- JobExecutionEnvironment jobExecutionEnv =
- CLIENT.createExecutionContext(filePath, jobConfig);
+ SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+ JobClient jobClient = seaTunnelClient.getJobClient();
try {
+ JobExecutionEnvironment jobExecutionEnv =
+ seaTunnelClient.createExecutionContext(filePath, jobConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
long jobId = clientJobProxy.getJobId();
// Running
- Assertions.assertNotNull(CLIENT.getJobInfo(jobId));
+ Assertions.assertNotNull(jobClient.getJobInfo(jobId));
await().atMost(180000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
- CLIENT.getJobDetailStatus(jobId).contains("FINISHED")
- && CLIENT.listJobStatus()
+ jobClient.getJobDetailStatus(jobId).contains("FINISHED")
+ && jobClient
+ .listJobStatus(true)
.contains("FINISHED")));
// Finished
- JobDAGInfo jobInfo = CLIENT.getJobInfo(jobId);
+ JobDAGInfo jobInfo = jobClient.getJobInfo(jobId);
Assertions.assertTrue(
StringUtils.isNotEmpty(new ObjectMapper().writeValueAsString(jobInfo)));
} catch (Exception e) {
throw new RuntimeException(e);
+ } finally {
+ seaTunnelClient.close();
}
}
@@ -267,56 +298,68 @@ public class SeaTunnelClientTest {
JobConfig jobConfig = new JobConfig();
jobConfig.setName("streaming_fake_to_console.conf");
- JobExecutionEnvironment jobExecutionEnv =
- CLIENT.createExecutionContext(filePath, jobConfig);
- final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
- long jobId = clientJobProxy.getJobId();
-
- await().atMost(30000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
-
- RetryUtils.retryWithException(
- () -> {
- CLIENT.savePointJob(jobId);
- return null;
- },
- new RetryUtils.RetryMaterial(
- Constant.OPERATION_RETRY_TIME,
- true,
- exception -> {
- // If we do savepoint for a Job which initialization has not been
- // completed yet, we will get an error.
- // In this test case, we need retry savepoint.
- return exception
- .getCause()
- .getMessage()
- .contains("Task not all ready, savepoint error");
- },
- Constant.OPERATION_RETRY_SLEEP));
-
- await().atMost(30000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> Assertions.assertEquals("FINISHED", CLIENT.getJobStatus(jobId)));
-
- Thread.sleep(1000);
- CLIENT.restoreExecutionContext(filePath, jobConfig, jobId).execute();
-
- await().atMost(30000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
-
- CLIENT.cancelJob(jobId);
-
- await().atMost(30000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> Assertions.assertEquals("CANCELED", CLIENT.getJobStatus(jobId)));
- }
+ SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+ JobClient jobClient = seaTunnelClient.getJobClient();
+
+ try {
+ JobExecutionEnvironment jobExecutionEnv =
+ seaTunnelClient.createExecutionContext(filePath, jobConfig);
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+ long jobId = clientJobProxy.getJobId();
- @AfterEach
- void tearDown() {
- CLIENT.close();
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ "RUNNING", jobClient.getJobStatus(jobId)));
+
+ RetryUtils.retryWithException(
+ () -> {
+ jobClient.savePointJob(jobId);
+ return null;
+ },
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+ exception -> {
+ // If we do savepoint for a Job which initialization has not been
+ // completed yet, we will get an error.
+ // In this test case, we need retry savepoint.
+ return exception
+ .getCause()
+ .getMessage()
+ .contains("Task not all ready, savepoint error");
+ },
+ Constant.OPERATION_RETRY_SLEEP));
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ "FINISHED", jobClient.getJobStatus(jobId)));
+
+ Thread.sleep(1000);
+ seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobId).execute();
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ "RUNNING", jobClient.getJobStatus(jobId)));
+
+ jobClient.cancelJob(jobId);
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ "CANCELED", jobClient.getJobStatus(jobId)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ seaTunnelClient.close();
+ }
}
@AfterAll
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml b/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml
index 21f4d544d..6e76442a3 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml
@@ -34,4 +34,10 @@ hazelcast:
initial-mode: EAGER
class-name: org.apache.seatunnel.engine.server.persistence.FileMapStore
properties:
- path: /tmp/file-store-map
\ No newline at end of file
+ path: /tmp/file-store-map
+
+ properties:
+ hazelcast.invocation.max.retry.count: 20
+ hazelcast.tcp.join.port.try.count: 30
+ hazelcast.logging.type: log4j2
+ hazelcast.operation.generic.thread.count: 200
\ No newline at end of file