You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/21 05:01:40 UTC

[shardingsphere] branch master updated: Improve migration IT, add clean up pipeline at native mode (#21105)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d265315c2fc Improve migration IT, add clean up pipeline at native mode (#21105)
d265315c2fc is described below

commit d265315c2fc35309de886d1b4f358d19530c090c
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Sep 21 13:01:30 2022 +0800

    Improve migration IT, add clean up pipeline at native mode (#21105)
    
    * Fix postgres task failed and add init data time log.
    
    * Fix postgres task failed and add init data time log.
    
    * Add cleanUp pipeline at native mode
    
    * Fix bug
    
    * Fix codestyle
    
    * rollback PostgresSQL checker
---
 .../data/pipeline/cases/base/BaseITCase.java       |  6 ++---
 .../cases/migration/AbstractMigrationITCase.java   | 22 ++++++++++++++----
 .../general/PostgreSQLMigrationGeneralIT.java      |  5 ++++-
 .../primarykey/TextPrimaryKeyMigrationIT.java      |  4 +++-
 .../cases/task/PostgreSQLIncrementTask.java        | 26 ++++++++++------------
 .../pipeline/env/IntegrationTestEnvironment.java   |  2 +-
 .../env/scenario/primary_key/unique_key/mysql.xml  |  2 +-
 7 files changed, 42 insertions(+), 25 deletions(-)

diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index bc6af3f4258..2a0f31d0f45 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -129,7 +129,7 @@ public abstract class BaseITCase {
             password = ENV.getActualDataSourcePassword(databaseType);
         }
         createProxyDatabase(parameterized.getDatabaseType());
-        if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+        if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
             cleanUpDataSource();
         }
         extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource(parameterized.getScenario())), ExtraSQLCommand.class);
@@ -137,7 +137,7 @@ public abstract class BaseITCase {
     }
     
     private void cleanUpDataSource() {
-        for (String each : Arrays.asList(DS_0, DS_2, DS_3, DS_4)) {
+        for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
             containerComposer.cleanUpDatabase(each);
         }
     }
