You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/11 08:29:56 UTC
[incubator-seatunnel] branch dev updated: [E2E][ST-Engine] Add test data consistency in 3 node cluster and fix bug (#3038)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 97400a6f1 [E2E][ST-Engine] Add test data consistency in 3 node cluster and fix bug (#3038)
97400a6f1 is described below
commit 97400a6f131d04822803bdd841f1677b2ac39618
Author: Eric <ga...@gmail.com>
AuthorDate: Tue Oct 11 16:29:51 2022 +0800
[E2E][ST-Engine] Add test data consistency in 3 node cluster and fix bug (#3038)
* Add a data-consistency check to the 3-node cluster E2E test
* Fix FakeSource generating BYTES values that could contain the row or column separator
---
.../apache/seatunnel/common/utils/FileUtils.java | 79 ++++++++++++++++++++-
.../seatunnel/common/utils/SeaTunnelException.java | 68 ++++++++++++++++++
.../seatunnel/common/utils/FileUtilsTest.java | 80 ++++++++++++++++++++++
.../seatunnel/fake/source/FakeDataGenerator.java | 2 +-
.../engine/e2e/ClusterFaultToleranceIT.java | 59 ++++++++++++----
.../cluster_batch_fake_to_localfile_template.conf | 4 +-
6 files changed, 272 insertions(+), 20 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
index 795f70e89..12d67d32e 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/FileUtils.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.common.utils;
+import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
@@ -26,6 +27,8 @@ import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
@Slf4j
public class FileUtils {
@@ -36,7 +39,7 @@ public class FileUtils {
return new String(bytes);
} catch (IOException e) {
log.error(ExceptionUtils.getMessage(e));
- throw new RuntimeException(e);
+ throw new SeaTunnelException(e);
}
}
@@ -48,7 +51,7 @@ public class FileUtils {
ps.println(str);
} catch (FileNotFoundException e) {
log.error(ExceptionUtils.getMessage(e));
- throw new RuntimeException(e);
+ throw new SeaTunnelException(e);
} finally {
if (ps != null) {
ps.close();
@@ -79,4 +82,76 @@ public class FileUtils {
createParentFile(file);
}
}
+
+    /**
+     * Count the lines of a text file.
+     *
+     * <p>The file is decoded as UTF-8 (the charset used by {@link Files#lines(Path)}). The
+     * line stream is closed before returning, so no file handle is leaked.
+     *
+     * @param filePath path of the file to read
+     * @return the number of lines in the file
+     * @throws SeaTunnelException if the file cannot be opened or is not valid UTF-8
+     */
+    public static Long getFileLineNumber(@NonNull String filePath) {
+        // Files.lines is backed by an open file channel and must be closed explicitly.
+        try (java.util.stream.Stream<String> lines = Files.lines(Paths.get(filePath))) {
+            return lines.count();
+        } catch (IOException | java.io.UncheckedIOException e) {
+            // Malformed input is reported lazily, as UncheckedIOException, while counting.
+            throw new SeaTunnelException(String.format("get file[%s] line error", filePath), e);
+        }
+    }
+
+    /**
+     * Count the lines of every file under {@code dirPath}, recursing into sub-directories.
+     *
+     * <p>If {@code dirPath} is not a directory, it is counted as a single file.
+     *
+     * @param dirPath directory (or single file) to scan
+     * @return total number of lines across all files found
+     * @throws SeaTunnelException if a directory cannot be listed or a file cannot be read
+     *                            (including when {@code dirPath} does not exist)
+     */
+    public static Long getFileLineNumberFromDir(@NonNull String dirPath) {
+        File file = new File(dirPath);
+        if (!file.isDirectory()) {
+            return getFileLineNumber(file.getPath());
+        }
+        // listFiles() signals an I/O error by returning null rather than throwing.
+        File[] children = file.listFiles();
+        if (children == null) {
+            throw new SeaTunnelException(String.format("list dir[%s] error", dirPath));
+        }
+        return Arrays.stream(children)
+            .mapToLong(child -> child.isDirectory()
+                ? getFileLineNumberFromDir(child.getPath())
+                : getFileLineNumber(child.getPath()))
+            .sum();
+    }
+
+    /**
+     * Delete a file, or a directory together with everything beneath it.
+     *
+     * <p>Does nothing if {@code filePath} does not exist. A symbolic link is removed itself;
+     * the directory it points to is not traversed.
+     *
+     * @param filePath file or directory to delete
+     * @throws SeaTunnelException if a directory cannot be listed or an entry cannot be deleted
+     */
+    public static void deleteFile(@NonNull String filePath) {
+        File file = new File(filePath);
+        if (!file.exists()) {
+            return;
+        }
+        if (file.isDirectory() && !Files.isSymbolicLink(file.toPath())) {
+            // deleteFiles removes the directory itself once its contents are gone.
+            deleteFiles(file);
+        } else {
+            deleteSingle(file);
+        }
+    }
+
+    /** Recursively delete the contents of {@code dir}, then {@code dir} itself. */
+    private static void deleteFiles(@NonNull File dir) {
+        // listFiles() signals an I/O error by returning null rather than throwing.
+        File[] children = dir.listFiles();
+        if (children == null) {
+            log.error("delete file [{}] error", dir.getPath());
+            throw new SeaTunnelException(String.format("list dir[%s] error", dir.getPath()));
+        }
+        for (File child : children) {
+            // File.isDirectory follows symlinks; never recurse through a link into its target.
+            if (child.isDirectory() && !Files.isSymbolicLink(child.toPath())) {
+                deleteFiles(child);
+            } else {
+                deleteSingle(child);
+            }
+        }
+        deleteSingle(dir);
+    }
+
+    /** Delete one file or empty directory; fail unless it is gone afterwards. */
+    private static void deleteSingle(@NonNull File file) {
+        if (!file.delete() && file.exists()) {
+            log.error("delete file [{}] error", file.getPath());
+            throw new SeaTunnelException(String.format("delete file[%s] error", file.getPath()));
+        }
+    }
}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SeaTunnelException.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SeaTunnelException.java
new file mode 100644
index 000000000..750b38424
--- /dev/null
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SeaTunnelException.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+/**
+ * Unchecked exception used by SeaTunnel common utilities to report failures, typically by
+ * wrapping a lower-level cause such as {@link java.io.IOException}.
+ */
+public class SeaTunnelException extends RuntimeException {
+
+    /** Serialization version identifier (see {@link java.io.Serializable}). */
+    private static final long serialVersionUID = 2263144814025689516L;
+
+    /**
+     * Build an exception that carries both a message and the underlying cause.
+     *
+     * @param msg   description of the failure
+     * @param cause the error that triggered this exception
+     */
+    public SeaTunnelException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+
+    /**
+     * Build an exception that wraps a cause; the message is derived from {@code cause}
+     * as done by {@link RuntimeException#RuntimeException(Throwable)}.
+     *
+     * @param cause the error that triggered this exception
+     */
+    public SeaTunnelException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Build an exception with a message and no cause.
+     *
+     * @param msg description of the failure
+     */
+    public SeaTunnelException(String msg) {
+        super(msg);
+    }
+
+    /** Build an exception with neither a message nor a cause. */
+    public SeaTunnelException() {
+        super();
+    }
+}
diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/FileUtilsTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/FileUtilsTest.java
new file mode 100644
index 000000000..21dbaee08
--- /dev/null
+++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/FileUtilsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import lombok.NonNull;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+/**
+ * Tests for the line-counting and deletion helpers in {@link FileUtils}.
+ *
+ * <p>Each test works inside its own freshly created temporary directory and removes it
+ * afterwards. This keeps the tests portable and independent of files left by earlier runs.
+ */
+public class FileUtilsTest {
+
+    /** Number of lines {@link #writeTestDataToFile(String)} writes to each file. */
+    private static final long LINES_PER_FILE = 100;
+
+    @Test
+    public void testGetFileLineNumber() throws Exception {
+        File rootDir = java.nio.file.Files.createTempDirectory("file_utils").toFile();
+        try {
+            String filePath = new File(rootDir, "file1.txt").getPath();
+            writeTestDataToFile(filePath);
+
+            Long fileLineNumber = FileUtils.getFileLineNumber(filePath);
+            Assertions.assertEquals(LINES_PER_FILE, fileLineNumber.longValue());
+        } finally {
+            FileUtils.deleteFile(rootDir.getPath());
+        }
+    }
+
+    @Test
+    public void testGetFileLineNumberFromDir() throws Exception {
+        File rootDir = java.nio.file.Files.createTempDirectory("file_utils1").toFile();
+        try {
+            File dir1 = new File(rootDir, "dir1");
+            File dir2 = new File(rootDir, "dir2");
+            String[] files = {
+                new File(dir1, "file1.txt").getPath(),
+                new File(dir1, "file2.txt").getPath(),
+                new File(dir2, "file3.txt").getPath(),
+                new File(dir2, "file4.txt").getPath()
+            };
+            for (String file : files) {
+                writeTestDataToFile(file);
+            }
+
+            Long lines = FileUtils.getFileLineNumberFromDir(rootDir.getPath());
+            Assertions.assertEquals(LINES_PER_FILE * files.length, lines.longValue());
+        } finally {
+            FileUtils.deleteFile(rootDir.getPath());
+        }
+    }
+
+    @Test
+    public void testDeleteFile() throws Exception {
+        File rootDir = java.nio.file.Files.createTempDirectory("file_utils2").toFile();
+        writeTestDataToFile(new File(new File(rootDir, "dir1"), "file1.txt").getPath());
+
+        FileUtils.deleteFile(rootDir.getPath());
+        Assertions.assertFalse(rootDir.exists());
+
+        // Deleting a path that no longer exists is a no-op.
+        FileUtils.deleteFile(rootDir.getPath());
+    }
+
+    /**
+     * Create {@code filePath} with FileUtils.createNewFile and fill it with
+     * {@link #LINES_PER_FILE} newline-terminated lines.
+     */
+    public void writeTestDataToFile(@NonNull String filePath) throws IOException {
+        FileUtils.createNewFile(filePath);
+
+        try (BufferedWriter bw = new BufferedWriter(new FileWriter(filePath))) {
+            for (int i = 0; i < LINES_PER_FILE; i++) {
+                bw.write(i + "");
+                bw.newLine();
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index 3adfb885a..b73f32784 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -113,7 +113,7 @@ public class FakeDataGenerator {
case NULL:
return null;
case BYTES:
- return RandomUtils.nextBytes(fakeConfig.getBytesLength());
+ return RandomStringUtils.randomAlphabetic(fakeConfig.getBytesLength()).getBytes();
case DATE:
return randomLocalDateTime().toLocalDate();
case TIME:
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
index 76785bb9c..a1b0b1a42 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.e2e;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
@@ -29,9 +30,11 @@ import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import lombok.NonNull;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
import java.io.File;
import java.util.HashMap;
@@ -45,19 +48,23 @@ import java.util.concurrent.TimeUnit;
*/
public class ClusterFaultToleranceIT {
+ public static final String DYNAMIC_TEST_CASE_NAME = "dynamic_test_case_name";
+
@Test
public void testBatchJobRunOkIn3Node() throws ExecutionException, InterruptedException {
+ String testCaseName = "testBatchJobRunOkIn3Node";
+ String testClusterName = "ClusterFaultToleranceIT_testBatchJobRunOkIn3Node";
HazelcastInstanceImpl node1 =
SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+ TestUtils.getClusterName(testClusterName));
HazelcastInstanceImpl node2 =
SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+ TestUtils.getClusterName(testClusterName));
HazelcastInstanceImpl node3 =
SeaTunnelServerStarter.createHazelcastInstance(
- TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+ TestUtils.getClusterName(testClusterName));
// waiting all node added to cluster
Awaitility.await().atMost(10000, TimeUnit.MILLISECONDS)
@@ -65,23 +72,16 @@ public class ClusterFaultToleranceIT {
// TODO Need FakeSource support parallel first
Common.setDeployMode(DeployMode.CLIENT);
- String targetJobConfigNamePrefix = "cluster_batch_fake_to_localfile_fine";
- Map<String, String> valueMap = new HashMap<>();
- valueMap.put("target_test_job_config_file_name", targetJobConfigNamePrefix);
-
- String targetConfigFilePath =
- File.separator + "tmp" + File.separator + "test_conf" + File.separator + targetJobConfigNamePrefix +
- ".conf";
- TestUtils.createTestConfigFileFromTemplate("cluster_batch_fake_to_localfile_template.conf", valueMap,
- targetConfigFilePath);
+ ImmutablePair<String, String> testResources = createTestResources(testCaseName);
JobConfig jobConfig = new JobConfig();
- jobConfig.setName("test_cluster_fault_worker_batch_job");
+ jobConfig.setName(testCaseName);
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(
- TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRecoveryWhenWorkerDone"));
+ TestUtils.getClusterName(testClusterName));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(targetConfigFilePath, jobConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
@@ -91,5 +91,34 @@ public class ClusterFaultToleranceIT {
Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(
objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
+
+ Long fileLineNumberFromDir = FileUtils.getFileLineNumberFromDir(testResources.getLeft());
+ Assertions.assertEquals(100, fileLineNumberFromDir);
+ }
+
+    /**
+     * Create the job config file for a test case from cluster_batch_fake_to_localfile_template.conf.
+     *
+     * <p>The test case's sink target directory is deleted first, so data left by a previous
+     * run cannot affect the result.
+     *
+     * @param testCaseName test case name; used as the value of {@code dynamic_test_case_name}
+     *                     in the template and as the sink directory and config file name
+     * @return pair of (sink target directory, generated job config file path)
+     */
+    private ImmutablePair<String, String> createTestResources(@NonNull String testCaseName) {
+        Map<String, String> valueMap = new HashMap<>();
+        valueMap.put(DYNAMIC_TEST_CASE_NAME, testCaseName);
+
+        // Literal String#replace, not replaceAll: on Windows File.separator is a backslash,
+        // which replaceAll treats as an escape in the replacement string and then fails.
+        String targetDir = ("/tmp/hive/warehouse/" + testCaseName).replace("/", File.separator);
+
+        // clear target dir before test
+        FileUtils.deleteFile(targetDir);
+
+        String targetConfigFilePath =
+            File.separator + "tmp" + File.separator + "test_conf" + File.separator + testCaseName + ".conf";
+        TestUtils.createTestConfigFileFromTemplate("cluster_batch_fake_to_localfile_template.conf", valueMap,
+            targetConfigFilePath);
+
+        return new ImmutablePair<>(targetDir, targetConfigFilePath);
}
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
index bd843c6dd..35b4e44c4 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
@@ -28,7 +28,7 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
- row.num = 10
+ row.num = 100
map.size = 10
array.size = 10
bytes.length = 10
@@ -78,7 +78,7 @@ transform {
sink {
LocalFile {
- path="/tmp/hive/warehouse/${target_test_job_config_file_name}" # target_test_job_config_file_name will be replace to the final file name before test run
+ path="/tmp/hive/warehouse/${dynamic_test_case_name}" # dynamic_test_case_name is replaced with the test case name before the test runs
field_delimiter="\t"
row_delimiter="\n"
partition_by=["c_string"]