You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/21 05:01:40 UTC
[shardingsphere] branch master updated: Improve migration IT, add clean up pipeline at native mode (#21105)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d265315c2fc Improve migration IT, add clean up pipeline at native mode (#21105)
d265315c2fc is described below
commit d265315c2fc35309de886d1b4f358d19530c090c
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Sep 21 13:01:30 2022 +0800
Improve migration IT, add clean up pipeline at native mode (#21105)
* Fix PostgreSQL task failure and add init data time log.
* Fix PostgreSQL task failure and add init data time log.
* Add cleanUp pipeline at native mode
* Fix bug
* Fix codestyle
* Rollback PostgreSQL checker
---
.../data/pipeline/cases/base/BaseITCase.java | 6 ++---
.../cases/migration/AbstractMigrationITCase.java | 22 ++++++++++++++----
.../general/PostgreSQLMigrationGeneralIT.java | 5 ++++-
.../primarykey/TextPrimaryKeyMigrationIT.java | 4 +++-
.../cases/task/PostgreSQLIncrementTask.java | 26 ++++++++++------------
.../pipeline/env/IntegrationTestEnvironment.java | 2 +-
.../env/scenario/primary_key/unique_key/mysql.xml | 2 +-
7 files changed, 42 insertions(+), 25 deletions(-)
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index bc6af3f4258..2a0f31d0f45 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -129,7 +129,7 @@ public abstract class BaseITCase {
password = ENV.getActualDataSourcePassword(databaseType);
}
createProxyDatabase(parameterized.getDatabaseType());
- if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
cleanUpDataSource();
}
extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource(parameterized.getScenario())), ExtraSQLCommand.class);
@@ -137,7 +137,7 @@ public abstract class BaseITCase {
}
private void cleanUpDataSource() {
- for (String each : Arrays.asList(DS_0, DS_2, DS_3, DS_4)) {
+ for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
containerComposer.cleanUpDatabase(each);
}
}
@@ -149,7 +149,7 @@ public abstract class BaseITCase {
}
String jdbcUrl = containerComposer.getProxyJdbcUrl(defaultDatabaseName);
try (Connection connection = DriverManager.getConnection(jdbcUrl, ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
- if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
try {
connectionExecuteWithLog(connection, String.format("DROP DATABASE %s", PROXY_DATABASE));
} catch (final SQLException ex) {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 157131a2b2f..97aafb8cd7a 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -49,10 +49,24 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
public AbstractMigrationITCase(final ScalingParameterized parameterized) {
super(parameterized);
migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/migration-command.xml")), MigrationDistSQLCommand.class);
+ if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
+ try {
+ cleanUpPipelineJobs();
+ } catch (final SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ private void cleanUpPipelineJobs() throws SQLException {
+ List<String> jobIds = listJobId();
+ for (String each : jobIds) {
+ proxyExecuteWithLog(String.format("ROLLBACK MIGRATION '%s'", each), 0);
+ }
}
protected void addMigrationSourceResource() throws SQLException {
- if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
try {
proxyExecuteWithLog("DROP MIGRATION SOURCE RESOURCE ds_0", 2);
} catch (final SQLException ex) {
@@ -108,15 +122,15 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
}
protected void startMigration(final String sourceTableName, final String targetTableName) throws SQLException {
- proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTable(sourceTableName, targetTableName), 1);
+ proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTable(sourceTableName, targetTableName), 5);
}
protected void startMigrationWithSchema(final String sourceTableName, final String targetTableName) throws SQLException {
- proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTableWithSchema(sourceTableName, targetTableName), 1);
+ proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTableWithSchema(sourceTableName, targetTableName), 5);
}
protected void addMigrationProcessConfig() throws SQLException {
- if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
try {
proxyExecuteWithLog("DROP MIGRATION PROCESS CONFIGURATION '/'", 0);
} catch (final SQLException ex) {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index 8c89cdf0628..582c4842de1 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -35,6 +35,7 @@ import org.junit.runners.Parameterized.Parameters;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.SQLException;
+import java.time.LocalDateTime;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -93,8 +94,10 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
createTargetOrderItemTableRule();
Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(KEY_GENERATE_ALGORITHM, parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
+ log.info("init data begin: {}", LocalDateTime.now());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+ log.info("init data end: {}", LocalDateTime.now());
checkOrderMigration(jdbcTemplate);
checkOrderItemMigration();
if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
@@ -109,7 +112,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
private void checkOrderMigration(final JdbcTemplate jdbcTemplate) throws SQLException, InterruptedException {
startMigrationWithSchema(getSourceTableOrderName(), "t_order");
- startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate, SCHEMA_NAME, getSourceTableOrderName(), false, 20));
+ startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate, SCHEMA_NAME, getSourceTableOrderName(), 20));
String jobId = getJobIdByTableName(getSourceTableOrderName());
waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
stopMigrationByJobId(jobId);
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
index f06b4624061..45258013c6b 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
@@ -34,6 +34,7 @@ import org.junit.runners.Parameterized.Parameters;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.time.LocalDateTime;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -105,6 +106,7 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
}
private void batchInsertOrder() throws SQLException {
+ log.info("init data begin: {}", LocalDateTime.now());
UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
try (Connection connection = getSourceDataSource().getConnection()) {
PreparedStatement preparedStatement = connection.prepareStatement(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (?,?,?)", getSourceTableOrderName()));
@@ -116,6 +118,6 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
}
preparedStatement.executeBatch();
}
- log.info("init data succeed");
+ log.info("init data end: {}", LocalDateTime.now());
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index a30ea237fef..191890a223b 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -42,8 +42,6 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
private final String orderTableName;
- private final boolean incrementOrderItemTogether;
-
private final int executeCountLimit;
static {
@@ -58,15 +56,13 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
while (executeCount < executeCountLimit && !Thread.currentThread().isInterrupted()) {
Object orderId = insertOrder();
if (0 == executeCount % 2) {
- jdbcTemplate.update(String.format(prefixSchema("DELETE FROM ${schema}%s WHERE order_id = ?", schema), orderTableName), orderId);
+ jdbcTemplate.update(String.format("DELETE FROM %s WHERE order_id = ?", getTableNameWithSchema(orderTableName)), orderId);
} else {
updateOrderByPrimaryKey(orderId);
}
- if (incrementOrderItemTogether) {
- Object orderItemPrimaryKey = insertOrderItem();
- jdbcTemplate.update(prefixSchema("UPDATE ${schema}t_order_item SET status = ? WHERE item_id = ?", schema), "updated" + Instant.now().getEpochSecond(), orderItemPrimaryKey);
- jdbcTemplate.update(prefixSchema("DELETE FROM ${schema}t_order_item WHERE item_id = ?", schema), orderItemPrimaryKey);
- }
+ Object orderItemPrimaryKey = insertOrderItem();
+ jdbcTemplate.update(String.format("UPDATE %s SET status = ? WHERE item_id = ?", getTableNameWithSchema("t_order_item")), "updated" + Instant.now().getEpochSecond(), orderItemPrimaryKey);
+ jdbcTemplate.update(String.format("DELETE FROM %s WHERE item_id = ?", getTableNameWithSchema("t_order_item")), orderItemPrimaryKey);
executeCount++;
}
log.info("PostgreSQL increment task runnable execute successfully.");
@@ -76,7 +72,9 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
ThreadLocalRandom random = ThreadLocalRandom.current();
String status = 0 == random.nextInt() % 2 ? null : "NOT-NULL";
Object[] orderInsertDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), random.nextInt(0, 6), status};
- jdbcTemplate.update(String.format(prefixSchema("INSERT INTO ${schema}%s (order_id,user_id,status) VALUES (?, ?, ?)", schema), orderTableName), orderInsertDate);
+ String insertSQL = String.format("INSERT INTO %s (order_id,user_id,status) VALUES (?, ?, ?)", getTableNameWithSchema(orderTableName));
+ log.info("insert order sql:{}", insertSQL);
+ jdbcTemplate.update(insertSQL, orderInsertDate);
return orderInsertDate[0];
}
@@ -84,20 +82,20 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
ThreadLocalRandom random = ThreadLocalRandom.current();
String status = 0 == random.nextInt() % 2 ? null : "NOT-NULL";
Object[] orderInsertItemDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
- jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)", schema), orderInsertItemDate);
+ jdbcTemplate.update(String.format("INSERT INTO %s(item_id,order_id,user_id,status) VALUES(?,?,?,?)", getTableNameWithSchema("t_order_item")), orderInsertItemDate);
return orderInsertItemDate[0];
}
private void updateOrderByPrimaryKey(final Object primaryKey) {
Object[] updateData = {"updated" + Instant.now().getEpochSecond(), primaryKey};
- jdbcTemplate.update(String.format(prefixSchema("UPDATE ${schema}%s SET status = ? WHERE order_id = ?", schema), updateData));
+ jdbcTemplate.update(String.format("UPDATE %s SET status = ? WHERE order_id = ?", getTableNameWithSchema(orderTableName)), updateData);
}
- private String prefixSchema(final String sql, final String schema) {
+ private String getTableNameWithSchema(final String tableName) {
if (StringUtils.isNotBlank(schema)) {
- return sql.replace("${schema}", schema + ".");
+ return String.join(".", schema, tableName);
} else {
- return sql.replace("${schema}", "");
+ return tableName;
}
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
index 134eb290975..f3f6c0d902e 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
@@ -153,7 +153,7 @@ public final class IntegrationTestEnvironment {
*/
public List<String> listStorageContainerImages(final DatabaseType databaseType) {
// Native mode needn't use docker image, just return a list which contain one item
- if (getItEnvType() == ITEnvTypeEnum.NATIVE) {
+ if (ITEnvTypeEnum.NATIVE == getItEnvType()) {
return databaseType.getType().equalsIgnoreCase(getNativeDatabaseType()) ? Collections.singletonList("") : Collections.emptyList();
}
switch (databaseType.getType()) {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
index 5f8a288d79a..baeb0d95b75 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
@@ -16,7 +16,7 @@
-->
<command>
<create-table-order>
- CREATE TABLE `t_order` (
+ CREATE TABLE `%s` (
`order_id` varchar(255) NOT NULL,
`user_id` INT NOT NULL,
`status` varchar(255) NULL,