You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/07 01:52:27 UTC
[shardingsphere] branch master updated: Auto refresh table metadata at migration (#26083)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 20410412951 Auto refresh table metadata at migration (#26083)
20410412951 is described below
commit 204104129513f34abb4d6d95af53498315af6496
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Jun 7 09:52:15 2023 +0800
Auto refresh table metadata at migration (#26083)
* Fix miss set transaction_commit_millis
* Auto refresh table metadata at migration
* Update doc
---
.../shardingsphere-proxy/migration/usage.cn.md | 6 ------
.../shardingsphere-proxy/migration/usage.en.md | 6 ------
.../pipeline/cdc/util/DataRecordResultConvertUtils.java | 3 ++-
.../scenario/migration/api/impl/MigrationJobAPI.java | 15 +++++++++++++++
.../scenario/migration/prepare/MigrationJobPreparer.java | 1 +
.../migration/general/MySQLMigrationGeneralE2EIT.java | 1 -
.../general/PostgreSQLMigrationGeneralE2EIT.java | 3 ---
.../cases/migration/general/RulesMigrationE2EIT.java | 1 -
.../cases/migration/primarykey/IndexesMigrationE2EIT.java | 4 ----
.../cases/migration/primarykey/MariaDBMigrationE2EIT.java | 3 +--
10 files changed, 19 insertions(+), 24 deletions(-)
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.cn.md b/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.cn.md
index d5ba3b47f27..041a60bafe0 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.cn.md
@@ -714,10 +714,4 @@ SHOW MIGRATION CHECK STATUS 'j01016e501b498ed1bdb2c373a2e85e2529a6';
COMMIT MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
```
-8. 刷新元数据。
-
-```sql
-REFRESH TABLE METADATA;
-```
-
更多 DistSQL 请参见 [RAL #数据迁移](/cn/user-manual/shardingsphere-proxy/distsql/syntax/ral/#%E6%95%B0%E6%8D%AE%E8%BF%81%E7%A7%BB)。
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.en.md b/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.en.md
index b9d68cb1200..a534ee60cd4 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/migration/usage.en.md
@@ -714,10 +714,4 @@ Result example:
COMMIT MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
```
-8. Refresh table metadata.
-
-```sql
-REFRESH TABLE METADATA;
-```
-
Please refer to [RAL#Migration](/en/user-manual/shardingsphere-proxy/distsql/syntax/ral/#migration) for more details.
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
index f1c517451c1..361192d0e39 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtils.java
@@ -63,6 +63,7 @@ public final class DataRecordResultConvertUtils {
} else if (IngestDataChangeType.DELETE.equals(dataRecord.getType())) {
dataChangeType = DataChangeType.DELETE;
}
- return DataRecordResult.Record.newBuilder().setMetaData(metaData).addAllBefore(before).addAllAfter(after).setDataChangeType(dataChangeType).build();
+ return DataRecordResult.Record.newBuilder().setMetaData(metaData).addAllBefore(before).addAllAfter(after).setTransactionCommitMillis(dataRecord.getCommitTime())
+ .setDataChangeType(dataChangeType).build();
}
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index c22b44b41b5..fc21c681542 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -94,6 +94,7 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSour
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
import org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry;
+import org.apache.shardingsphere.mode.manager.ContextManager;
import javax.sql.DataSource;
import java.nio.charset.StandardCharsets;
@@ -414,6 +415,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
}
}
}
+ refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
}
@Override
@@ -504,6 +506,19 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
return "";
}
+ /**
+ * Refresh table metadata.
+ *
+ * @param jobId job id
+ * @param databaseName database name
+ */
+ public void refreshTableMetadata(final String jobId, final String databaseName) {
+ // TODO use origin database name now, wait for reloadDatabaseMetaData to fix the case-sensitivity problem
+ ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager();
+ ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
+ contextManager.reloadDatabaseMetaData(database.getName());
+ }
+
@Override
public JobType getJobType() {
return JobTypeFactory.getInstance(MigrationJobType.TYPE_CODE);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index be3ff4883b6..14404df2ba5 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -166,6 +166,7 @@ public final class MigrationJobPreparer {
ShardingSphereMetaData metaData = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
SQLParserEngine sqlParserEngine = PipelineJobPreparerUtils.getSQLParserEngine(metaData, jobConfig.getTargetDatabaseName());
PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, new PrepareTargetTablesParameter(createTableConfig, dataSourceManager, sqlParserEngine));
+ jobAPI.refreshTableMetadata(jobConfig.getJobId(), jobConfig.getTargetDatabaseName());
}
private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 9a1325ab135..811a4f41609 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -81,7 +81,6 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
containerComposer.assertProxyOrderRecordExist("t_order", 10000);
assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH");
String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 6b4b716c1f5..23433d3f252 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -84,7 +84,6 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
containerComposer.getDatabaseType(), 20));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
containerComposer.assertProxyOrderRecordExist(schemaTableName, 10000);
checkOrderMigration(containerComposer, jobId);
checkOrderItemMigration(containerComposer);
@@ -100,8 +99,6 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
private void checkOrderMigration(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException {
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
stopMigrationByJobId(containerComposer, jobId);
- // must refresh firstly, otherwise proxy can't get schema and table info
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')",
String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 57a27c7e217..37f5c2aba35 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -87,7 +87,6 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
commitMigrationByJobId(containerComposer, jobId);
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index d28f37f5fed..bfc53d1a0cd 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -167,7 +167,6 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(containerComposer, uniqueKey);
doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
return null;
});
@@ -192,7 +191,6 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(containerComposer, uniqueKey);
doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
return null;
});
@@ -218,7 +216,6 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
byte[] uniqueKey = new byte[]{-1, 0, 1};
assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(containerComposer, uniqueKey);
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
// TODO Select by byte[] from proxy doesn't work, so unhex function is used for now
containerComposer.assertProxyOrderRecordExist(String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
return null;
@@ -245,7 +242,6 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
assertCheckMigrationSuccess(containerComposer, jobId, consistencyCheckAlgorithmType);
}
commitMigrationByJobId(containerComposer, jobId);
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
List<String> lastJobIds = listJobId(containerComposer);
assertTrue(lastJobIds.isEmpty());
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index e7dc4789f5f..47264856beb 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -53,7 +53,7 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
@ParameterizedTest(name = "{0}")
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
- void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
+ void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLException {
try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
containerComposer.sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_NAME));
@@ -69,7 +69,6 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
String jobId = listJobId(containerComposer).get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
containerComposer.sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
containerComposer.assertProxyOrderRecordExist("t_order", "a1");
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(containerComposer, jobId, "CRC32_MATCH");