You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ji...@apache.org on 2023/03/13 16:23:40 UTC
[shardingsphere] branch master updated: Add PipelineContainerComposer (#24582)
This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 94062ee1c2d Add PipelineContainerComposer (#24582)
94062ee1c2d is described below
commit 94062ee1c2d4e0b9d66f8e9d23996feb816168b8
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Mar 14 00:23:21 2023 +0800
Add PipelineContainerComposer (#24582)
* Add PipelineContainerComposer
* Add PipelineContainerComposer
---
...seE2EIT.java => PipelineContainerComposer.java} | 280 ++++++++++++++++-----
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 100 ++++----
.../cases/migration/AbstractMigrationE2EIT.java | 77 +++---
.../general/MySQLMigrationGeneralE2EIT.java | 44 ++--
.../general/PostgreSQLMigrationGeneralE2EIT.java | 64 +++--
.../migration/general/RulesMigrationE2EIT.java | 32 ++-
.../primarykey/IndexesMigrationE2EIT.java | 56 ++---
.../primarykey/MariaDBMigrationE2EIT.java | 36 ++-
.../primarykey/TextPrimaryKeyMigrationE2EIT.java | 42 ++--
9 files changed, 432 insertions(+), 299 deletions(-)
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
similarity index 67%
rename from test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
rename to test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 5ae6eef95e1..90e9398f1ba 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -15,10 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.test.e2e.data.pipeline.cases.base;
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases;
import com.google.common.base.Strings;
-import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -29,6 +28,7 @@ import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
import org.apache.shardingsphere.test.e2e.data.pipeline.command.ExtraSQLCommand;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
@@ -36,7 +36,6 @@ import org.apache.shardingsphere.test.e2e.data.pipeline.framework.container.comp
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.container.compose.DockerContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.container.compose.NativeContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
-import org.apache.shardingsphere.test.e2e.data.pipeline.framework.watcher.PipelineWatcher;
import org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
import org.apache.shardingsphere.test.e2e.env.container.atomic.storage.DockerStorageContainer;
import org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtil;
@@ -44,7 +43,6 @@ import org.apache.shardingsphere.test.e2e.env.container.atomic.util.StorageConta
import org.apache.shardingsphere.test.e2e.env.runtime.DataSourceEnvironment;
import org.apache.shardingsphere.test.util.PropertiesBuilder;
import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
-import org.junit.Rule;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import javax.sql.DataSource;
@@ -71,33 +69,28 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@Getter(AccessLevel.PROTECTED)
+/**
+ * Pipeline container composer.
+ */
+@Getter
@Slf4j
-public abstract class PipelineBaseE2EIT {
-
- protected static final PipelineE2EEnvironment ENV = PipelineE2EEnvironment.getInstance();
-
- protected static final String SCHEMA_NAME = "test";
-
- protected static final String PROXY_DATABASE = "sharding_db";
+public final class PipelineContainerComposer implements AutoCloseable {
- protected static final String DS_0 = "pipeline_it_0";
+ public static final String SCHEMA_NAME = "test";
- protected static final String DS_1 = "pipeline_it_1";
+ public static final String PROXY_DATABASE = "sharding_db";
- protected static final String DS_2 = "pipeline_it_2";
+ public static final String DS_0 = "pipeline_it_0";
- protected static final String DS_3 = "pipeline_it_3";
+ public static final String DS_1 = "pipeline_it_1";
- protected static final String DS_4 = "pipeline_it_4";
+ public static final String DS_2 = "pipeline_it_2";
- protected static final int TABLE_INIT_ROW_COUNT = 3000;
+ public static final String DS_3 = "pipeline_it_3";
- private static final String REGISTER_STORAGE_UNIT_SQL = "REGISTER STORAGE UNIT ${ds} ( URL='${url}', USER='${user}', PASSWORD='${password}')";
+ public static final String DS_4 = "pipeline_it_4";
- @Rule
- @Getter(AccessLevel.NONE)
- public PipelineWatcher pipelineWatcher;
+ public static final int TABLE_INIT_ROW_COUNT = 3000;
private final BaseContainerComposer containerComposer;
@@ -115,24 +108,31 @@ public abstract class PipelineBaseE2EIT {
private Thread increaseTaskThread;
- public PipelineBaseE2EIT(final PipelineTestParameter testParam) {
+ public PipelineContainerComposer(final PipelineTestParameter testParam) {
databaseType = testParam.getDatabaseType();
- containerComposer = ENV.getItEnvType() == PipelineEnvTypeEnum.DOCKER
+ containerComposer = PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.DOCKER
? new DockerContainerComposer(testParam.getDatabaseType(), testParam.getStorageContainerImage(), testParam.getStorageContainerCount())
: new NativeContainerComposer(testParam.getDatabaseType());
- if (ENV.getItEnvType() == PipelineEnvTypeEnum.DOCKER) {
+ if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.DOCKER) {
DockerStorageContainer storageContainer = ((DockerContainerComposer) containerComposer).getStorageContainers().get(0);
username = storageContainer.getUsername();
password = storageContainer.getPassword();
} else {
- username = ENV.getActualDataSourceUsername(databaseType);
- password = ENV.getActualDataSourcePassword(databaseType);
+ username = PipelineE2EEnvironment.getInstance().getActualDataSourceUsername(databaseType);
+ password = PipelineE2EEnvironment.getInstance().getActualDataSourcePassword(databaseType);
}
- extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineBaseE2EIT.class.getClassLoader().getResource(testParam.getScenario())), ExtraSQLCommand.class);
- pipelineWatcher = new PipelineWatcher(containerComposer);
+ extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineContainerComposer.class.getClassLoader().getResource(testParam.getScenario())), ExtraSQLCommand.class);
+ containerComposer.start();
}
- protected void initEnvironment(final DatabaseType databaseType, final JobType jobType) throws SQLException {
+ /**
+ * Initialize environment.
+ *
+ * @param databaseType database type
+ * @param jobType job type
+ * @throws SQLException SQL exception
+ */
+ public void initEnvironment(final DatabaseType databaseType, final JobType jobType) throws SQLException {
sourceDataSource = StorageContainerUtil.generateDataSource(appendExtraParam(getActualJdbcUrlTemplate(DS_0, false)), username, password);
proxyDataSource = StorageContainerUtil.generateDataSource(appendExtraParam(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)),
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
@@ -149,7 +149,13 @@ public abstract class PipelineBaseE2EIT {
cleanUpDataSource();
}
- protected String appendExtraParam(final String jdbcUrl) {
+ /**
+ * Append extra parameter.
+ *
+ * @param jdbcUrl JDBC URL
+ * @return appended JDBC URL
+ */
+ public String appendExtraParam(final String jdbcUrl) {
String result = jdbcUrl;
if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
result = new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("rewriteBatchedStatements", Boolean.TRUE.toString())));
@@ -161,7 +167,7 @@ public abstract class PipelineBaseE2EIT {
}
private void cleanUpProxyDatabase(final Connection connection) {
- if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+ if (PipelineEnvTypeEnum.NATIVE != PipelineE2EEnvironment.getInstance().getItEnvType()) {
return;
}
try {
@@ -173,7 +179,7 @@ public abstract class PipelineBaseE2EIT {
}
private void cleanUpPipelineJobs(final Connection connection, final JobType jobType) throws SQLException {
- if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+ if (PipelineEnvTypeEnum.NATIVE != PipelineE2EEnvironment.getInstance().getItEnvType()) {
return;
}
String jobTypeName = jobType.getTypeName();
@@ -201,7 +207,7 @@ public abstract class PipelineBaseE2EIT {
}
private void cleanUpDataSource() {
- if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+ if (PipelineEnvTypeEnum.NATIVE != PipelineE2EEnvironment.getInstance().getItEnvType()) {
return;
}
for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
@@ -216,40 +222,77 @@ public abstract class PipelineBaseE2EIT {
ThreadUtil.sleep(2, TimeUnit.SECONDS);
}
- protected void registerStorageUnit(final String storageUnitName) throws SQLException {
- String registerStorageUnitTemplate = REGISTER_STORAGE_UNIT_SQL.replace("${ds}", storageUnitName)
+ /**
+ * Register storage unit.
+ *
+ * @param storageUnitName storage unit name
+ * @throws SQLException SQL exception
+ */
+ public void registerStorageUnit(final String storageUnitName) throws SQLException {
+ String registerStorageUnitTemplate = "REGISTER STORAGE UNIT ${ds} ( URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}", storageUnitName)
.replace("${user}", getUsername())
.replace("${password}", getPassword())
.replace("${url}", appendExtraParam(getActualJdbcUrlTemplate(storageUnitName, true)));
proxyExecuteWithLog(registerStorageUnitTemplate, 2);
}
+ /**
+ * Add resource.
+ *
+ * @param distSQL dist SQL
+ * @throws SQLException SQL exception
+ */
// TODO Use registerStorageUnit instead, and remove the method
- protected void addResource(final String distSQL) throws SQLException {
+ public void addResource(final String distSQL) throws SQLException {
proxyExecuteWithLog(distSQL, 2);
}
- protected String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer, final int storageContainerIndex) {
- if (PipelineEnvTypeEnum.DOCKER == ENV.getItEnvType()) {
+ /**
+ * Get actual JDBC URL template.
+ *
+ * @param databaseName database name
+ * @param isInContainer is in container
+ * @param storageContainerIndex storage container index
+ * @return actual JDBC URL template
+ */
+ public String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer, final int storageContainerIndex) {
+ if (PipelineEnvTypeEnum.DOCKER == PipelineE2EEnvironment.getInstance().getItEnvType()) {
DockerStorageContainer storageContainer = ((DockerContainerComposer) containerComposer).getStorageContainers().get(storageContainerIndex);
return isInContainer
? DataSourceEnvironment.getURL(getDatabaseType(), storageContainer.getNetworkAliases().get(0), storageContainer.getExposedPort(), databaseName)
: storageContainer.getJdbcUrl(databaseName);
}
- return DataSourceEnvironment.getURL(getDatabaseType(), "127.0.0.1", ENV.getActualDataSourceDefaultPort(databaseType), databaseName);
+ return DataSourceEnvironment.getURL(getDatabaseType(), "127.0.0.1", PipelineE2EEnvironment.getInstance().getActualDataSourceDefaultPort(databaseType), databaseName);
}
- protected String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer) {
+ /**
+ * Get actual JDBC URL template.
+ *
+ * @param databaseName database name
+ * @param isInContainer is in container
+ * @return actual JDBC URL template
+ */
+ public String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer) {
return getActualJdbcUrlTemplate(databaseName, isInContainer, 0);
}
- protected abstract String getSourceTableOrderName();
-
- protected String getTargetTableOrderName() {
+ /**
+ * Get target table order name.
+ *
+ * @return target table order name
+ */
+ public String getTargetTableOrderName() {
return "t_order";
}
- protected void createSchema(final Connection connection, final int sleepSeconds) throws SQLException {
+ /**
+ * Create schema.
+ *
+ * @param connection connection
+ * @param sleepSeconds sleep seconds
+ * @throws SQLException SQL exception
+ */
+ public void createSchema(final Connection connection, final int sleepSeconds) throws SQLException {
if (!getDatabaseType().isSchemaAvailable()) {
return;
}
@@ -259,34 +302,72 @@ public abstract class PipelineBaseE2EIT {
}
}
- protected void createSourceOrderTable() throws SQLException {
- sourceExecuteWithLog(getExtraSQLCommand().getCreateTableOrder(getSourceTableOrderName()));
+ /**
+ * Create source order table.
+ *
+ * @param sourceTableOrderName source table order name
+ * @throws SQLException SQL exception
+ */
+ public void createSourceOrderTable(final String sourceTableOrderName) throws SQLException {
+ sourceExecuteWithLog(getExtraSQLCommand().getCreateTableOrder(sourceTableOrderName));
}
- protected void createSourceTableIndexList(final String schema) throws SQLException {
+ /**
+ * Create source table index list.
+ *
+ * @param schema schema
+ * @param sourceTableOrderName source table order name
+ * @throws SQLException SQL exception
+ */
+ public void createSourceTableIndexList(final String schema, final String sourceTableOrderName) throws SQLException {
if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
- sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS idx_user_id ON %s.%s ( user_id )", schema, getSourceTableOrderName()));
+ sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS idx_user_id ON %s.%s ( user_id )", schema, sourceTableOrderName));
} else if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
- sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON %s.%s ( user_id )", schema, getSourceTableOrderName()));
+ sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON %s.%s ( user_id )", schema, sourceTableOrderName));
}
}
- protected void createSourceCommentOnList(final String schema) throws SQLException {
- sourceExecuteWithLog(String.format("COMMENT ON COLUMN %s.%s.user_id IS 'user id'", schema, getSourceTableOrderName()));
+ /**
+ * Create source comment on list.
+ *
+ * @param schema schema
+ * @param sourceTableOrderName source table order name
+ * @throws SQLException SQL exception
+ */
+ public void createSourceCommentOnList(final String schema, final String sourceTableOrderName) throws SQLException {
+ sourceExecuteWithLog(String.format("COMMENT ON COLUMN %s.%s.user_id IS 'user id'", schema, sourceTableOrderName));
}
- protected void createSourceOrderItemTable() throws SQLException {
+ /**
+ * Create source order item table.
+ *
+ * @throws SQLException SQL exception
+ */
+ public void createSourceOrderItemTable() throws SQLException {
sourceExecuteWithLog(extraSQLCommand.getCreateTableOrderItem());
}
- protected void sourceExecuteWithLog(final String sql) throws SQLException {
+ /**
+ * Source execute with log.
+ *
+ * @param sql SQL
+ * @throws SQLException SQL exception
+ */
+ public void sourceExecuteWithLog(final String sql) throws SQLException {
log.info("source execute :{}", sql);
try (Connection connection = sourceDataSource.getConnection()) {
connection.createStatement().execute(sql);
}
}
- protected void proxyExecuteWithLog(final String sql, final int sleepSeconds) throws SQLException {
+ /**
+ * Proxy execute with log.
+ *
+ * @param sql SQL
+ * @param sleepSeconds sleep seconds
+ * @throws SQLException SQL exception
+ */
+ public void proxyExecuteWithLog(final String sql, final int sleepSeconds) throws SQLException {
log.info("proxy execute :{}", sql);
try (Connection connection = proxyDataSource.getConnection()) {
connection.createStatement().execute(sql);
@@ -294,7 +375,12 @@ public abstract class PipelineBaseE2EIT {
ThreadUtil.sleep(Math.max(sleepSeconds, 0), TimeUnit.SECONDS);
}
- protected void waitJobPrepareSuccess(final String distSQL) {
+ /**
+ * Wait job prepare success.
+ *
+ * @param distSQL dist SQL
+ */
+ public void waitJobPrepareSuccess(final String distSQL) {
for (int i = 0; i < 5; i++) {
List<Map<String, Object>> jobStatus = queryForListWithLog(distSQL);
Set<String> statusSet = jobStatus.stream().map(each -> String.valueOf(each.get("status"))).collect(Collectors.toSet());
@@ -304,7 +390,13 @@ public abstract class PipelineBaseE2EIT {
}
}
- protected List<Map<String, Object>> queryForListWithLog(final String sql) {
+ /**
+ * Query for list with log.
+ *
+ * @param sql SQL
+ * @return query result
+ */
+ public List<Map<String, Object>> queryForListWithLog(final String sql) {
int retryNumber = 0;
while (retryNumber <= 3) {
try (Connection connection = proxyDataSource.getConnection()) {
@@ -319,7 +411,14 @@ public abstract class PipelineBaseE2EIT {
throw new RuntimeException("can't get result from proxy");
}
- protected List<Map<String, Object>> transformResultSetToList(final ResultSet resultSet) throws SQLException {
+ /**
+ * Transform result set to list.
+ *
+ * @param resultSet result set
+ * @return transformed result
+ * @throws SQLException SQL exception
+ */
+ public List<Map<String, Object>> transformResultSetToList(final ResultSet resultSet) throws SQLException {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int columns = resultSetMetaData.getColumnCount();
List<Map<String, Object>> result = new ArrayList<>();
@@ -333,13 +432,25 @@ public abstract class PipelineBaseE2EIT {
return result;
}
- protected void startIncrementTask(final BaseIncrementTask baseIncrementTask) {
+ /**
+ * Start increment task.
+ *
+ * @param baseIncrementTask base increment task
+ */
+ public void startIncrementTask(final BaseIncrementTask baseIncrementTask) {
increaseTaskThread = new Thread(baseIncrementTask);
increaseTaskThread.start();
}
+ /**
+ * Wait increment task finished.
+ *
+ * @param distSQL dist SQL
+ * @return result
+ * @throws InterruptedException interrupted exception
+ */
// TODO use DAO to query via DistSQL
- protected List<Map<String, Object>> waitIncrementTaskFinished(final String distSQL) throws InterruptedException {
+ public List<Map<String, Object>> waitIncrementTaskFinished(final String distSQL) throws InterruptedException {
if (null != increaseTaskThread) {
TimeUnit.SECONDS.timedJoin(increaseTaskThread, 30);
}
@@ -368,7 +479,13 @@ public abstract class PipelineBaseE2EIT {
return Collections.emptyList();
}
- protected void assertProxyOrderRecordExist(final String tableName, final Object orderId) {
+ /**
+ * Assert proxy order record exist.
+ *
+ * @param tableName table name
+ * @param orderId order id
+ */
+ public void assertProxyOrderRecordExist(final String tableName, final Object orderId) {
String sql;
if (orderId instanceof String) {
sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s'", tableName, orderId);
@@ -378,7 +495,12 @@ public abstract class PipelineBaseE2EIT {
assertProxyOrderRecordExist(sql);
}
- protected void assertProxyOrderRecordExist(final String sql) {
+ /**
+ * Assert proxy order record exist.
+ *
+ * @param sql SQL
+ */
+ public void assertProxyOrderRecordExist(final String sql) {
boolean recordExist = false;
for (int i = 0; i < 5; i++) {
List<Map<String, Object>> result = queryForListWithLog(sql);
@@ -391,23 +513,41 @@ public abstract class PipelineBaseE2EIT {
assertTrue(recordExist, "The insert record must exist after the stop");
}
- protected int getTargetTableRecordsCount(final String tableName) {
+ /**
+ * Get target table records count.
+ *
+ * @param tableName table name
+ * @return target table records count
+ */
+ public int getTargetTableRecordsCount(final String tableName) {
List<Map<String, Object>> targetList = queryForListWithLog("SELECT COUNT(1) AS count FROM " + tableName);
assertFalse(targetList.isEmpty());
return ((Number) targetList.get(0).get("count")).intValue();
}
- protected void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) {
+ /**
+ * Assert greater than order table init rows.
+ *
+ * @param tableInitRows table init rows
+ * @param schema schema
+ */
+ public void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) {
String tableName = Strings.isNullOrEmpty(schema) ? "t_order" : String.format("%s.t_order", schema);
int recordsCount = getTargetTableRecordsCount(tableName);
assertTrue(recordsCount > tableInitRows, "actual count " + recordsCount);
}
+ /**
+ * Generate ShardingSphere data source from proxy.
+ *
+ * @return ShardingSphere data source
+ * @throws SQLException SQL exception
+ */
// TODO proxy support for some fields still needs to be optimized, such as binary of MySQL, after these problems are optimized, Proxy dataSource can be used.
- protected DataSource generateShardingSphereDataSourceFromProxy() throws SQLException {
+ public DataSource generateShardingSphereDataSourceFromProxy() throws SQLException {
Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !getYamlRootConfig().getRules().isEmpty());
YamlRootConfiguration rootConfig = getYamlRootConfig();
- if (PipelineEnvTypeEnum.DOCKER == ENV.getItEnvType()) {
+ if (PipelineEnvTypeEnum.DOCKER == PipelineE2EEnvironment.getInstance().getItEnvType()) {
DockerStorageContainer storageContainer = ((DockerContainerComposer) containerComposer).getStorageContainers().get(0);
String sourceUrl = String.join(":", storageContainer.getNetworkAliases().get(0), Integer.toString(storageContainer.getExposedPort()));
String targetUrl = String.join(":", storageContainer.getHost(), Integer.toString(storageContainer.getMappedPort()));
@@ -422,7 +562,11 @@ public abstract class PipelineBaseE2EIT {
}
private YamlRootConfiguration getYamlRootConfig() {
- String result = queryForListWithLog("EXPORT DATABASE CONFIGURATION").get(0).get("result").toString();
- return YamlEngine.unmarshal(result, YamlRootConfiguration.class);
+ return YamlEngine.unmarshal(queryForListWithLog("EXPORT DATABASE CONFIGURATION").get(0).get("result").toString(), YamlRootConfiguration.class);
+ }
+
+ @Override
+ public void close() {
+ containerComposer.stop();
}
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 9cf6cd9164d..436bf08d945 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -39,14 +39,16 @@ import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
import org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
import org.apache.shardingsphere.test.e2e.env.container.atomic.util.StorageContainerUtil;
+import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -74,87 +76,93 @@ import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * MySQL CDC E2E IT.
+ * CDC E2E IT.
*/
@RunWith(Parameterized.class)
@Slf4j
-public final class CDCE2EIT extends PipelineBaseE2EIT {
+public final class CDCE2EIT {
private static final String CREATE_SHARDING_RULE_SQL = String.format("CREATE SHARDING TABLE RULE t_order("
+ "STORAGE_UNITS(%s,%s),"
+ "SHARDING_COLUMN=user_id,"
+ "TYPE(NAME='hash_mod',PROPERTIES('sharding-count'='4')),"
+ "KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME='snowflake'))"
- + ")", DS_0, DS_1);
+ + ")", PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1);
+
+ private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
+
+ private final PipelineContainerComposer containerComposer;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public CDCE2EIT(final PipelineTestParameter testParam) {
- super(testParam);
+ containerComposer = new PipelineContainerComposer(testParam);
}
@Parameters(name = "{0}")
public static Collection<PipelineTestParameter> getTestParameters() {
Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+ if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
return result;
}
MySQLDatabaseType mysqlDatabaseType = new MySQLDatabaseType();
- for (String each : PipelineBaseE2EIT.ENV.listStorageContainerImages(mysqlDatabaseType)) {
+ for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(mysqlDatabaseType)) {
result.add(new PipelineTestParameter(mysqlDatabaseType, each, "env/scenario/general/mysql.xml"));
}
OpenGaussDatabaseType openGaussDatabaseType = new OpenGaussDatabaseType();
- for (String each : PipelineBaseE2EIT.ENV.listStorageContainerImages(openGaussDatabaseType)) {
+ for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(openGaussDatabaseType)) {
result.add(new PipelineTestParameter(openGaussDatabaseType, each, "env/scenario/general/postgresql.xml"));
}
return result;
}
- @Override
- protected String getSourceTableOrderName() {
- return "t_order";
+ @After
+ public void tearDown() {
+ containerComposer.close();
}
@Test
public void assertCDCDataImportSuccess() throws SQLException, InterruptedException {
// make sure the program time zone same with the database server at CI.
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
- initEnvironment(getDatabaseType(), new CDCJobType());
- for (String each : Arrays.asList(DS_0, DS_1)) {
- registerStorageUnit(each);
+ containerComposer.initEnvironment(containerComposer.getDatabaseType(), new CDCJobType());
+ for (String each : Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)) {
+ containerComposer.registerStorageUnit(each);
}
createOrderTableRule();
- try (Connection connection = getProxyDataSource().getConnection()) {
+ try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
initSchemaAndTable(connection, 2);
}
- DataSource jdbcDataSource = generateShardingSphereDataSourceFromProxy();
- Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(getDatabaseType(), 20);
+ DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
+ Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), 20);
log.info("init data begin: {}", LocalDateTime.now());
- DataSourceExecuteUtil.execute(jdbcDataSource, getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
+ DataSourceExecuteUtil.execute(jdbcDataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_ORDER_NAME), dataPair.getLeft());
log.info("init data end: {}", LocalDateTime.now());
- try (Connection connection = DriverManager.getConnection(getActualJdbcUrlTemplate(DS_4, false), getUsername(), getPassword())) {
+ try (Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
+ containerComposer.getUsername(), containerComposer.getPassword())) {
initSchemaAndTable(connection, 0);
}
startCDCClient();
- Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !queryForListWithLog("SHOW STREAMING LIST").isEmpty());
- String jobId = queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
- waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
- startIncrementTask(new E2EIncrementalTask(jdbcDataSource, getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), getDatabaseType(), 20));
- getIncreaseTaskThread().join(10000);
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
+ String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
+ containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
+ containerComposer.startIncrementTask(new E2EIncrementalTask(jdbcDataSource, SOURCE_TABLE_ORDER_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
+ containerComposer.getIncreaseTaskThread().join(10000L);
List<Map<String, Object>> actualProxyList;
try (Connection connection = jdbcDataSource.getConnection()) {
ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", getOrderTableNameWithSchema()));
- actualProxyList = transformResultSetToList(resultSet);
+ actualProxyList = containerComposer.transformResultSetToList(resultSet);
}
Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> listOrderRecords(getOrderTableNameWithSchema()).size() == actualProxyList.size());
- SchemaTableName schemaTableName = getDatabaseType().isSchemaAvailable()
- ? new SchemaTableName(new SchemaName(PipelineBaseE2EIT.SCHEMA_NAME), new TableName(getSourceTableOrderName()))
- : new SchemaTableName(new SchemaName(null), new TableName(getSourceTableOrderName()));
- PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(StorageContainerUtil.generateDataSource(getActualJdbcUrlTemplate(DS_4, false), getUsername(), getPassword()),
- getDatabaseType());
- PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, getDatabaseType());
+ SchemaTableName schemaTableName = containerComposer.getDatabaseType().isSchemaAvailable()
+ ? new SchemaTableName(new SchemaName(PipelineContainerComposer.SCHEMA_NAME), new TableName(SOURCE_TABLE_ORDER_NAME))
+ : new SchemaTableName(new SchemaName(null), new TableName(SOURCE_TABLE_ORDER_NAME));
+ PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(StorageContainerUtil.generateDataSource(
+ containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
+ containerComposer.getUsername(), containerComposer.getPassword()), containerComposer.getDatabaseType());
+ PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, containerComposer.getDatabaseType());
StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
- PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(PipelineBaseE2EIT.SCHEMA_NAME, "t_order");
+ PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(PipelineContainerComposer.SCHEMA_NAME, "t_order");
PipelineColumnMetaData primaryKeyMetaData = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0);
SingleTableInventoryDataConsistencyChecker checker = new SingleTableInventoryDataConsistencyChecker("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
@@ -164,12 +172,12 @@ public final class CDCE2EIT extends PipelineBaseE2EIT {
}
private void createOrderTableRule() throws SQLException {
- proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 2);
+ containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 2);
}
private void initSchemaAndTable(final Connection connection, final int sleepSeconds) throws SQLException {
- createSchema(connection, sleepSeconds);
- String sql = getExtraSQLCommand().getCreateTableOrder(getSourceTableOrderName());
+ containerComposer.createSchema(connection, sleepSeconds);
+ String sql = containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_ORDER_NAME);
log.info("create table sql: {}", sql);
connection.createStatement().execute(sql);
if (sleepSeconds > 0) {
@@ -178,18 +186,19 @@ public final class CDCE2EIT extends PipelineBaseE2EIT {
}
private void startCDCClient() {
- ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter(appendExtraParam(getActualJdbcUrlTemplate(DS_4, false, 0)), getUsername(), getPassword());
+ ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter(containerComposer.appendExtraParam(
+ containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false, 0)), containerComposer.getUsername(), containerComposer.getPassword());
StartCDCClientParameter parameter = new StartCDCClientParameter(importDataSourceParam);
parameter.setAddress("localhost");
- parameter.setPort(getContainerComposer().getProxyCDCPort());
+ parameter.setPort(containerComposer.getContainerComposer().getProxyCDCPort());
parameter.setUsername(ProxyContainerConstants.USERNAME);
parameter.setPassword(ProxyContainerConstants.PASSWORD);
parameter.setDatabase("sharding_db");
// TODO add full=false test case later
parameter.setFull(true);
- String schema = getDatabaseType().isSchemaAvailable() ? "test" : "";
- parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable(getSourceTableOrderName()).setSchema(schema).build()));
- parameter.setDatabaseType(getDatabaseType().getType());
+ String schema = containerComposer.getDatabaseType().isSchemaAvailable() ? "test" : "";
+ parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_ORDER_NAME).setSchema(schema).build()));
+ parameter.setDatabaseType(containerComposer.getDatabaseType().getType());
CompletableFuture.runAsync(() -> new CDCClient(parameter).start(), executor).whenComplete((unused, throwable) -> {
if (null != throwable) {
log.error("cdc client sync failed, ", throwable);
@@ -198,17 +207,14 @@ public final class CDCE2EIT extends PipelineBaseE2EIT {
}
private List<Map<String, Object>> listOrderRecords(final String tableNameWithSchema) throws SQLException {
- try (Connection connection = DriverManager.getConnection(getActualJdbcUrlTemplate(DS_4, false), getUsername(), getPassword())) {
+ try (Connection connection = DriverManager.getConnection(
+ containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false), containerComposer.getUsername(), containerComposer.getPassword())) {
ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", tableNameWithSchema));
- return transformResultSetToList(resultSet);
+ return containerComposer.transformResultSetToList(resultSet);
}
}
private String getOrderTableNameWithSchema() {
- if (getDatabaseType().isSchemaAvailable()) {
- return String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getSourceTableOrderName());
- } else {
- return getSourceTableOrderName();
- }
+ return containerComposer.getDatabaseType().isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_ORDER_NAME) : SOURCE_TABLE_ORDER_NAME;
}
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index eb1fae5cb3f..39b5d7003c1 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -21,11 +21,13 @@ import com.google.common.base.Strings;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.command.MigrationDistSQLCommand;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
import org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtil;
+import org.junit.After;
import org.opengauss.util.PSQLException;
import javax.xml.bind.JAXB;
@@ -44,48 +46,55 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Getter
@Slf4j
-public abstract class AbstractMigrationE2EIT extends PipelineBaseE2EIT {
+public abstract class AbstractMigrationE2EIT {
private final MigrationDistSQLCommand migrationDistSQLCommand;
+ private final PipelineContainerComposer containerComposer;
+
public AbstractMigrationE2EIT(final PipelineTestParameter testParam) {
- super(testParam);
- migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineBaseE2EIT.class.getClassLoader().getResource("env/common/migration-command.xml")), MigrationDistSQLCommand.class);
+ containerComposer = new PipelineContainerComposer(testParam);
+ migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(AbstractMigrationE2EIT.class.getClassLoader().getResource("env/common/migration-command.xml")), MigrationDistSQLCommand.class);
+ }
+
+ @After
+ public final void tearDown() {
+ containerComposer.close();
}
protected void addMigrationSourceResource() throws SQLException {
- if (PipelineEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
+ if (PipelineEnvTypeEnum.NATIVE == PipelineE2EEnvironment.getInstance().getItEnvType()) {
try {
- proxyExecuteWithLog("UNREGISTER MIGRATION SOURCE STORAGE UNIT ds_0", 2);
+ containerComposer.proxyExecuteWithLog("UNREGISTER MIGRATION SOURCE STORAGE UNIT ds_0", 2);
} catch (final SQLException ex) {
log.warn("Drop sharding_db failed, maybe it's not exist. error msg={}", ex.getMessage());
}
}
- String addSourceResource = migrationDistSQLCommand.getRegisterMigrationSourceStorageUnitTemplate().replace("${user}", getUsername())
- .replace("${password}", getPassword())
- .replace("${ds0}", appendExtraParam(getActualJdbcUrlTemplate(DS_0, true)));
- addResource(addSourceResource);
+ String addSourceResource = migrationDistSQLCommand.getRegisterMigrationSourceStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
+ .replace("${password}", containerComposer.getPassword())
+ .replace("${ds0}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_0, true)));
+ containerComposer.addResource(addSourceResource);
}
protected void addMigrationTargetResource() throws SQLException {
- String addTargetResource = migrationDistSQLCommand.getRegisterMigrationTargetStorageUnitTemplate().replace("${user}", getUsername())
- .replace("${password}", getPassword())
- .replace("${ds2}", appendExtraParam(getActualJdbcUrlTemplate(DS_2, true)))
- .replace("${ds3}", appendExtraParam(getActualJdbcUrlTemplate(DS_3, true)))
- .replace("${ds4}", appendExtraParam(getActualJdbcUrlTemplate(DS_4, true)));
- addResource(addTargetResource);
- List<Map<String, Object>> resources = queryForListWithLog("SHOW STORAGE UNITS from sharding_db");
+ String addTargetResource = migrationDistSQLCommand.getRegisterMigrationTargetStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
+ .replace("${password}", containerComposer.getPassword())
+ .replace("${ds2}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2, true)))
+ .replace("${ds3}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true)))
+ .replace("${ds4}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true)));
+ containerComposer.addResource(addTargetResource);
+ List<Map<String, Object>> resources = containerComposer.queryForListWithLog("SHOW STORAGE UNITS from sharding_db");
assertThat(resources.size(), is(3));
}
protected void createSourceSchema(final String schemaName) throws SQLException {
- if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
- sourceExecuteWithLog(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
+ if (DatabaseTypeUtil.isPostgreSQL(containerComposer.getDatabaseType())) {
+ containerComposer.sourceExecuteWithLog(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
return;
}
- if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
+ if (DatabaseTypeUtil.isOpenGauss(containerComposer.getDatabaseType())) {
try {
- sourceExecuteWithLog(String.format("CREATE SCHEMA %s", schemaName));
+ containerComposer.sourceExecuteWithLog(String.format("CREATE SCHEMA %s", schemaName));
} catch (final SQLException ex) {
// only used for native mode.
if (ex instanceof PSQLException && "42P06".equals(ex.getSQLState())) {
@@ -98,57 +107,57 @@ public abstract class AbstractMigrationE2EIT extends PipelineBaseE2EIT {
}
protected void createTargetOrderTableRule() throws SQLException {
- proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableRule(), 2);
+ containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableRule(), 2);
}
protected void createTargetOrderTableEncryptRule() throws SQLException {
- proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableEncryptRule(), 2);
+ containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableEncryptRule(), 2);
}
protected void createTargetOrderItemTableRule() throws SQLException {
- proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(), 2);
+ containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(), 2);
}
protected void startMigration(final String sourceTableName, final String targetTableName) throws SQLException {
- proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTable(sourceTableName, targetTableName), 5);
+ containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTable(sourceTableName, targetTableName), 5);
}
protected void startMigrationWithSchema(final String sourceTableName, final String targetTableName) throws SQLException {
- proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTableWithSchema(sourceTableName, targetTableName), 5);
+ containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTableWithSchema(sourceTableName, targetTableName), 5);
}
protected void addMigrationProcessConfig() throws SQLException {
- proxyExecuteWithLog(migrationDistSQLCommand.getAlterMigrationRule(), 0);
+ containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getAlterMigrationRule(), 0);
}
protected void stopMigrationByJobId(final String jobId) throws SQLException {
- proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
+ containerComposer.proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
}
protected void startMigrationByJobId(final String jobId) throws SQLException {
- proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 1);
+ containerComposer.proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 1);
}
protected void commitMigrationByJobId(final String jobId) throws SQLException {
- proxyExecuteWithLog(String.format("COMMIT MIGRATION '%s'", jobId), 1);
+ containerComposer.proxyExecuteWithLog(String.format("COMMIT MIGRATION '%s'", jobId), 1);
}
protected List<String> listJobId() {
- List<Map<String, Object>> jobList = queryForListWithLog("SHOW MIGRATION LIST");
+ List<Map<String, Object>> jobList = containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
return jobList.stream().map(a -> a.get("id").toString()).collect(Collectors.toList());
}
protected String getJobIdByTableName(final String tableName) {
- List<Map<String, Object>> jobList = queryForListWithLog("SHOW MIGRATION LIST");
+ List<Map<String, Object>> jobList = containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
return jobList.stream().filter(a -> a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new RuntimeException("not find " + tableName + " table")).get("id").toString();
}
protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) throws SQLException {
- proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
+ containerComposer.proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
// TODO Need to add after the stop then to start, can continue the consistency check from the previous progress
List<Map<String, Object>> resultList = Collections.emptyList();
for (int i = 0; i < 10; i++) {
- resultList = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
+ resultList = containerComposer.queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
if (resultList.isEmpty()) {
ThreadUtil.sleep(3, TimeUnit.SECONDS);
continue;
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index a3a685f991c..fc25f0bb2ac 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -23,9 +23,10 @@ import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -51,53 +52,47 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Slf4j
public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
- private final PipelineTestParameter testParam;
+ private static final String SOURCE_TABLE_ORDER_NAME = "t_order_copy";
public MySQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
super(testParam);
- this.testParam = testParam;
}
@Parameters(name = "{0}")
public static Collection<PipelineTestParameter> getTestParameters() {
Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+ if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
return result;
}
MySQLDatabaseType databaseType = new MySQLDatabaseType();
- for (String each : PipelineBaseE2EIT.ENV.listStorageContainerImages(databaseType)) {
+ for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(databaseType)) {
result.add(new PipelineTestParameter(databaseType, each, "env/scenario/general/mysql.xml"));
}
return result;
}
- @Override
- protected String getSourceTableOrderName() {
- return "t_order_copy";
- }
-
@Test
public void assertMigrationSuccess() throws SQLException, InterruptedException {
- log.info("assertMigrationSuccess testParam:{}", testParam);
- initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
+ getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
addMigrationProcessConfig();
- createSourceOrderTable();
- createSourceOrderItemTable();
+ getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
+ getContainerComposer().createSourceOrderItemTable();
addMigrationSourceResource();
addMigrationTargetResource();
createTargetOrderTableRule();
createTargetOrderTableEncryptRule();
createTargetOrderItemTableRule();
- Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(testParam.getDatabaseType(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+ Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(getContainerComposer().getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
log.info("init data begin: {}", LocalDateTime.now());
- DataSourceExecuteUtil.execute(getSourceDataSource(), getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
- DataSourceExecuteUtil.execute(getSourceDataSource(), getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+ DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_ORDER_NAME), dataPair.getLeft());
+ DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
log.info("init data end: {}", LocalDateTime.now());
- startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+ startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
startMigration("t_order_item", "t_order_item");
- String orderJobId = getJobIdByTableName("ds_0." + getSourceTableOrderName());
- waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
- startIncrementTask(new E2EIncrementalTask(getSourceDataSource(), getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), getDatabaseType(), 30));
+ String orderJobId = getJobIdByTableName("ds_0." + SOURCE_TABLE_ORDER_NAME);
+ getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
+ getContainerComposer().startIncrementTask(
+ new E2EIncrementalTask(getContainerComposer().getSourceDataSource(), SOURCE_TABLE_ORDER_NAME, new SnowflakeKeyGenerateAlgorithm(), getContainerComposer().getDatabaseType(), 30));
assertMigrationSuccessById(orderJobId, "DATA_MATCH");
String orderItemJobId = getJobIdByTableName("ds_0.t_order_item");
assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
@@ -108,13 +103,12 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
}
List<String> lastJobIds = listJobId();
assertTrue(lastJobIds.isEmpty());
- proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
- assertGreaterThanOrderTableInitRows(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT, "");
- log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
+ getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
+ getContainerComposer().assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
}
private void assertMigrationSuccessById(final String jobId, final String algorithmType) throws SQLException, InterruptedException {
- List<Map<String, Object>> jobStatus = waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ List<Map<String, Object>> jobStatus = getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
for (Map<String, Object> each : jobStatus) {
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()), is(100));
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 2b2c0ea2fa2..118778126bb 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -23,9 +23,10 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -49,28 +50,22 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Slf4j
public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
- private final PipelineTestParameter testParam;
+ private static final String SOURCE_TABLE_ORDER_NAME = "t_order_copy";
public PostgreSQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
super(testParam);
- this.testParam = testParam;
- }
-
- @Override
- protected String getSourceTableOrderName() {
- return "t_order_copy";
}
@Parameters(name = "{0}")
public static Collection<PipelineTestParameter> getTestParameters() {
Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+ if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
return result;
}
- for (String each : PipelineBaseE2EIT.ENV.listStorageContainerImages(new PostgreSQLDatabaseType())) {
+ for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType())) {
result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), each, "env/scenario/general/postgresql.xml"));
}
- for (String each : PipelineBaseE2EIT.ENV.listStorageContainerImages(new OpenGaussDatabaseType())) {
+ for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new OpenGaussDatabaseType())) {
result.add(new PipelineTestParameter(new OpenGaussDatabaseType(), each, "env/scenario/general/postgresql.xml"));
}
return result;
@@ -78,29 +73,29 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
@Test
public void assertMigrationSuccess() throws SQLException, InterruptedException {
- log.info("assertMigrationSuccess testParam:{}", testParam);
- initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
+ getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
addMigrationProcessConfig();
- createSourceSchema(PipelineBaseE2EIT.SCHEMA_NAME);
- createSourceOrderTable();
- createSourceOrderItemTable();
- createSourceTableIndexList(PipelineBaseE2EIT.SCHEMA_NAME);
- createSourceCommentOnList(PipelineBaseE2EIT.SCHEMA_NAME);
+ createSourceSchema(PipelineContainerComposer.SCHEMA_NAME);
+ getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
+ getContainerComposer().createSourceOrderItemTable();
+ getContainerComposer().createSourceTableIndexList(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_ORDER_NAME);
+ getContainerComposer().createSourceCommentOnList(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_ORDER_NAME);
addMigrationSourceResource();
addMigrationTargetResource();
createTargetOrderTableRule();
createTargetOrderItemTableRule();
- Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(testParam.getDatabaseType(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+ Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(getContainerComposer().getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
log.info("init data begin: {}", LocalDateTime.now());
- DataSourceExecuteUtil.execute(getSourceDataSource(), getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
- DataSourceExecuteUtil.execute(getSourceDataSource(), getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+ DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_ORDER_NAME), dataPair.getLeft());
+ DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
log.info("init data end: {}", LocalDateTime.now());
- startMigrationWithSchema(getSourceTableOrderName(), "t_order");
+ startMigrationWithSchema(SOURCE_TABLE_ORDER_NAME, "t_order");
Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> listJobId().size() > 0);
- String jobId = getJobIdByTableName("ds_0.test." + getSourceTableOrderName());
- waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- startIncrementTask(new E2EIncrementalTask(getSourceDataSource(), String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getSourceTableOrderName()),
- new SnowflakeKeyGenerateAlgorithm(), getDatabaseType(), 20));
+ String jobId = getJobIdByTableName("ds_0.test." + SOURCE_TABLE_ORDER_NAME);
+ getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ getContainerComposer().startIncrementTask(new E2EIncrementalTask(
+ getContainerComposer().getSourceDataSource(), String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_ORDER_NAME),
+ new SnowflakeKeyGenerateAlgorithm(), getContainerComposer().getDatabaseType(), 20));
checkOrderMigration(jobId);
checkOrderItemMigration();
for (String each : listJobId()) {
@@ -108,28 +103,27 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
}
List<String> lastJobIds = listJobId();
assertTrue(lastJobIds.isEmpty());
- proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
- assertGreaterThanOrderTableInitRows(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1, PipelineBaseE2EIT.SCHEMA_NAME);
- log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
+ getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
+ getContainerComposer().assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1, PipelineContainerComposer.SCHEMA_NAME);
}
private void checkOrderMigration(final String jobId) throws SQLException, InterruptedException {
- waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
stopMigrationByJobId(jobId);
long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
- sourceExecuteWithLog(
- String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getSourceTableOrderName()), recordId, 1, "afterStop"));
+ getContainerComposer().sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')",
+ String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_ORDER_NAME), recordId, 1, "afterStop"));
startMigrationByJobId(jobId);
// must refresh firstly, otherwise proxy can't get schema and table info
- proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
- assertProxyOrderRecordExist(String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getTargetTableOrderName()), recordId);
+ getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
+ getContainerComposer().assertProxyOrderRecordExist(String.join(".", PipelineContainerComposer.SCHEMA_NAME, getContainerComposer().getTargetTableOrderName()), recordId);
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
}
private void checkOrderItemMigration() throws SQLException, InterruptedException {
startMigrationWithSchema("t_order_item", "t_order_item");
String jobId = getJobIdByTableName("ds_0.test.t_order_item");
- waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
}
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 06154ae35d4..bd28b68bed1 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -21,8 +21,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -49,6 +50,8 @@ import static org.hamcrest.Matchers.is;
@Slf4j
public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
+ private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
+
public RulesMigrationE2EIT(final PipelineTestParameter testParam) {
super(testParam);
}
@@ -56,10 +59,10 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
@Parameters(name = "{0}")
public static Collection<PipelineTestParameter> getTestParameters() {
Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+ if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
return result;
}
- List<String> versions = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+ List<String> versions = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType());
if (versions.isEmpty()) {
return result;
}
@@ -67,11 +70,6 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
return result;
}
- @Override
- protected String getSourceTableOrderName() {
- return "t_order";
- }
-
@Test
public void assertNoRuleMigrationSuccess() throws Exception {
assertMigrationSuccess(null);
@@ -86,23 +84,23 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
}
private void assertMigrationSuccess(final Callable<Void> addRuleFn) throws Exception {
- initEnvironment(getDatabaseType(), new MigrationJobType());
- createSourceOrderTable();
- try (Connection connection = getSourceDataSource().getConnection()) {
- PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, new UUIDKeyGenerateAlgorithm(), getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+ getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
+ getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
+ try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
+ PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, new UUIDKeyGenerateAlgorithm(), SOURCE_TABLE_ORDER_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
addMigrationSourceResource();
addMigrationTargetResource();
if (null != addRuleFn) {
addRuleFn.call();
}
- startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+ startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
String jobId = listJobId().get(0);
- waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
commitMigrationByJobId(jobId);
- proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
- assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT));
+ getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+ assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_ORDER_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
}
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 31c6f0edaff..fdef91f9a62 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -25,8 +25,9 @@ import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseT
import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -64,6 +65,8 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
+ "TYPE(NAME=\"hash_mod\",PROPERTIES(\"sharding-count\"=\"6\"))\n"
+ ");";
+ private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
+
public IndexesMigrationE2EIT(final PipelineTestParameter testParam) {
super(testParam);
}
@@ -71,34 +74,29 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
@Parameters(name = "{0}")
public static Collection<PipelineTestParameter> getTestParameters() {
Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+ if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
return result;
}
- List<String> mysqlVersion = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+ List<String> mysqlVersion = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType());
if (!mysqlVersion.isEmpty()) {
result.add(new PipelineTestParameter(new MySQLDatabaseType(), mysqlVersion.get(0), "env/common/none.xml"));
}
- List<String> postgresqlVersion = PipelineBaseE2EIT.ENV.listStorageContainerImages(new PostgreSQLDatabaseType());
+ List<String> postgresqlVersion = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType());
if (!postgresqlVersion.isEmpty()) {
result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), postgresqlVersion.get(0), "env/common/none.xml"));
}
return result;
}
- @Override
- protected String getSourceTableOrderName() {
- return "t_order";
- }
-
@Test
public void assertNoUniqueKeyMigrationSuccess() throws Exception {
String sql;
String consistencyCheckAlgorithmType;
- if (getDatabaseType() instanceof MySQLDatabaseType) {
+ if (getContainerComposer().getDatabaseType() instanceof MySQLDatabaseType) {
sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
// DATA_MATCH doesn't supported, could not order by records
consistencyCheckAlgorithmType = "CRC32_MATCH";
- } else if (getDatabaseType() instanceof PostgreSQLDatabaseType) {
+ } else if (getContainerComposer().getDatabaseType() instanceof PostgreSQLDatabaseType) {
sql = "CREATE TABLE %s (order_id varchar(255) NOT NULL,user_id int NOT NULL,status varchar(255) NULL)";
consistencyCheckAlgorithmType = null;
} else {
@@ -108,13 +106,13 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
Object uniqueKey = keyGenerateAlgorithm.generateKey();
assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(uniqueKey);
- assertProxyOrderRecordExist("t_order", uniqueKey);
+ getContainerComposer().assertProxyOrderRecordExist("t_order", uniqueKey);
return null;
});
}
private void insertOneOrder(final Object uniqueKey) throws SQLException {
- try (PreparedStatement preparedStatement = getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
+ try (PreparedStatement preparedStatement = getContainerComposer().getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
preparedStatement.setObject(1, uniqueKey);
preparedStatement.setObject(2, 1);
preparedStatement.setObject(3, "OK");
@@ -127,7 +125,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
public void assertMultiPrimaryKeyMigrationSuccess() throws Exception {
String sql;
String consistencyCheckAlgorithmType;
- if (getDatabaseType() instanceof MySQLDatabaseType) {
+ if (getContainerComposer().getDatabaseType() instanceof MySQLDatabaseType) {
sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
consistencyCheckAlgorithmType = "CRC32_MATCH";
} else {
@@ -137,7 +135,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
Object uniqueKey = keyGenerateAlgorithm.generateKey();
assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(uniqueKey);
- assertProxyOrderRecordExist("t_order", uniqueKey);
+ getContainerComposer().assertProxyOrderRecordExist("t_order", uniqueKey);
return null;
});
}
@@ -146,7 +144,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
public void assertMultiUniqueKeyMigrationSuccess() throws Exception {
String sql;
String consistencyCheckAlgorithmType;
- if (getDatabaseType() instanceof MySQLDatabaseType) {
+ if (getContainerComposer().getDatabaseType() instanceof MySQLDatabaseType) {
sql = "CREATE TABLE `%s` (`order_id` BIGINT NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), UNIQUE KEY (`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
consistencyCheckAlgorithmType = "DATA_MATCH";
} else {
@@ -156,7 +154,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
Object uniqueKey = keyGenerateAlgorithm.generateKey();
assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(uniqueKey);
- assertProxyOrderRecordExist("t_order", uniqueKey);
+ getContainerComposer().assertProxyOrderRecordExist("t_order", uniqueKey);
return null;
});
}
@@ -165,7 +163,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess() throws Exception {
String sql;
String consistencyCheckAlgorithmType;
- if (getDatabaseType() instanceof MySQLDatabaseType) {
+ if (getContainerComposer().getDatabaseType() instanceof MySQLDatabaseType) {
sql = "CREATE TABLE `%s` (`order_id` VARBINARY(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
// DATA_MATCH doesn't supported: Order by value must implements Comparable
consistencyCheckAlgorithmType = "CRC32_MATCH";
@@ -178,33 +176,33 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
assertMigrationSuccess(sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(uniqueKey);
// TODO Select by byte[] from proxy doesn't work, so unhex function is used for now
- assertProxyOrderRecordExist(String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
+ getContainerComposer().assertProxyOrderRecordExist(String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
return null;
});
}
private void assertMigrationSuccess(final String sqlPattern, final String shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm,
final String consistencyCheckAlgorithmType, final Callable<Void> incrementalTaskFn) throws Exception {
- initEnvironment(getDatabaseType(), new MigrationJobType());
- sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
- try (Connection connection = getSourceDataSource().getConnection()) {
- PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, getSourceTableOrderName(), TABLE_INIT_ROW_COUNT);
+ getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
+ getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_ORDER_NAME));
+ try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
+ PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, SOURCE_TABLE_ORDER_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
addMigrationProcessConfig();
addMigrationSourceResource();
addMigrationTargetResource();
- proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT, shardingColumn), 2);
- startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+ getContainerComposer().proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT, shardingColumn), 2);
+ startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
String jobId = listJobId().get(0);
- waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
incrementalTaskFn.call();
- waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
if (null != consistencyCheckAlgorithmType) {
assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
}
commitMigrationByJobId(jobId);
- proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
- assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(TABLE_INIT_ROW_COUNT + 1));
+ getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+ assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_ORDER_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
List<String> lastJobIds = listJobId();
assertTrue(lastJobIds.isEmpty());
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index ca97ada559b..3dc39456ea6 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -22,8 +22,9 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -46,6 +47,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Slf4j
public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
+ private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
+
public MariaDBMigrationE2EIT(final PipelineTestParameter testParam) {
super(testParam);
}
@@ -53,10 +56,10 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
@Parameters(name = "{0}")
public static Collection<PipelineTestParameter> getTestParameters() {
Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+ if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
return result;
}
- List<String> versions = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+ List<String> versions = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType());
if (versions.isEmpty()) {
return result;
}
@@ -65,34 +68,29 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
return result;
}
- @Override
- protected String getSourceTableOrderName() {
- return "t_order";
- }
-
@Test
public void assertMigrationSuccess() throws SQLException, InterruptedException {
- initEnvironment(getDatabaseType(), new MigrationJobType());
+ getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
- sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
- try (Connection connection = getSourceDataSource().getConnection()) {
+ getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_ORDER_NAME));
+ try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
KeyGenerateAlgorithm generateAlgorithm = new UUIDKeyGenerateAlgorithm();
- PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+ PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, SOURCE_TABLE_ORDER_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
addMigrationProcessConfig();
addMigrationSourceResource();
addMigrationTargetResource();
createTargetOrderTableRule();
- startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+ startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
String jobId = listJobId().get(0);
- waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
- assertProxyOrderRecordExist("t_order", "a1");
- waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ getContainerComposer().sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
+ getContainerComposer().assertProxyOrderRecordExist("t_order", "a1");
+ getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(jobId, "CRC32_MATCH");
commitMigrationByJobId(jobId);
- proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
- assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1));
+ getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+ assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_ORDER_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
List<String> lastJobIds = listJobId();
assertTrue(lastJobIds.isEmpty());
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index 2277d282e04..fd92fb88356 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -23,8 +23,9 @@ import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -46,34 +47,23 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Slf4j
public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
- private final PipelineTestParameter testParam;
-
public TextPrimaryKeyMigrationE2EIT(final PipelineTestParameter testParam) {
super(testParam);
- this.testParam = testParam;
- }
-
- @Override
- protected String getSourceTableOrderName() {
- if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
- return "T_ORDER";
- }
- return "t_order";
}
@Parameters(name = "{0}")
public static Collection<PipelineTestParameter> getTestParameters() {
Collection<PipelineTestParameter> result = new LinkedList<>();
- if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+ if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
return result;
}
- for (String version : PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType())) {
+ for (String version : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType())) {
result.add(new PipelineTestParameter(new MySQLDatabaseType(), version, "env/scenario/primary_key/text_primary_key/mysql.xml"));
}
- for (String version : PipelineBaseE2EIT.ENV.listStorageContainerImages(new PostgreSQLDatabaseType())) {
+ for (String version : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType())) {
result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), version, "env/scenario/primary_key/text_primary_key/postgresql.xml"));
}
- for (String version : PipelineBaseE2EIT.ENV.listStorageContainerImages(new OpenGaussDatabaseType())) {
+ for (String version : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new OpenGaussDatabaseType())) {
result.add(new PipelineTestParameter(new OpenGaussDatabaseType(), version, "env/scenario/primary_key/text_primary_key/postgresql.xml"));
}
return result;
@@ -81,25 +71,27 @@ public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
@Test
public void assertTextPrimaryMigrationSuccess() throws SQLException, InterruptedException {
- log.info("assertTextPrimaryMigrationSuccess testParam:{}", testParam);
- initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
- createSourceOrderTable();
- try (Connection connection = getSourceDataSource().getConnection()) {
+ getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
+ getContainerComposer().createSourceOrderTable(getSourceTableOrderName());
+ try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
- PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+ PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, getSourceTableOrderName(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
addMigrationProcessConfig();
addMigrationSourceResource();
addMigrationTargetResource();
createTargetOrderTableRule();
- startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+ startMigration(getSourceTableOrderName(), getContainerComposer().getTargetTableOrderName());
String jobId = listJobId().get(0);
- sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableOrderName(), "1000000000", 1, "afterStop"));
- waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ getContainerComposer().sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableOrderName(), "1000000000", 1, "afterStop"));
+ getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
commitMigrationByJobId(jobId);
List<String> lastJobIds = listJobId();
assertTrue(lastJobIds.isEmpty());
- log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
+ }
+
+ private String getSourceTableOrderName() {
+ return DatabaseTypeUtil.isMySQL(getContainerComposer().getDatabaseType()) ? "T_ORDER" : "t_order";
}
}