You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/07 01:52:27 UTC

[shardingsphere] branch master updated: Auto refresh table metadata at migration (#26083)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 20410412951 Auto refresh table metadata at migration  (#26083)
20410412951 is described below

commit 204104129513f34abb4d6d95af53498315af6496
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Jun 7 09:52:15 2023 +0800

    Auto refresh table metadata at migration  (#26083)
    
    * Fix transaction_commit_millis not being set
    
    * Auto refresh table metadata at migration
    
    * Update doc
---
 .../shardingsphere-proxy/migration/usage.cn.md            |  6 ------
 .../shardingsphere-proxy/migration/usage.en.md            |  6 ------
 .../pipeline/cdc/util/DataRecordResultConvertUtils.java   |  3 ++-
 .../scenario/migration/api/impl/MigrationJobAPI.java      | 15 +++++++++++++++
 .../scenario/migration/prepare/MigrationJobPreparer.java  |  1 +
 .../migration/general/MySQLMigrationGeneralE2EIT.java     |  1 -
 .../general/PostgreSQLMigrationGeneralE2EIT.java          |  3 ---
 .../cases/migration/general/RulesMigrationE2EIT.java      |  1 -
 .../cases/migration/primarykey/IndexesMigrationE2EIT.java |  4 ----
 .../cases/migration/primarykey/MariaDBMigrationE2EIT.java |  3 +--
 10 files changed, 19 insertions(+), 24 deletions(-)

diff --git a/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.cn.md b/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.cn.md
index d5ba3b47f27..041a60bafe0 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.cn.md
@@ -714,10 +714,4 @@ SHOW MIGRATION CHECK STATUS 'j01016e501b498ed1bdb2c373a2e85e2529a6';
 COMMIT MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
 ```
 
-8. 刷新元数据。
-
-```sql
-REFRESH TABLE METADATA;
-```
-
 更多 DistSQL 请参见 [RAL #数据迁移](/cn/user-manual/shardingsphere-proxy/distsql/syntax/ral/#%E6%95%B0%E6%8D%AE%E8%BF%81%E7%A7%BB)。
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.en.md b/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.en.md
index b9d68cb1200..a534ee60cd4 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.en.md
@@ -714,10 +714,4 @@ Result example:
 COMMIT MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
 ```
 
-8. Refresh table metadata.
-
-```sql
-REFRESH TABLE METADATA;
-```
-
 Please refer to [RAL#Migration](/en/user-manual/shardingsphere-proxy/distsql/syntax/ral/#migration) for more details.
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
index f1c517451c1..361192d0e39 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
@@ -63,6 +63,7 @@ public final class DataRecordResultConvertUtils {
         } else if (IngestDataChangeType.DELETE.equals(dataRecord.getType())) {
             dataChangeType = DataChangeType.DELETE;
         }
-        return DataRecordResult.Record.newBuilder().setMetaData(metaData).addAllBefore(before).addAllAfter(after).setDataChangeType(dataChangeType).build();
+        return DataRecordResult.Record.newBuilder().setMetaData(metaData).addAllBefore(before).addAllAfter(after).setTransactionCommitMillis(dataRecord.getCommitTime())
+                .setDataChangeType(dataChangeType).build();
     }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index c22b44b41b5..fc21c681542 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -94,6 +94,7 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSour
 import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
 import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
 import org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry;
+import org.apache.shardingsphere.mode.manager.ContextManager;
 
 import javax.sql.DataSource;
 import java.nio.charset.StandardCharsets;
@@ -414,6 +415,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
                 }
             }
         }
+        refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
     }
     
     @Override
@@ -504,6 +506,19 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
         return "";
     }
     
+    /**
+     * Refresh table metadata.
+     *
+     * @param jobId job id
+     * @param databaseName database name
+     */
+    public void refreshTableMetadata(final String jobId, final String databaseName) {
+        // TODO use origin database name now, wait for reloadDatabaseMetaData to fix the case-sensitive problem
+        ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager();
+        ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
+        contextManager.reloadDatabaseMetaData(database.getName());
+    }
+    
     @Override
     public JobType getJobType() {
         return JobTypeFactory.getInstance(MigrationJobType.TYPE_CODE);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index be3ff4883b6..14404df2ba5 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -166,6 +166,7 @@ public final class MigrationJobPreparer {
         ShardingSphereMetaData metaData = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
         SQLParserEngine sqlParserEngine = PipelineJobPreparerUtils.getSQLParserEngine(metaData, jobConfig.getTargetDatabaseName());
         PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, new PrepareTargetTablesParameter(createTableConfig, dataSourceManager, sqlParserEngine));
+        jobAPI.refreshTableMetadata(jobConfig.getJobId(), jobConfig.getTargetDatabaseName());
     }
     
     private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 9a1325ab135..811a4f41609 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -81,7 +81,6 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
                     new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
             TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
             containerComposer.assertProxyOrderRecordExist("t_order", 10000);
             assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH");
             String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 6b4b716c1f5..23433d3f252 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -84,7 +84,6 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
                     containerComposer.getDatabaseType(), 20));
             TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
             containerComposer.assertProxyOrderRecordExist(schemaTableName, 10000);
             checkOrderMigration(containerComposer, jobId);
             checkOrderItemMigration(containerComposer);
@@ -100,8 +99,6 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     private void checkOrderMigration(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException {
         containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         stopMigrationByJobId(containerComposer, jobId);
-        // must refresh firstly, otherwise proxy can't get schema and table info
-        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
         long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
         containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')",
                 String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 57a27c7e217..37f5c2aba35 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -87,7 +87,6 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
         containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
         commitMigrationByJobId(containerComposer, jobId);
-        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
         assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
     }
     
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index d28f37f5fed..bfc53d1a0cd 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -167,7 +167,6 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
-                containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
                 containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
                 return null;
             });
@@ -192,7 +191,6 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
-                containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
                 containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
                 return null;
             });
@@ -218,7 +216,6 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             byte[] uniqueKey = new byte[]{-1, 0, 1};
             assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
                 insertOneOrder(containerComposer, uniqueKey);
-                containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
                 // TODO Select by byte[] from proxy doesn't work, so unhex function is used for now
                 containerComposer.assertProxyOrderRecordExist(String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
                 return null;
@@ -245,7 +242,6 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             assertCheckMigrationSuccess(containerComposer, jobId, consistencyCheckAlgorithmType);
         }
         commitMigrationByJobId(containerComposer, jobId);
-        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
         assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
         List<String> lastJobIds = listJobId(containerComposer);
         assertTrue(lastJobIds.isEmpty());
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index e7dc4789f5f..47264856beb 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -53,7 +53,7 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
-    void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
+    void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLException {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             containerComposer.sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_NAME));
@@ -69,7 +69,6 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
             String jobId = listJobId(containerComposer).get(0);
             containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
             containerComposer.sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
             containerComposer.assertProxyOrderRecordExist("t_order", "a1");
             containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
             assertCheckMigrationSuccess(containerComposer, jobId, "CRC32_MATCH");