You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/04/27 09:02:32 UTC

[incubator-seatunnel] branch dev updated: Fix Zeta UT error (#4664)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5430ca962 Fix Zeta UT error (#4664)
5430ca962 is described below

commit 5430ca9621ffdd7f7ec183235eed29764ecd6205
Author: Eric <ga...@gmail.com>
AuthorDate: Thu Apr 27 17:02:26 2023 +0800

    Fix Zeta UT error (#4664)
---
 .../engine/client/SeaTunnelClientTest.java         | 223 ++++++++++++---------
 .../src/test/resources/hazelcast.yaml              |   8 +-
 2 files changed, 140 insertions(+), 91 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 554a1e782..a15dad5e3 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.client.job.JobClient;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
@@ -35,10 +36,8 @@ import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
 import org.apache.commons.lang3.StringUtils;
 
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
@@ -63,7 +62,6 @@ public class SeaTunnelClientTest {
 
     private static SeaTunnelConfig SEATUNNEL_CONFIG = ConfigProvider.locateAndGetSeaTunnelConfig();
     private static HazelcastInstance INSTANCE;
-    private static SeaTunnelClient CLIENT;
 
     @BeforeAll
     public static void beforeClass() throws Exception {
@@ -77,17 +75,17 @@ public class SeaTunnelClientTest {
                         new SeaTunnelNodeContext(ConfigProvider.locateAndGetSeaTunnelConfig()));
     }
 
-    @BeforeEach
-    void setUp() {
+    private SeaTunnelClient createSeaTunnelClient() {
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         clientConfig.setClusterName(TestUtils.getClusterName("SeaTunnelClientTest"));
-        CLIENT = new SeaTunnelClient(clientConfig);
+        return new SeaTunnelClient(clientConfig);
     }
 
     @Test
     public void testSayHello() {
         String msg = "Hello world";
-        String s = CLIENT.printMessageToMaster(msg);
+        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+        String s = seaTunnelClient.printMessageToMaster(msg);
         Assertions.assertEquals(msg, s);
     }
 
@@ -96,12 +94,13 @@ public class SeaTunnelClientTest {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/client_test.conf");
         JobConfig jobConfig = new JobConfig();
-        jobConfig.setName("fake_to_file");
+        jobConfig.setName("testExecuteJob");
 
-        JobExecutionEnvironment jobExecutionEnv =
-                CLIENT.createExecutionContext(filePath, jobConfig);
+        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
 
         try {
+            JobExecutionEnvironment jobExecutionEnv =
+                    seaTunnelClient.createExecutionContext(filePath, jobConfig);
             final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
             CompletableFuture<JobStatus> objectCompletableFuture =
                     CompletableFuture.supplyAsync(
@@ -119,6 +118,8 @@ public class SeaTunnelClientTest {
 
         } catch (ExecutionException | InterruptedException e) {
             throw new RuntimeException(e);
+        } finally {
+            seaTunnelClient.close();
         }
     }
 
@@ -127,12 +128,14 @@ public class SeaTunnelClientTest {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/client_test.conf");
         JobConfig jobConfig = new JobConfig();
-        jobConfig.setName("fake_to_console");
+        jobConfig.setName("testGetJobState");
 
-        JobExecutionEnvironment jobExecutionEnv =
-                CLIENT.createExecutionContext(filePath, jobConfig);
+        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+        JobClient jobClient = seaTunnelClient.getJobClient();
 
         try {
+            JobExecutionEnvironment jobExecutionEnv =
+                    seaTunnelClient.createExecutionContext(filePath, jobConfig);
             final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
             CompletableFuture<JobStatus> objectCompletableFuture =
                     CompletableFuture.supplyAsync(
@@ -145,19 +148,24 @@ public class SeaTunnelClientTest {
                     .untilAsserted(
                             () ->
                                     Assertions.assertTrue(
-                                            CLIENT.getJobDetailStatus(jobId).contains("RUNNING")
-                                                    && CLIENT.listJobStatus().contains("RUNNING")));
+                                            jobClient.getJobDetailStatus(jobId).contains("RUNNING")
+                                                    && jobClient
+                                                            .listJobStatus(true)
+                                                            .contains("RUNNING")));
 
             await().atMost(30000, TimeUnit.MILLISECONDS)
                     .untilAsserted(
                             () ->
                                     Assertions.assertTrue(
-                                            CLIENT.getJobDetailStatus(jobId).contains("FINISHED")
-                                                    && CLIENT.listJobStatus()
+                                            jobClient.getJobDetailStatus(jobId).contains("FINISHED")
+                                                    && jobClient
+                                                            .listJobStatus(true)
                                                             .contains("FINISHED")));
 
         } catch (ExecutionException | InterruptedException e) {
             throw new RuntimeException(e);
+        } finally {
+            seaTunnelClient.close();
         }
     }
 
@@ -166,12 +174,15 @@ public class SeaTunnelClientTest {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/client_test.conf");
         JobConfig jobConfig = new JobConfig();
-        jobConfig.setName("fake_to_console");
+        jobConfig.setName("testGetJobMetrics");
 
-        JobExecutionEnvironment jobExecutionEnv =
-                CLIENT.createExecutionContext(filePath, jobConfig);
+        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+        JobClient jobClient = seaTunnelClient.getJobClient();
 
         try {
+            JobExecutionEnvironment jobExecutionEnv =
+                    seaTunnelClient.createExecutionContext(filePath, jobConfig);
+
             final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
             CompletableFuture<JobStatus> objectCompletableFuture =
                     CompletableFuture.supplyAsync(
@@ -184,11 +195,12 @@ public class SeaTunnelClientTest {
                     .untilAsserted(
                             () ->
                                     Assertions.assertTrue(
-                                            CLIENT.getJobDetailStatus(jobId).contains("FINISHED")
-                                                    && CLIENT.listJobStatus()
+                                            jobClient.getJobDetailStatus(jobId).contains("FINISHED")
+                                                    && jobClient
+                                                            .listJobStatus(true)
                                                             .contains("FINISHED")));
 
-            String jobMetrics = CLIENT.getJobMetrics(jobId);
+            String jobMetrics = jobClient.getJobMetrics(jobId);
 
             Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT));
             Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS));
@@ -197,6 +209,8 @@ public class SeaTunnelClientTest {
 
         } catch (ExecutionException | InterruptedException e) {
             throw new RuntimeException(e);
+        } finally {
+            seaTunnelClient.close();
         }
     }
 
@@ -205,24 +219,36 @@ public class SeaTunnelClientTest {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
         JobConfig jobConfig = new JobConfig();
-        jobConfig.setName("streaming_fake_to_console");
+        jobConfig.setName("testCancelJob");
 
-        JobExecutionEnvironment jobExecutionEnv =
-                CLIENT.createExecutionContext(filePath, jobConfig);
+        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+        JobClient jobClient = seaTunnelClient.getJobClient();
+        try {
+            JobExecutionEnvironment jobExecutionEnv =
+                    seaTunnelClient.createExecutionContext(filePath, jobConfig);
 
-        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
 
-        long jobId = clientJobProxy.getJobId();
+            long jobId = clientJobProxy.getJobId();
 
-        await().atMost(30000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            "RUNNING", jobClient.getJobStatus(jobId)));
 
-        CLIENT.cancelJob(jobId);
+            jobClient.cancelJob(jobId);
 
-        await().atMost(30000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> Assertions.assertEquals("CANCELED", CLIENT.getJobStatus(jobId)));
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            "CANCELED", jobClient.getJobStatus(jobId)));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            seaTunnelClient.close();
+        }
     }
 
     @Test
@@ -232,31 +258,36 @@ public class SeaTunnelClientTest {
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("fake_to_console");
 
-        JobExecutionEnvironment jobExecutionEnv =
-                CLIENT.createExecutionContext(filePath, jobConfig);
+        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+        JobClient jobClient = seaTunnelClient.getJobClient();
 
         try {
+            JobExecutionEnvironment jobExecutionEnv =
+                    seaTunnelClient.createExecutionContext(filePath, jobConfig);
             final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
             CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
             long jobId = clientJobProxy.getJobId();
 
             // Running
-            Assertions.assertNotNull(CLIENT.getJobInfo(jobId));
+            Assertions.assertNotNull(jobClient.getJobInfo(jobId));
 
             await().atMost(180000, TimeUnit.MILLISECONDS)
                     .untilAsserted(
                             () ->
                                     Assertions.assertTrue(
-                                            CLIENT.getJobDetailStatus(jobId).contains("FINISHED")
-                                                    && CLIENT.listJobStatus()
+                                            jobClient.getJobDetailStatus(jobId).contains("FINISHED")
+                                                    && jobClient
+                                                            .listJobStatus(true)
                                                             .contains("FINISHED")));
             // Finished
-            JobDAGInfo jobInfo = CLIENT.getJobInfo(jobId);
+            JobDAGInfo jobInfo = jobClient.getJobInfo(jobId);
             Assertions.assertTrue(
                     StringUtils.isNotEmpty(new ObjectMapper().writeValueAsString(jobInfo)));
 
         } catch (Exception e) {
             throw new RuntimeException(e);
+        } finally {
+            seaTunnelClient.close();
         }
     }
 
@@ -267,56 +298,68 @@ public class SeaTunnelClientTest {
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("streaming_fake_to_console.conf");
 
-        JobExecutionEnvironment jobExecutionEnv =
-                CLIENT.createExecutionContext(filePath, jobConfig);
-        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-        CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
-        long jobId = clientJobProxy.getJobId();
-
-        await().atMost(30000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
-
-        RetryUtils.retryWithException(
-                () -> {
-                    CLIENT.savePointJob(jobId);
-                    return null;
-                },
-                new RetryUtils.RetryMaterial(
-                        Constant.OPERATION_RETRY_TIME,
-                        true,
-                        exception -> {
-                            // If we do savepoint for a Job which initialization has not been
-                            // completed yet, we will get an error.
-                            // In this test case, we need retry savepoint.
-                            return exception
-                                    .getCause()
-                                    .getMessage()
-                                    .contains("Task not all ready, savepoint error");
-                        },
-                        Constant.OPERATION_RETRY_SLEEP));
-
-        await().atMost(30000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> Assertions.assertEquals("FINISHED", CLIENT.getJobStatus(jobId)));
-
-        Thread.sleep(1000);
-        CLIENT.restoreExecutionContext(filePath, jobConfig, jobId).execute();
-
-        await().atMost(30000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> Assertions.assertEquals("RUNNING", CLIENT.getJobStatus(jobId)));
-
-        CLIENT.cancelJob(jobId);
-
-        await().atMost(30000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> Assertions.assertEquals("CANCELED", CLIENT.getJobStatus(jobId)));
-    }
+        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+        JobClient jobClient = seaTunnelClient.getJobClient();
+
+        try {
+            JobExecutionEnvironment jobExecutionEnv =
+                    seaTunnelClient.createExecutionContext(filePath, jobConfig);
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+            long jobId = clientJobProxy.getJobId();
 
-    @AfterEach
-    void tearDown() {
-        CLIENT.close();
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            "RUNNING", jobClient.getJobStatus(jobId)));
+
+            RetryUtils.retryWithException(
+                    () -> {
+                        jobClient.savePointJob(jobId);
+                        return null;
+                    },
+                    new RetryUtils.RetryMaterial(
+                            Constant.OPERATION_RETRY_TIME,
+                            true,
+                            exception -> {
+                                // If we trigger a savepoint for a job whose initialization has
+                                // not completed yet, we will get an error.
+                                // In this test case, we need to retry the savepoint.
+                                return exception
+                                        .getCause()
+                                        .getMessage()
+                                        .contains("Task not all ready, savepoint error");
+                            },
+                            Constant.OPERATION_RETRY_SLEEP));
+
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            "FINISHED", jobClient.getJobStatus(jobId)));
+
+            Thread.sleep(1000);
+            seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobId).execute();
+
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            "RUNNING", jobClient.getJobStatus(jobId)));
+
+            jobClient.cancelJob(jobId);
+
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            "CANCELED", jobClient.getJobStatus(jobId)));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            seaTunnelClient.close();
+        }
     }
 
     @AfterAll
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml b/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml
index 21f4d544d..6e76442a3 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/hazelcast.yaml
@@ -34,4 +34,10 @@ hazelcast:
         initial-mode: EAGER
         class-name: org.apache.seatunnel.engine.server.persistence.FileMapStore
         properties:
-          path: /tmp/file-store-map
\ No newline at end of file
+          path: /tmp/file-store-map
+
+  properties:
+    hazelcast.invocation.max.retry.count: 20
+    hazelcast.tcp.join.port.try.count: 30
+    hazelcast.logging.type: log4j2
+    hazelcast.operation.generic.thread.count: 200
\ No newline at end of file