You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/03/14 03:54:16 UTC

[shardingsphere] branch master updated: Refactor PipelineContainerComposer.init() (#24585)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bbf88398ccf Refactor PipelineContainerComposer.init() (#24585)
bbf88398ccf is described below

commit bbf88398ccf2eb980e2dd2d937435a62bfd70de2
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Mar 14 11:54:08 2023 +0800

    Refactor PipelineContainerComposer.init() (#24585)
    
    * Refactor PipelineContainerComposer.init()
    
    * Refactor PipelineContainerComposer.init()
---
 .../e2e/data/pipeline/cases/PipelineContainerComposer.java | 14 +++++---------
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java         |  8 ++++----
 .../pipeline/cases/migration/AbstractMigrationE2EIT.java   |  5 +++--
 .../migration/general/MySQLMigrationGeneralE2EIT.java      |  3 +--
 .../migration/general/PostgreSQLMigrationGeneralE2EIT.java |  3 +--
 .../cases/migration/general/RulesMigrationE2EIT.java       |  3 +--
 .../cases/migration/primarykey/IndexesMigrationE2EIT.java  |  3 +--
 .../cases/migration/primarykey/MariaDBMigrationE2EIT.java  |  3 +--
 .../migration/primarykey/TextPrimaryKeyMigrationE2EIT.java |  3 +--
 9 files changed, 18 insertions(+), 27 deletions(-)

diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 90e9398f1ba..2ded13bc995 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases;
 
 import com.google.common.base.Strings;
 import lombok.Getter;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
@@ -108,7 +109,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     private Thread increaseTaskThread;
     
-    public PipelineContainerComposer(final PipelineTestParameter testParam) {
+    public PipelineContainerComposer(final PipelineTestParameter testParam, final JobType jobType) {
         databaseType = testParam.getDatabaseType();
         containerComposer = PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.DOCKER
                 ? new DockerContainerComposer(testParam.getDatabaseType(), testParam.getStorageContainerImage(), testParam.getStorageContainerCount())
@@ -123,16 +124,11 @@ public final class PipelineContainerComposer implements AutoCloseable {
         }
         extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineContainerComposer.class.getClassLoader().getResource(testParam.getScenario())), ExtraSQLCommand.class);
         containerComposer.start();
+        init(jobType);
     }
     
-    /**
-     * Initialize environment.
-     * 
-     * @param databaseType database type
-     * @param jobType job type
-     * @throws SQLException SQL exception
-     */
-    public void initEnvironment(final DatabaseType databaseType, final JobType jobType) throws SQLException {
+    @SneakyThrows(SQLException.class)
+    private void init(final JobType jobType) {
         sourceDataSource = StorageContainerUtil.generateDataSource(appendExtraParam(getActualJdbcUrlTemplate(DS_0, false)), username, password);
         proxyDataSource = StorageContainerUtil.generateDataSource(appendExtraParam(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)),
                 ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 436bf08d945..cf25cc7e543 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -96,7 +96,7 @@ public final class CDCE2EIT {
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
     
     public CDCE2EIT(final PipelineTestParameter testParam) {
-        containerComposer = new PipelineContainerComposer(testParam);
+        containerComposer = new PipelineContainerComposer(testParam, new CDCJobType());
     }
     
     @Parameters(name = "{0}")
@@ -125,7 +125,6 @@ public final class CDCE2EIT {
     public void assertCDCDataImportSuccess() throws SQLException, InterruptedException {
         // make sure the program time zone same with the database server at CI.
         TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
-        containerComposer.initEnvironment(containerComposer.getDatabaseType(), new CDCJobType());
         for (String each : Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)) {
             containerComposer.registerStorageUnit(each);
         }
@@ -207,8 +206,9 @@ public final class CDCE2EIT {
     }
     
     private List<Map<String, Object>> listOrderRecords(final String tableNameWithSchema) throws SQLException {
-        try (Connection connection = DriverManager.getConnection(
-                containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false), containerComposer.getUsername(), containerComposer.getPassword())) {
+        try (
+                Connection connection = DriverManager.getConnection(
+                        containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false), containerComposer.getUsername(), containerComposer.getPassword())) {
             ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", tableNameWithSchema));
             return containerComposer.transformResultSetToList(resultSet);
         }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 39b5d7003c1..1990027d006 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -21,6 +21,7 @@ import com.google.common.base.Strings;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.command.MigrationDistSQLCommand;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
@@ -52,8 +53,8 @@ public abstract class AbstractMigrationE2EIT {
     
     private final PipelineContainerComposer containerComposer;
     
-    public AbstractMigrationE2EIT(final PipelineTestParameter testParam) {
-        containerComposer = new PipelineContainerComposer(testParam);
+    public AbstractMigrationE2EIT(final PipelineTestParameter testParam, final JobType jobType) {
+        containerComposer = new PipelineContainerComposer(testParam, jobType);
         migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(AbstractMigrationE2EIT.class.getClassLoader().getResource("env/common/migration-command.xml")), MigrationDistSQLCommand.class);
     }
     
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index fc25f0bb2ac..837137243e9 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -55,7 +55,7 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     private static final String SOURCE_TABLE_ORDER_NAME = "t_order_copy";
     
     public MySQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
-        super(testParam);
+        super(testParam, new MigrationJobType());
     }
     
     @Parameters(name = "{0}")
@@ -73,7 +73,6 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
     @Test
     public void assertMigrationSuccess() throws SQLException, InterruptedException {
-        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
         addMigrationProcessConfig();
         getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
         getContainerComposer().createSourceOrderItemTable();
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 118778126bb..6f425f4f4a3 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -53,7 +53,7 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
     private static final String SOURCE_TABLE_ORDER_NAME = "t_order_copy";
     
     public PostgreSQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
-        super(testParam);
+        super(testParam, new MigrationJobType());
     }
     
     @Parameters(name = "{0}")
@@ -73,7 +73,6 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
     
     @Test
     public void assertMigrationSuccess() throws SQLException, InterruptedException {
-        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
         addMigrationProcessConfig();
         createSourceSchema(PipelineContainerComposer.SCHEMA_NAME);
         getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index bd28b68bed1..0e5d24adc19 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -53,7 +53,7 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
     
     public RulesMigrationE2EIT(final PipelineTestParameter testParam) {
-        super(testParam);
+        super(testParam, new MigrationJobType());
     }
     
     @Parameters(name = "{0}")
@@ -84,7 +84,6 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     }
     
     private void assertMigrationSuccess(final Callable<Void> addRuleFn) throws Exception {
-        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
         getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
         try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
             PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, new UUIDKeyGenerateAlgorithm(), SOURCE_TABLE_ORDER_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index fdef91f9a62..90bae9da72f 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -68,7 +68,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
     
     public IndexesMigrationE2EIT(final PipelineTestParameter testParam) {
-        super(testParam);
+        super(testParam, new MigrationJobType());
     }
     
     @Parameters(name = "{0}")
@@ -183,7 +183,6 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     private void assertMigrationSuccess(final String sqlPattern, final String shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm,
                                         final String consistencyCheckAlgorithmType, final Callable<Void> incrementalTaskFn) throws Exception {
-        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
         getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_ORDER_NAME));
         try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
             PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, SOURCE_TABLE_ORDER_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index 3dc39456ea6..c327c3f1c5b 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -50,7 +50,7 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
     
     public MariaDBMigrationE2EIT(final PipelineTestParameter testParam) {
-        super(testParam);
+        super(testParam, new MigrationJobType());
     }
     
     @Parameters(name = "{0}")
@@ -70,7 +70,6 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     
     @Test
     public void assertMigrationSuccess() throws SQLException, InterruptedException {
-        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
         String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
         getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_ORDER_NAME));
         try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index fd92fb88356..c2b652ce7fa 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -48,7 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
     
     public TextPrimaryKeyMigrationE2EIT(final PipelineTestParameter testParam) {
-        super(testParam);
+        super(testParam, new MigrationJobType());
     }
     
     @Parameters(name = "{0}")
@@ -71,7 +71,6 @@ public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
     
     @Test
     public void assertTextPrimaryMigrationSuccess() throws SQLException, InterruptedException {
-        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
         getContainerComposer().createSourceOrderTable(getSourceTableOrderName());
         try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
             UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();