You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/11 08:29:56 UTC

[incubator-seatunnel] branch dev updated: [E2E][ST-Engine] Add test data consistency in 3 node cluster and fix bug (#3038)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 97400a6f1 [E2E][ST-Engine] Add test data consistency in 3 node cluster and fix bug (#3038)
97400a6f1 is described below

commit 97400a6f131d04822803bdd841f1677b2ac39618
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Oct 11 16:29:51 2022 +0800

    [E2E][ST-Engine] Add test data consistency in 3 node cluster and fix bug (#3038)
    
    * Add a data consistency test for a 3-node cluster
    
    * Fix FakeSource generating data that contains row and column separators
---
 .../apache/seatunnel/common/utils/FileUtils.java   | 79 ++++++++++++++++++++-
 .../seatunnel/common/utils/SeaTunnelException.java | 68 ++++++++++++++++++
 .../seatunnel/common/utils/FileUtilsTest.java      | 80 ++++++++++++++++++++++
 .../seatunnel/fake/source/FakeDataGenerator.java   |  2 +-
 .../engine/e2e/ClusterFaultToleranceIT.java        | 59 ++++++++++++----
 .../cluster_batch_fake_to_localfile_template.conf  |  4 +-
 6 files changed, 272 insertions(+), 20 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
index 795f70e89..12d67d32e 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.common.utils;
 
+import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.File;
@@ -26,6 +27,8 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
 
 @Slf4j
 public class FileUtils {
@@ -36,7 +39,7 @@ public class FileUtils {
             return new String(bytes);
         } catch (IOException e) {
             log.error(ExceptionUtils.getMessage(e));
-            throw new RuntimeException(e);
+            throw new SeaTunnelException(e);
         }
     }
 
@@ -48,7 +51,7 @@ public class FileUtils {
             ps.println(str);
         } catch (FileNotFoundException e) {
             log.error(ExceptionUtils.getMessage(e));
-            throw new RuntimeException(e);
+            throw new SeaTunnelException(e);
         } finally {
             if (ps != null) {
                 ps.close();
@@ -79,4 +82,76 @@ public class FileUtils {
             createParentFile(file);
         }
     }
+
+    /**
+     * Count the lines of a file, closing the file handle opened by {@link Files#lines(Path)}.
+     *
+     * @param filePath path of the file to read; its content is decoded as UTF-8
+     * @return the number of lines in the file
+     */
+    public static Long getFileLineNumber(@NonNull String filePath) {
+        try (java.util.stream.Stream<String> lines = Files.lines(Paths.get(filePath))) {
+            return lines.count();
+        } catch (IOException e) {
+            throw new SeaTunnelException(String.format("get file[%s] line error", filePath), e);
+        }
+    }
+
+    /**
+     * Count the lines of a single file, or of every file found recursively under a directory.
+     *
+     * @param dirPath a directory (walked recursively) or a single file
+     * @return the total number of lines across all files found
+     */
+    public static Long getFileLineNumberFromDir(@NonNull String dirPath) {
+        File file = new File(dirPath);
+        if (!file.isDirectory()) {
+            return getFileLineNumber(file.getPath());
+        }
+        // listFiles() returns null instead of throwing when an I/O error occurs
+        File[] children = file.listFiles();
+        if (children == null) {
+            throw new SeaTunnelException(String.format("list dir[%s] error", dirPath));
+        }
+        long total = 0L;
+        for (File child : children) {
+            total += child.isDirectory()
+                ? getFileLineNumberFromDir(child.getPath()) : getFileLineNumber(child.getPath());
+        }
+        return total;
+    }
+
+    /**
+     * Delete a file, or a directory together with everything beneath it.
+     * Symbolic links are deleted as links; their targets are never followed.
+     * An entry that cannot be deleted is logged as a warning and skipped.
+     *
+     * @param filePath path to delete; nothing happens if it does not exist
+     * @throws SeaTunnelException if the contents of a directory cannot be listed
+     */
+    public static void deleteFile(@NonNull String filePath) {
+        File file = new File(filePath);
+        if (file.exists()) {
+            deleteFiles(file);
+        }
+    }
+
+    /**
+     * Recursively delete {@code file}: a real directory is emptied first, then removed.
+     */
+    private static void deleteFiles(@NonNull File file) {
+        // isDirectory() follows links; never descend into a link target outside the tree
+        if (file.isDirectory() && !Files.isSymbolicLink(file.toPath())) {
+            File[] children = file.listFiles();
+            if (children == null) {
+                throw new SeaTunnelException("delete file [" + file.getPath() + "] error");
+            }
+            for (File child : children) {
+                deleteFiles(child);
+            }
+        }
+        if (!file.delete()) {
+            log.warn("delete file [{}] failed", file.getPath());
+        }
+    }
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SeaTunnelException.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SeaTunnelException.java
new file mode 100644
index 000000000..750b38424
--- /dev/null
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SeaTunnelException.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+/**
+ * Unchecked exception thrown by SeaTunnel common utilities.
+ *
+ * <p>Every constructor delegates to the {@link RuntimeException} constructor with the same
+ * parameters.</p>
+ */
+public class SeaTunnelException extends RuntimeException {
+
+    /**
+     * Serialization version; declared because every {@link Throwable} is {@link java.io.Serializable}.
+     */
+    private static final long serialVersionUID = 2263144814025689516L;
+
+    /**
+     * <p>Creates an exception carrying both a detail message and the underlying cause.</p>
+     *
+     * @param msg   human-readable description of the failure
+     * @param cause the {@code Throwable} that triggered this exception
+     */
+    public SeaTunnelException(final String msg, final Throwable cause) {
+        super(msg, cause);
+    }
+
+    /**
+     * <p>Creates an exception wrapping {@code cause}; the detail message is derived from it as
+     * described by {@link RuntimeException#RuntimeException(Throwable)}.</p>
+     *
+     * @param cause the {@code Throwable} that triggered this exception
+     */
+    public SeaTunnelException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * <p>Creates an exception with the given detail message and no cause.</p>
+     *
+     * @param msg human-readable description of the failure
+     */
+    public SeaTunnelException(final String msg) {
+        super(msg);
+    }
+
+    /**
+     * <p>Creates an exception with neither a detail message nor a cause.</p>
+     */
+    public SeaTunnelException() {
+        super();
+    }
+}
diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/FileUtilsTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/FileUtilsTest.java
new file mode 100644
index 000000000..21dbaee08
--- /dev/null
+++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/FileUtilsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import lombok.NonNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+/**
+ * Tests for the line-counting helpers of {@link FileUtils}; test data lives under /tmp/test.
+ */
+public class FileUtilsTest {
+    /** Number of lines {@link #writeTestDataToFile(String)} writes to every file. */
+    private static final int LINES_PER_FILE = 100;
+
+    @Test
+    public void testGetFileLineNumber() throws Exception {
+        String filePath = "/tmp/test/file_utils/file1.txt";
+        filePath = filePath.replace("/", File.separator);
+        writeTestDataToFile(filePath);
+
+        Long fileLineNumber = FileUtils.getFileLineNumber(filePath);
+        Assertions.assertEquals(LINES_PER_FILE, fileLineNumber);
+    }
+
+    @Test
+    public void testGetFileLineNumberFromDir() throws Exception {
+        // leftovers from an earlier run would inflate the count, so start from an empty dir
+        String rootPath = "/tmp/test/file_utils1".replace("/", File.separator);
+        FileUtils.deleteFile(rootPath);
+
+        String[] files = {"dir1/file1.txt", "dir1/file2.txt", "dir2/file3.txt", "dir2/file4.txt"};
+        for (String relativePath : files) {
+            // writeTestDataToFile already calls FileUtils.createNewFile, so no need to pre-create
+            writeTestDataToFile(rootPath + File.separator + relativePath.replace("/", File.separator));
+        }
+
+        Long lines = FileUtils.getFileLineNumberFromDir(rootPath);
+        Assertions.assertEquals(LINES_PER_FILE * files.length, lines);
+
+        FileUtils.deleteFile(rootPath);
+    }
+
+    /**
+     * Write the numbers {@code 0 .. LINES_PER_FILE - 1}, one per line, replacing any existing content.
+     *
+     * @param filePath file to (re)create
+     * @throws IOException if the file cannot be written
+     */
+    public void writeTestDataToFile(@NonNull String filePath) throws IOException {
+        FileUtils.createNewFile(filePath);
+
+        try (BufferedWriter bw = new BufferedWriter(new FileWriter(filePath))) {
+            for (int i = 0; i < LINES_PER_FILE; i++) {
+                bw.write(i + "");
+                bw.newLine();
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index 3adfb885a..b73f32784 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -113,7 +113,7 @@ public class FakeDataGenerator {
             case NULL:
                 return null;
             case BYTES:
-                return RandomUtils.nextBytes(fakeConfig.getBytesLength());
+                return RandomStringUtils.randomAlphabetic(fakeConfig.getBytesLength()).getBytes();
             case DATE:
                 return randomLocalDateTime().toLocalDate();
             case TIME:
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index 76785bb9c..a1b0b1a42 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.e2e;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
@@ -29,9 +30,11 @@ import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 
 import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import lombok.NonNull;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
 
 import java.io.File;
 import java.util.HashMap;
@@ -45,19 +48,23 @@ import java.util.concurrent.TimeUnit;
  */
 public class ClusterFaultToleranceIT {
 
+    public static final String DYNAMIC_TEST_CASE_NAME = "dynamic_test_case_name";
+
     @Test
     public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedException {
+        String testCaseName = "testBatchJobRunOkIn3Node";
+        String testClusterName = "ClusterFaultToleranceIT_testBatchJobRunOkIn3Node";
         HazelcastInstanceImpl node1 =
             SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+                TestUtils.getClusterName(testClusterName));
 
         HazelcastInstanceImpl node2 =
             SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+                TestUtils.getClusterName(testClusterName));
 
         HazelcastInstanceImpl node3 =
             SeaTunnelServerStarter.createHazelcastInstance(
-                TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+                TestUtils.getClusterName(testClusterName));
 
         // waiting all node added to cluster
         Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
@@ -65,23 +72,16 @@ public class ClusterFaultToleranceIT {
 
         // TODO Need FakeSource support parallel first
         Common.setDeployMode(DeployMode.CLIENT);
-        String targetJobConfigNamePrefix = "cluster_batch_fake_to_localfile_fine";
-        Map<String, String> valueMap = new HashMap<>();
-        valueMap.put("target_test_job_config_file_name", targetJobConfigNamePrefix);
-
-        String targetConfigFilePath =
-            File.separator + "tmp" + File.separator + "test_conf" + File.separator + targetJobConfigNamePrefix +
-                ".conf";
-        TestUtils.createTestConfigFileFromTemplate("cluster_batch_fake_to_localfile_template.conf", valueMap,
-            targetConfigFilePath);
+        ImmutablePair<String, String> testResources = createTestResources(testCaseName);
         JobConfig jobConfig = new JobConfig();
-        jobConfig.setName("test_cluster_fault_worker_batch_job");
+        jobConfig.setName(testCaseName);
 
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         clientConfig.setClusterName(
-            TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+            TestUtils.getClusterName(testClusterName));
         SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
-        JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(targetConfigFilePath, jobConfig);
+        JobExecutionEnvironment jobExecutionEnv =
+            engineClient.createExecutionContext(testResources.getRight(), jobConfig);
         ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
 
         CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
@@ -91,5 +91,34 @@ public class ClusterFaultToleranceIT {
         Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
             .untilAsserted(() -> Assertions.assertTrue(
                 objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+
+        Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+        Assertions.assertEquals(100, fileLineNumberFromDir);
+    }
+
+    /**
+     * Create the job config file for a test case from cluster_batch_fake_to_localfile_template.conf,
+     * after first deleting the sink directory left behind by any previous run of that case.
+     *
+     * @param testCaseName substituted for ${dynamic_test_case_name} in the template
+     * @return left: sink output dir (must match the template's LocalFile path); right: job config path
+     */
+    private ImmutablePair<String, String> createTestResources(@NonNull String testCaseName) {
+        Map<String, String> valueMap = new HashMap<>();
+        valueMap.put(DYNAMIC_TEST_CASE_NAME, testCaseName);
+
+        // replace(), not replaceAll(): "\" (the Windows separator) is an escape char in a regex replacement
+        String targetDir = ("/tmp/hive/warehouse/" + testCaseName).replace("/", File.separator);
+
+        // stale output from an earlier run would break the line-count assertion on this dir
+        FileUtils.deleteFile(targetDir);
+
+        String targetConfigFilePath =
+            File.separator + "tmp" + File.separator + "test_conf" + File.separator + testCaseName +
+                ".conf";
+        TestUtils.createTestConfigFileFromTemplate("cluster_batch_fake_to_localfile_template.conf", valueMap,
+            targetConfigFilePath);
+
+        return new ImmutablePair<>(targetDir, targetConfigFilePath);
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
index bd843c6dd..35b4e44c4 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
@@ -28,7 +28,7 @@ env {
 source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
-      row.num = 10
+      row.num = 100
       map.size = 10
       array.size = 10
       bytes.length = 10
@@ -78,7 +78,7 @@ transform {
 
 sink {
   LocalFile {
-    path="/tmp/hive/warehouse/${target_test_job_config_file_name}" # target_test_job_config_file_name will be replace to the final file name before test run
+    path="/tmp/hive/warehouse/${dynamic_test_case_name}" # ${dynamic_test_case_name} is replaced with the test case name before the test runs
     field_delimiter="\t"
     row_delimiter="\n"
     partition_by=["c_string"]