You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/06/06 10:51:05 UTC
[shardingsphere] branch master updated: Refactor InventoryTaskSplitter (#26071)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7b50f71bcb4 Refactor InventoryTaskSplitter (#26071)
7b50f71bcb4 is described below
commit 7b50f71bcb43aea349bae6671df88935ac3f149e
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Jun 6 18:50:57 2023 +0800
Refactor InventoryTaskSplitter (#26071)
* Use unified parameters ordering in InventoryTaskSplitter
* Simplify getTableRecordsCount parameter
* Extract InventoryRecordsCountCalculator
* Refactor to use unified jobItemContext.updateInventoryRecordsCount for getInventoryPositions
* Update
* Remove shardingsphere-sql-federation-executor-original module references
* Make all infra modules trigger pipeline E2E to find issue ASAP
* REFRESH TABLE METADATA before access proxy in E2E
---
.github/workflows/e2e-pipeline.yml | 6 +-
.../prepare/InventoryRecordsCountCalculator.java | 98 +++++++++++++++++++
.../core/prepare/InventoryTaskSplitter.java | 106 +++------------------
.../general/MySQLMigrationGeneralE2EIT.java | 2 +-
.../general/PostgreSQLMigrationGeneralE2EIT.java | 2 +-
.../primarykey/IndexesMigrationE2EIT.java | 3 +
.../primarykey/MariaDBMigrationE2EIT.java | 2 +-
7 files changed, 120 insertions(+), 99 deletions(-)
diff --git a/.github/workflows/e2e-pipeline.yml b/.github/workflows/e2e-pipeline.yml
index d880d11c330..38b0f078ac0 100644
--- a/.github/workflows/e2e-pipeline.yml
+++ b/.github/workflows/e2e-pipeline.yml
@@ -22,8 +22,7 @@ on:
branches: [ master, dev ]
paths:
- '.github/workflows/e2e-pipeline.yml'
- - 'infra/common/src/main/**'
- - 'infra/executor/src/main/**'
+ - 'infra/**/src/main/**'
- 'mode/**/src/main/**'
- 'proxy/**/src/main/**'
- 'jdbc/core/src/main/**'
@@ -42,8 +41,7 @@ on:
branches: [ master ]
paths:
- '.github/workflows/e2e-pipeline.yml'
- - 'infra/common/src/main/**'
- - 'infra/executor/src/main/**'
+ - 'infra/**/src/main/**'
- 'mode/**/src/main/**'
- 'proxy/**/src/main/**'
- 'jdbc/core/src/main/**'
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryRecordsCountCalculator.java
new file mode 100644
index 00000000000..facdda8e9af
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryRecordsCountCalculator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.prepare;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+/**
+ * Inventory records count calculator.
+ *
+ * <p>Static utility that determines how many records a source table holds. It prefers the estimated count SQL
+ * of the data source's database type when the SQL builder provides one, and falls back to the exact count SQL otherwise.</p>
+ */
+@Slf4j
+public final class InventoryRecordsCountCalculator {
+
+    // Static utility class, not meant to be instantiated.
+    private InventoryRecordsCountCalculator() {
+    }
+
+    /**
+     * Get table records count.
+     *
+     * <p>The estimated count is used only when it is greater than zero; in every other case the exact count SQL is executed.</p>
+     *
+     * @param dumperConfig dump configuration
+     * @param dataSource data source
+     * @return table records count
+     * @throws SplitPipelineJobByUniqueKeyException if there's exception from database
+     */
+    public static long getTableRecordsCount(final InventoryDumperConfiguration dumperConfig, final PipelineDataSourceWrapper dataSource) {
+        String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
+        String actualTableName = dumperConfig.getActualTableName();
+        PipelineSQLBuilder pipelineSQLBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, dataSource.getDatabaseType().getType());
+        Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
+        try {
+            if (sql.isPresent()) {
+                DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, dataSource.getDatabaseType().getType());
+                long result = getEstimatedCount(databaseType, dataSource, sql.get());
+                if (result > 0) {
+                    return result;
+                }
+            }
+            // Either no estimated count SQL is available or the estimate was not positive: run the exact count.
+            return getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
+        } catch (final SQLException ex) {
+            String uniqueKey = dumperConfig.hasUniqueKey() ? dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
+            throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), uniqueKey, ex);
+        }
+    }
+
+    /**
+     * Execute the estimated count SQL.
+     *
+     * @param databaseType database type, for MySQL the connection catalog is bound as the first SQL parameter
+     * @param dataSource data source
+     * @param estimatedCountSQL estimated count SQL
+     * @return estimated records count, or 0 if the query returns no row
+     * @throws SQLException if there's exception from database
+     */
+    private static long getEstimatedCount(final DatabaseType databaseType, final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(estimatedCountSQL)) {
+            if (databaseType instanceof MySQLDatabaseType) {
+                preparedStatement.setString(1, connection.getCatalog());
+            }
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                // An empty result set must not fail the job: report 0 so that the caller falls back to the exact count.
+                return resultSet.next() ? resultSet.getLong(1) : 0L;
+            }
+        }
+    }
+
+    /**
+     * Execute the exact count SQL and log how long it took.
+     *
+     * @param dataSource data source
+     * @param countSQL count SQL
+     * @return records count
+     * @throws SQLException if there's exception from database
+     */
+    private static long getCount(final DataSource dataSource, final String countSQL) throws SQLException {
+        long startTimeMillis = System.currentTimeMillis();
+        long result;
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(countSQL);
+                ResultSet resultSet = preparedStatement.executeQuery()) {
+            resultSet.next();
+            result = resultSet.getLong(1);
+        }
+        log.info("getCount cost {} ms, sql: {}", System.currentTimeMillis() - startTimeMillis, countSQL);
+        return result;
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 116df621efb..86b781f41ab 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -21,7 +21,6 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
@@ -48,11 +47,7 @@ import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
-import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -61,7 +56,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -118,7 +112,7 @@ public final class InventoryTaskSplitter {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
dumperConfig.getTableNameMap().forEach((key, value) -> {
InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(dumperConfig);
- // use original table name, for meta data loader, since some database table name case-sensitive
+ // use original table name, for metadata loader, since some database table name case-sensitive
inventoryDumperConfig.setActualTableName(key.getOriginal());
inventoryDumperConfig.setLogicTableName(value.getOriginal());
inventoryDumperConfig.setPosition(new PlaceholderPosition());
@@ -129,7 +123,7 @@ public final class InventoryTaskSplitter {
}
private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
- final DataSource dataSource) {
+ final PipelineDataSourceWrapper dataSource) {
if (null == dumperConfig.getUniqueKeyColumns()) {
String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
String actualTableName = dumperConfig.getActualTableName();
@@ -141,7 +135,7 @@ public final class InventoryTaskSplitter {
PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead();
int batchSize = readConfig.getBatchSize();
JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
- Collection<IngestPosition> inventoryPositions = getInventoryPositions(jobItemContext, dumperConfig, dataSource);
+ Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperConfig, jobItemContext, dataSource);
int i = 0;
for (IngestPosition each : inventoryPositions) {
InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfig);
@@ -157,8 +151,8 @@ public final class InventoryTaskSplitter {
return result;
}
- private Collection<IngestPosition> getInventoryPositions(final InventoryIncrementalJobItemContext jobItemContext, final InventoryDumperConfiguration dumperConfig,
- final DataSource dataSource) {
+ private Collection<IngestPosition> getInventoryPositions(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
+ final PipelineDataSourceWrapper dataSource) {
InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress();
if (null != initProgress) {
// Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
@@ -167,83 +161,29 @@ public final class InventoryTaskSplitter {
return result;
}
}
+ jobItemContext.updateInventoryRecordsCount(InventoryRecordsCountCalculator.getTableRecordsCount(dumperConfig, dataSource));
if (!dumperConfig.hasUniqueKey()) {
- return getPositionWithoutUniqueKey(jobItemContext, dataSource, dumperConfig);
+ return Collections.singletonList(new NoUniqueKeyPosition());
}
List<PipelineColumnMetaData> uniqueKeyColumns = dumperConfig.getUniqueKeyColumns();
if (1 == uniqueKeyColumns.size()) {
int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
- return getPositionByIntegerUniqueKeyRange(jobItemContext, dataSource, dumperConfig);
+ return getPositionByIntegerUniqueKeyRange(dumperConfig, jobItemContext, dataSource);
}
if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
- return getPositionByStringUniqueKeyRange(jobItemContext, dataSource, dumperConfig);
+ // TODO Support string unique key table splitting. Ascii characters ordering are different in different versions of databases.
+ return Collections.singletonList(new StringPrimaryKeyPosition(null, null));
}
}
- return getUnsupportedPosition(jobItemContext, dataSource, dumperConfig);
- }
-
- private Collection<IngestPosition> getPositionWithoutUniqueKey(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
- final InventoryDumperConfiguration dumperConfig) {
- long tableRecordsCount = getTableRecordsCount(jobItemContext, dataSource, dumperConfig);
- jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
- return Collections.singletonList(new NoUniqueKeyPosition());
- }
-
- private long getTableRecordsCount(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
- PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
- String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
- String actualTableName = dumperConfig.getActualTableName();
- PipelineSQLBuilder pipelineSQLBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType());
- Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
- try {
- if (sql.isPresent()) {
- DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, jobConfig.getSourceDatabaseType());
- long result = getEstimatedCount(databaseType, dataSource, sql.get());
- return result > 0 ? result : getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
- }
- return getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
- } catch (final SQLException ex) {
- String uniqueKey = dumperConfig.hasUniqueKey() ? dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
- throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), uniqueKey, ex);
- }
- }
-
- private long getEstimatedCount(final DatabaseType databaseType, final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(estimatedCountSQL)) {
- if (databaseType instanceof MySQLDatabaseType) {
- preparedStatement.setString(1, connection.getCatalog());
- }
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- resultSet.next();
- return resultSet.getLong(1);
- }
- }
- }
-
- private long getCount(final DataSource dataSource, final String countSQL) throws SQLException {
- long startTimeMillis = System.currentTimeMillis();
- long result;
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(countSQL)) {
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- resultSet.next();
- result = resultSet.getLong(1);
- }
- }
- log.info("getCountSQLResult cost {} ms", System.currentTimeMillis() - startTimeMillis);
- return result;
+ return Collections.singletonList(new UnsupportedKeyPosition());
}
- private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
- final InventoryDumperConfiguration dumperConfig) {
+ private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
+ final PipelineDataSourceWrapper dataSource) {
Collection<IngestPosition> result = new LinkedList<>();
- PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
- String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType())
+ String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, dataSource.getDatabaseType().getType())
.buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), uniqueKey);
int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
try (
@@ -251,7 +191,6 @@ public final class InventoryTaskSplitter {
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
// TODO query minimum value less than 0
long beginId = 0;
- long recordsCount = 0;
for (int i = 0; i < Integer.MAX_VALUE; i++) {
preparedStatement.setLong(1, beginId);
preparedStatement.setLong(2, shardingSize);
@@ -260,7 +199,6 @@ public final class InventoryTaskSplitter {
break;
}
long endId = resultSet.getLong(1);
- recordsCount += resultSet.getLong(2);
if (0 == endId) {
break;
}
@@ -268,7 +206,6 @@ public final class InventoryTaskSplitter {
beginId = endId + 1;
}
}
- jobItemContext.updateInventoryRecordsCount(recordsCount);
// fix empty table missing inventory task
if (result.isEmpty()) {
result.add(new IntegerPrimaryKeyPosition(0, 0));
@@ -278,19 +215,4 @@ public final class InventoryTaskSplitter {
}
return result;
}
-
- private Collection<IngestPosition> getPositionByStringUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
- final InventoryDumperConfiguration dumperConfig) {
- long tableRecordsCount = getTableRecordsCount(jobItemContext, dataSource, dumperConfig);
- jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
- Collection<IngestPosition> result = new LinkedList<>();
- result.add(new StringPrimaryKeyPosition(null, null));
- return result;
- }
-
- private Collection<IngestPosition> getUnsupportedPosition(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
- long tableRecordsCount = getTableRecordsCount(jobItemContext, dataSource, dumperConfig);
- jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
- return Collections.singletonList(new UnsupportedKeyPosition());
- }
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 52703047d55..9a1325ab135 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -81,6 +81,7 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
containerComposer.assertProxyOrderRecordExist("t_order", 10000);
assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH");
String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
@@ -92,7 +93,6 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
}
List<String> lastJobIds = listJobId(containerComposer);
assertTrue(lastJobIds.isEmpty());
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
}
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 4b79eca1492..6b4b716c1f5 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -84,6 +84,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
containerComposer.getDatabaseType(), 20));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
containerComposer.assertProxyOrderRecordExist(schemaTableName, 10000);
checkOrderMigration(containerComposer, jobId);
checkOrderItemMigration(containerComposer);
@@ -92,7 +93,6 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
}
List<String> lastJobIds = listJobId(containerComposer);
assertTrue(lastJobIds.isEmpty());
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1, PipelineContainerComposer.SCHEMA_NAME);
}
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 72d838da673..d28f37f5fed 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -167,6 +167,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(containerComposer, uniqueKey);
doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
return null;
});
@@ -191,6 +192,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(containerComposer, uniqueKey);
doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
return null;
});
@@ -216,6 +218,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
byte[] uniqueKey = new byte[]{-1, 0, 1};
assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
insertOneOrder(containerComposer, uniqueKey);
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
// TODO Select by byte[] from proxy doesn't work, so unhex function is used for now
containerComposer.assertProxyOrderRecordExist(String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
return null;
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index e726f02e360..e7dc4789f5f 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -69,11 +69,11 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
String jobId = listJobId(containerComposer).get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
containerComposer.sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
containerComposer.assertProxyOrderRecordExist("t_order", "a1");
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(containerComposer, jobId, "CRC32_MATCH");
commitMigrationByJobId(containerComposer, jobId);
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
List<String> lastJobIds = listJobId(containerComposer);
assertTrue(lastJobIds.isEmpty());