@@ -149,7 +149,7 @@ public abstract class BaseITCase {
         }
         String jdbcUrl = containerComposer.getProxyJdbcUrl(defaultDatabaseName);
         try (Connection connection = DriverManager.getConnection(jdbcUrl, ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
-            if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+            if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
                 try {
                     connectionExecuteWithLog(connection, String.format("DROP DATABASE %s", PROXY_DATABASE));
                 } catch (final SQLException ex) {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 157131a2b2f..97aafb8cd7a 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -49,10 +49,24 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
     public AbstractMigrationITCase(final ScalingParameterized parameterized) {
         super(parameterized);
         migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/migration-command.xml")), MigrationDistSQLCommand.class);
+        if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
+            try {
+                cleanUpPipelineJobs();
+            } catch (final SQLException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+    }
+    
+    private void cleanUpPipelineJobs() throws SQLException {
+        List<String> jobIds = listJobId();
+        for (String each : jobIds) {
+            proxyExecuteWithLog(String.format("ROLLBACK MIGRATION '%s'", each), 0);
+        }
     }
     
     protected void addMigrationSourceResource() throws SQLException {
-        if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+        if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
             try {
                 proxyExecuteWithLog("DROP MIGRATION SOURCE RESOURCE ds_0", 2);
             } catch (final SQLException ex) {
@@ -108,15 +122,15 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
     }
     
     protected void startMigration(final String sourceTableName, final String targetTableName) throws SQLException {
-        proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTable(sourceTableName, targetTableName), 1);
+        proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTable(sourceTableName, targetTableName), 5);
     }
     
     protected void startMigrationWithSchema(final String sourceTableName, final String targetTableName) throws SQLException {
-        proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTableWithSchema(sourceTableName, targetTableName), 1);
+        proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTableWithSchema(sourceTableName, targetTableName), 5);
     }
     
     protected void addMigrationProcessConfig() throws SQLException {
-        if (ENV.getItEnvType() == ITEnvTypeEnum.NATIVE) {
+        if (ITEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
             try {
                 proxyExecuteWithLog("DROP MIGRATION PROCESS CONFIGURATION '/'", 0);
             } catch (final SQLException ex) {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index 8c89cdf0628..582c4842de1 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -35,6 +35,7 @@ import org.junit.runners.Parameterized.Parameters;
 import org.springframework.jdbc.core.JdbcTemplate;
 
 import java.sql.SQLException;
+import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -93,8 +94,10 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
         createTargetOrderItemTableRule();
         Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(KEY_GENERATE_ALGORITHM, parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
         JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
+        log.info("init data begin: {}", LocalDateTime.now());
         jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
         jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+        log.info("init data end: {}", LocalDateTime.now());
         checkOrderMigration(jdbcTemplate);
         checkOrderItemMigration();
         if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
@@ -109,7 +112,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
     
     private void checkOrderMigration(final JdbcTemplate jdbcTemplate) throws SQLException, InterruptedException {
         startMigrationWithSchema(getSourceTableOrderName(), "t_order");
-        startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate, SCHEMA_NAME, getSourceTableOrderName(), false, 20));
+        startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate, SCHEMA_NAME, getSourceTableOrderName(), 20));
         String jobId = getJobIdByTableName(getSourceTableOrderName());
         waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         stopMigrationByJobId(jobId);
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
index f06b4624061..45258013c6b 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
@@ -34,6 +34,7 @@ import org.junit.runners.Parameterized.Parameters;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -105,6 +106,7 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
     }
     
     private void batchInsertOrder() throws SQLException {
+        log.info("init data begin: {}", LocalDateTime.now());
         UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
         try (Connection connection = getSourceDataSource().getConnection()) {
             PreparedStatement preparedStatement = connection.prepareStatement(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (?,?,?)", getSourceTableOrderName()));
@@ -116,6 +118,6 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
             }
             preparedStatement.executeBatch();
         }
-        log.info("init data succeed");
+        log.info("init data end: {}", LocalDateTime.now());
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index a30ea237fef..191890a223b 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -42,8 +42,6 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
     
     private final String orderTableName;
     
-    private final boolean incrementOrderItemTogether;
-    
     private final int executeCountLimit;
     
     static {
@@ -58,15 +56,13 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
         while (executeCount < executeCountLimit && !Thread.currentThread().isInterrupted()) {
             Object orderId = insertOrder();
             if (0 == executeCount % 2) {
-                jdbcTemplate.update(String.format(prefixSchema("DELETE FROM ${schema}%s WHERE order_id = ?", schema), orderTableName), orderId);
+                jdbcTemplate.update(String.format("DELETE FROM %s WHERE order_id = ?", getTableNameWithSchema(orderTableName)), orderId);
             } else {
                 updateOrderByPrimaryKey(orderId);
             }
-            if (incrementOrderItemTogether) {
-                Object orderItemPrimaryKey = insertOrderItem();
-                jdbcTemplate.update(prefixSchema("UPDATE ${schema}t_order_item SET status = ? WHERE item_id = ?", schema), "updated" + Instant.now().getEpochSecond(), orderItemPrimaryKey);
-                jdbcTemplate.update(prefixSchema("DELETE FROM ${schema}t_order_item WHERE item_id = ?", schema), orderItemPrimaryKey);
-            }
+            Object orderItemPrimaryKey = insertOrderItem();
+            jdbcTemplate.update(String.format("UPDATE %s SET status = ? WHERE item_id = ?", getTableNameWithSchema("t_order_item")), "updated" + Instant.now().getEpochSecond(), orderItemPrimaryKey);
+            jdbcTemplate.update(String.format("DELETE FROM %s WHERE item_id = ?", getTableNameWithSchema("t_order_item")), orderItemPrimaryKey);
             executeCount++;
         }
         log.info("PostgreSQL increment task runnable execute successfully.");
@@ -76,7 +72,9 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         String status = 0 == random.nextInt() % 2 ? null : "NOT-NULL";
         Object[] orderInsertDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), random.nextInt(0, 6), status};
-        jdbcTemplate.update(String.format(prefixSchema("INSERT INTO ${schema}%s (order_id,user_id,status) VALUES (?, ?, ?)", schema), orderTableName), orderInsertDate);
+        String insertSQL = String.format("INSERT INTO %s (order_id,user_id,status) VALUES (?, ?, ?)", getTableNameWithSchema(orderTableName));
+        log.info("insert order sql:{}", insertSQL);
+        jdbcTemplate.update(insertSQL, orderInsertDate);
         return orderInsertDate[0];
     }
     
@@ -84,20 +82,20 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         String status = 0 == random.nextInt() % 2 ? null : "NOT-NULL";
         Object[] orderInsertItemDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
-        jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)", schema), orderInsertItemDate);
+        jdbcTemplate.update(String.format("INSERT INTO %s(item_id,order_id,user_id,status) VALUES(?,?,?,?)", getTableNameWithSchema("t_order_item")), orderInsertItemDate);
         return orderInsertItemDate[0];
     }
     
     private void updateOrderByPrimaryKey(final Object primaryKey) {
         Object[] updateData = {"updated" + Instant.now().getEpochSecond(), primaryKey};
-        jdbcTemplate.update(String.format(prefixSchema("UPDATE ${schema}%s SET status = ? WHERE order_id = ?", schema), updateData));
+        jdbcTemplate.update(String.format("UPDATE %s SET status = ? WHERE order_id = ?", getTableNameWithSchema(orderTableName)), updateData);
     }
     
-    private String prefixSchema(final String sql, final String schema) {
+    private String getTableNameWithSchema(final String tableName) {
         if (StringUtils.isNotBlank(schema)) {
-            return sql.replace("${schema}", schema + ".");
+            return String.join(".", schema, tableName);
         } else {
-            return sql.replace("${schema}", "");
+            return tableName;
         }
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
index 134eb290975..f3f6c0d902e 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/IntegrationTestEnvironment.java
@@ -153,7 +153,7 @@ public final class IntegrationTestEnvironment {
      */
     public List<String> listStorageContainerImages(final DatabaseType databaseType) {
         // Native mode needn't use docker image, just return a list which contain one item
-        if (getItEnvType() == ITEnvTypeEnum.NATIVE) {
+        if (ITEnvTypeEnum.NATIVE == getItEnvType()) {
             return databaseType.getType().equalsIgnoreCase(getNativeDatabaseType()) ? Collections.singletonList("") : Collections.emptyList();
         }
         switch (databaseType.getType()) {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
index 5f8a288d79a..baeb0d95b75 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/primary_key/unique_key/mysql.xml
@@ -16,7 +16,7 @@
   -->
 <command>
     <create-table-order>
-        CREATE TABLE `t_order` (
+        CREATE TABLE `%s` (
         `order_id` varchar(255) NOT NULL,
         `user_id` INT NOT NULL,
         `status` varchar(255) NULL,