You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/03/14 03:54:16 UTC
[shardingsphere] branch master updated: Refactor PipelineContainerComposer.init() (#24585)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bbf88398ccf Refactor PipelineContainerComposer.init() (#24585)
bbf88398ccf is described below
commit bbf88398ccf2eb980e2dd2d937435a62bfd70de2
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Mar 14 11:54:08 2023 +0800
Refactor PipelineContainerComposer.init() (#24585)
* Refactor PipelineContainerComposer.init()
* Refactor PipelineContainerComposer.init()
---
.../e2e/data/pipeline/cases/PipelineContainerComposer.java | 14 +++++---------
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 8 ++++----
.../pipeline/cases/migration/AbstractMigrationE2EIT.java | 5 +++--
.../migration/general/MySQLMigrationGeneralE2EIT.java | 3 +--
.../migration/general/PostgreSQLMigrationGeneralE2EIT.java | 3 +--
.../cases/migration/general/RulesMigrationE2EIT.java | 3 +--
.../cases/migration/primarykey/IndexesMigrationE2EIT.java | 3 +--
.../cases/migration/primarykey/MariaDBMigrationE2EIT.java | 3 +--
.../migration/primarykey/TextPrimaryKeyMigrationE2EIT.java | 3 +--
9 files changed, 18 insertions(+), 27 deletions(-)
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 90e9398f1ba..2ded13bc995 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases;
import com.google.common.base.Strings;
import lombok.Getter;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
@@ -108,7 +109,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
private Thread increaseTaskThread;
- public PipelineContainerComposer(final PipelineTestParameter testParam) {
+ public PipelineContainerComposer(final PipelineTestParameter testParam, final JobType jobType) {
databaseType = testParam.getDatabaseType();
containerComposer = PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.DOCKER
? new DockerContainerComposer(testParam.getDatabaseType(), testParam.getStorageContainerImage(), testParam.getStorageContainerCount())
@@ -123,16 +124,11 @@ public final class PipelineContainerComposer implements AutoCloseable {
}
extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineContainerComposer.class.getClassLoader().getResource(testParam.getScenario())), ExtraSQLCommand.class);
containerComposer.start();
+ init(jobType);
}
- /**
- * Initialize environment.
- *
- * @param databaseType database type
- * @param jobType job type
- * @throws SQLException SQL exception
- */
- public void initEnvironment(final DatabaseType databaseType, final JobType jobType) throws SQLException {
+ @SneakyThrows(SQLException.class)
+ private void init(final JobType jobType) {
sourceDataSource = StorageContainerUtil.generateDataSource(appendExtraParam(getActualJdbcUrlTemplate(DS_0, false)), username, password);
proxyDataSource = StorageContainerUtil.generateDataSource(appendExtraParam(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)),
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 436bf08d945..cf25cc7e543 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -96,7 +96,7 @@ public final class CDCE2EIT {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public CDCE2EIT(final PipelineTestParameter testParam) {
- containerComposer = new PipelineContainerComposer(testParam);
+ containerComposer = new PipelineContainerComposer(testParam, new CDCJobType());
}
@Parameters(name = "{0}")
@@ -125,7 +125,6 @@ public final class CDCE2EIT {
public void assertCDCDataImportSuccess() throws SQLException, InterruptedException {
// make sure the program time zone same with the database server at CI.
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
- containerComposer.initEnvironment(containerComposer.getDatabaseType(), new CDCJobType());
for (String each : Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)) {
containerComposer.registerStorageUnit(each);
}
@@ -207,8 +206,9 @@ public final class CDCE2EIT {
}
private List<Map<String, Object>> listOrderRecords(final String tableNameWithSchema) throws SQLException {
- try (Connection connection = DriverManager.getConnection(
- containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false), containerComposer.getUsername(), containerComposer.getPassword())) {
+ try (
+ Connection connection = DriverManager.getConnection(
+ containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false), containerComposer.getUsername(), containerComposer.getPassword())) {
ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", tableNameWithSchema));
return containerComposer.transformResultSetToList(resultSet);
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 39b5d7003c1..1990027d006 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -21,6 +21,7 @@ import com.google.common.base.Strings;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.command.MigrationDistSQLCommand;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
@@ -52,8 +53,8 @@ public abstract class AbstractMigrationE2EIT {
private final PipelineContainerComposer containerComposer;
- public AbstractMigrationE2EIT(final PipelineTestParameter testParam) {
- containerComposer = new PipelineContainerComposer(testParam);
+ public AbstractMigrationE2EIT(final PipelineTestParameter testParam, final JobType jobType) {
+ containerComposer = new PipelineContainerComposer(testParam, jobType);
migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(AbstractMigrationE2EIT.class.getClassLoader().getResource("env/common/migration-command.xml")), MigrationDistSQLCommand.class);
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index fc25f0bb2ac..837137243e9 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -55,7 +55,7 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
private static final String SOURCE_TABLE_ORDER_NAME = "t_order_copy";
public MySQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
- super(testParam);
+ super(testParam, new MigrationJobType());
}
@Parameters(name = "{0}")
@@ -73,7 +73,6 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
@Test
public void assertMigrationSuccess() throws SQLException, InterruptedException {
- getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
addMigrationProcessConfig();
getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
getContainerComposer().createSourceOrderItemTable();
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 118778126bb..6f425f4f4a3 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -53,7 +53,7 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
private static final String SOURCE_TABLE_ORDER_NAME = "t_order_copy";
public PostgreSQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
- super(testParam);
+ super(testParam, new MigrationJobType());
}
@Parameters(name = "{0}")
@@ -73,7 +73,6 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
@Test
public void assertMigrationSuccess() throws SQLException, InterruptedException {
- getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
addMigrationProcessConfig();
createSourceSchema(PipelineContainerComposer.SCHEMA_NAME);
getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index bd28b68bed1..0e5d24adc19 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -53,7 +53,7 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
public RulesMigrationE2EIT(final PipelineTestParameter testParam) {
- super(testParam);
+ super(testParam, new MigrationJobType());
}
@Parameters(name = "{0}")
@@ -84,7 +84,6 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
}
private void assertMigrationSuccess(final Callable<Void> addRuleFn) throws Exception {
- getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, new UUIDKeyGenerateAlgorithm(), SOURCE_TABLE_ORDER_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index fdef91f9a62..90bae9da72f 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -68,7 +68,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
public IndexesMigrationE2EIT(final PipelineTestParameter testParam) {
- super(testParam);
+ super(testParam, new MigrationJobType());
}
@Parameters(name = "{0}")
@@ -183,7 +183,6 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
private void assertMigrationSuccess(final String sqlPattern, final String shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm,
final String consistencyCheckAlgorithmType, final Callable<Void> incrementalTaskFn) throws Exception {
- getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_ORDER_NAME));
try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, SOURCE_TABLE_ORDER_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index 3dc39456ea6..c327c3f1c5b 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -50,7 +50,7 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
public MariaDBMigrationE2EIT(final PipelineTestParameter testParam) {
- super(testParam);
+ super(testParam, new MigrationJobType());
}
@Parameters(name = "{0}")
@@ -70,7 +70,6 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
@Test
public void assertMigrationSuccess() throws SQLException, InterruptedException {
- getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_ORDER_NAME));
try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index fd92fb88356..c2b652ce7fa 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -48,7 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
public TextPrimaryKeyMigrationE2EIT(final PipelineTestParameter testParam) {
- super(testParam);
+ super(testParam, new MigrationJobType());
}
@Parameters(name = "{0}")
@@ -71,7 +71,6 @@ public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
@Test
public void assertTextPrimaryMigrationSuccess() throws SQLException, InterruptedException {
- getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
getContainerComposer().createSourceOrderTable(getSourceTableOrderName());
try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();