Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/06/06 10:51:05 UTC

[shardingsphere] branch master updated: Refactor InventoryTaskSplitter (#26071)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b50f71bcb4 Refactor InventoryTaskSplitter (#26071)
7b50f71bcb4 is described below

commit 7b50f71bcb43aea349bae6671df88935ac3f149e
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Jun 6 18:50:57 2023 +0800

    Refactor InventoryTaskSplitter (#26071)
    
    * Use unified parameter ordering in InventoryTaskSplitter
    
    * Simplify getTableRecordsCount parameter
    
    * Extract InventoryRecordsCountCalculator
    
    * Refactor to use unified jobItemContext.updateInventoryRecordsCount for getInventoryPositions
    
    * Update
    
    * Remove shardingsphere-sql-federation-executor-original module references
    
    * Make all infra modules trigger pipeline E2E to find issues ASAP
    
    * REFRESH TABLE METADATA before accessing the proxy in E2E
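    
    Taken together, the first four items funnel the records-count
    bookkeeping through one up-front call instead of three near-identical
    count-then-position methods. A minimal sketch of the unified pattern,
    using only names introduced in this commit (the real call site is in
    the InventoryTaskSplitter hunk below):
    
        // Sketch, not the full method: count once, whatever the unique-key type.
        jobItemContext.updateInventoryRecordsCount(
                InventoryRecordsCountCalculator.getTableRecordsCount(dumperConfig, dataSource));
        if (!dumperConfig.hasUniqueKey()) {
            return Collections.singletonList(new NoUniqueKeyPosition());
        }
        // Integer, string and unsupported unique keys are handled next, as in the diff.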
---
 .github/workflows/e2e-pipeline.yml                 |   6 +-
 .../prepare/InventoryRecordsCountCalculator.java   |  98 +++++++++++++++++++
 .../core/prepare/InventoryTaskSplitter.java        | 106 +++------------------
 .../general/MySQLMigrationGeneralE2EIT.java        |   2 +-
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |   2 +-
 .../primarykey/IndexesMigrationE2EIT.java          |   3 +
 .../primarykey/MariaDBMigrationE2EIT.java          |   2 +-
 7 files changed, 120 insertions(+), 99 deletions(-)

diff --git a/.github/workflows/e2e-pipeline.yml b/.github/workflows/e2e-pipeline.yml
index d880d11c330..38b0f078ac0 100644
--- a/.github/workflows/e2e-pipeline.yml
+++ b/.github/workflows/e2e-pipeline.yml
@@ -22,8 +22,7 @@ on:
     branches: [ master, dev ]
     paths:
       - '.github/workflows/e2e-pipeline.yml'
-      - 'infra/common/src/main/**'
-      - 'infra/executor/src/main/**'
+      - 'infra/**/src/main/**'
       - 'mode/**/src/main/**'
       - 'proxy/**/src/main/**'
       - 'jdbc/core/src/main/**'
@@ -42,8 +41,7 @@ on:
     branches: [ master ]
     paths:
       - '.github/workflows/e2e-pipeline.yml'
-      - 'infra/common/src/main/**'
-      - 'infra/executor/src/main/**'
+      - 'infra/**/src/main/**'
       - 'mode/**/src/main/**'
       - 'proxy/**/src/main/**'
       - 'jdbc/core/src/main/**'
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryRecordsCountCalculator.java
new file mode 100644
index 00000000000..facdda8e9af
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryRecordsCountCalculator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.prepare;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+/**
+ * Inventory records count calculator.
+ */
+@Slf4j
+public final class InventoryRecordsCountCalculator {
+    
+    /**
+     * Get table records count.
+     *
+     * @param dumperConfig inventory dumper configuration
+     * @param dataSource data source
+     * @return table records count
+     * @throws SplitPipelineJobByUniqueKeyException if there is an exception from the database
+     */
+    public static long getTableRecordsCount(final InventoryDumperConfiguration dumperConfig, final PipelineDataSourceWrapper dataSource) {
+        String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
+        String actualTableName = dumperConfig.getActualTableName();
+        PipelineSQLBuilder pipelineSQLBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, dataSource.getDatabaseType().getType());
+        Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
+        try {
+            if (sql.isPresent()) {
+                DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, dataSource.getDatabaseType().getType());
+                long result = getEstimatedCount(databaseType, dataSource, sql.get());
+                return result > 0 ? result : getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
+            }
+            return getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
+        } catch (final SQLException ex) {
+            String uniqueKey = dumperConfig.hasUniqueKey() ? dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
+            throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), uniqueKey, ex);
+        }
+    }
+    
+    private static long getEstimatedCount(final DatabaseType databaseType, final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(estimatedCountSQL)) {
+            if (databaseType instanceof MySQLDatabaseType) {
+                preparedStatement.setString(1, connection.getCatalog());
+            }
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                resultSet.next();
+                return resultSet.getLong(1);
+            }
+        }
+    }
+    
+    private static long getCount(final DataSource dataSource, final String countSQL) throws SQLException {
+        long startTimeMillis = System.currentTimeMillis();
+        long result;
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(countSQL)) {
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                resultSet.next();
+                result = resultSet.getLong(1);
+            }
+        }
+        log.info("getCount cost {} ms, sql: {}", System.currentTimeMillis() - startTimeMillis, countSQL);
+        return result;
+    }
+}
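
For reference, the calculator above prefers a cheap, statistics-based estimate and only runs an exact COUNT when the dialect provides no estimate SQL or the estimate is not positive; the MySQL branch binds connection.getCatalog() because the MySQL dialect's estimate SQL takes the current catalog as its first parameter (see the MySQLDatabaseType branch above). A condensed restatement of that fallback, using the same names as the method above and adding no new behavior:

    Optional<String> estimateSQL = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
    long count = estimateSQL.isPresent() ? getEstimatedCount(databaseType, dataSource, estimateSQL.get()) : 0L;
    if (count <= 0) {
        // No estimate available, or empty/stale statistics: fall back to an exact COUNT.
        count = getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
    }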
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 116df621efb..86b781f41ab 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -21,7 +21,6 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
@@ -48,11 +47,7 @@ import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
-import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -61,7 +56,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -118,7 +112,7 @@ public final class InventoryTaskSplitter {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         dumperConfig.getTableNameMap().forEach((key, value) -> {
             InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(dumperConfig);
-            // use original table name, for meta data loader, since some database table name case-sensitive
+            // use original table name for metadata loader, since some database table names are case-sensitive
             inventoryDumperConfig.setActualTableName(key.getOriginal());
             inventoryDumperConfig.setLogicTableName(value.getOriginal());
             inventoryDumperConfig.setPosition(new PlaceholderPosition());
@@ -129,7 +123,7 @@ public final class InventoryTaskSplitter {
     }
     
     private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
-                                                                       final DataSource dataSource) {
+                                                                       final PipelineDataSourceWrapper dataSource) {
         if (null == dumperConfig.getUniqueKeyColumns()) {
             String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
             String actualTableName = dumperConfig.getActualTableName();
@@ -141,7 +135,7 @@ public final class InventoryTaskSplitter {
         PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead();
         int batchSize = readConfig.getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = jobProcessContext.getReadRateLimitAlgorithm();
-        Collection<IngestPosition> inventoryPositions = getInventoryPositions(jobItemContext, dumperConfig, dataSource);
+        Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperConfig, jobItemContext, dataSource);
         int i = 0;
         for (IngestPosition each : inventoryPositions) {
             InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfig);
@@ -157,8 +151,8 @@ public final class InventoryTaskSplitter {
         return result;
     }
     
-    private Collection<IngestPosition> getInventoryPositions(final InventoryIncrementalJobItemContext jobItemContext, final InventoryDumperConfiguration dumperConfig,
-                                                             final DataSource dataSource) {
+    private Collection<IngestPosition> getInventoryPositions(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
+                                                             final PipelineDataSourceWrapper dataSource) {
         InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress();
         if (null != initProgress) {
             // Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
@@ -167,83 +161,29 @@ public final class InventoryTaskSplitter {
                 return result;
             }
         }
+        jobItemContext.updateInventoryRecordsCount(InventoryRecordsCountCalculator.getTableRecordsCount(dumperConfig, dataSource));
         if (!dumperConfig.hasUniqueKey()) {
-            return getPositionWithoutUniqueKey(jobItemContext, dataSource, dumperConfig);
+            return Collections.singletonList(new NoUniqueKeyPosition());
         }
         List<PipelineColumnMetaData> uniqueKeyColumns = dumperConfig.getUniqueKeyColumns();
         if (1 == uniqueKeyColumns.size()) {
             int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
             if (PipelineJdbcUtils.isIntegerColumn(firstColumnDataType)) {
-                return getPositionByIntegerUniqueKeyRange(jobItemContext, dataSource, dumperConfig);
+                return getPositionByIntegerUniqueKeyRange(dumperConfig, jobItemContext, dataSource);
             }
             if (PipelineJdbcUtils.isStringColumn(firstColumnDataType)) {
-                return getPositionByStringUniqueKeyRange(jobItemContext, dataSource, dumperConfig);
+                // TODO Support string unique key table splitting. ASCII character ordering differs across database versions.
+                return Collections.singletonList(new StringPrimaryKeyPosition(null, null));
             }
         }
-        return getUnsupportedPosition(jobItemContext, dataSource, dumperConfig);
-    }
-    
-    private Collection<IngestPosition> getPositionWithoutUniqueKey(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
-                                                                   final InventoryDumperConfiguration dumperConfig) {
-        long tableRecordsCount = getTableRecordsCount(jobItemContext, dataSource, dumperConfig);
-        jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
-        return Collections.singletonList(new NoUniqueKeyPosition());
-    }
-    
-    private long getTableRecordsCount(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
-        PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
-        String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
-        String actualTableName = dumperConfig.getActualTableName();
-        PipelineSQLBuilder pipelineSQLBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType());
-        Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
-        try {
-            if (sql.isPresent()) {
-                DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, jobConfig.getSourceDatabaseType());
-                long result = getEstimatedCount(databaseType, dataSource, sql.get());
-                return result > 0 ? result : getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
-            }
-            return getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
-        } catch (final SQLException ex) {
-            String uniqueKey = dumperConfig.hasUniqueKey() ? dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
-            throw new SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), uniqueKey, ex);
-        }
-    }
-    
-    private long getEstimatedCount(final DatabaseType databaseType, final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(estimatedCountSQL)) {
-            if (databaseType instanceof MySQLDatabaseType) {
-                preparedStatement.setString(1, connection.getCatalog());
-            }
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                resultSet.next();
-                return resultSet.getLong(1);
-            }
-        }
-    }
-    
-    private long getCount(final DataSource dataSource, final String countSQL) throws SQLException {
-        long startTimeMillis = System.currentTimeMillis();
-        long result;
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = connection.prepareStatement(countSQL)) {
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                resultSet.next();
-                result = resultSet.getLong(1);
-            }
-        }
-        log.info("getCountSQLResult cost {} ms", System.currentTimeMillis() - startTimeMillis);
-        return result;
+        return Collections.singletonList(new UnsupportedKeyPosition());
     }
     
-    private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
-                                                                          final InventoryDumperConfiguration dumperConfig) {
+    private Collection<IngestPosition> getPositionByIntegerUniqueKeyRange(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
+                                                                          final PipelineDataSourceWrapper dataSource) {
         Collection<IngestPosition> result = new LinkedList<>();
-        PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
-        String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType())
+        String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, dataSource.getDatabaseType().getType())
                 .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), uniqueKey);
         int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
         try (
@@ -251,7 +191,6 @@ public final class InventoryTaskSplitter {
                 PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
             // TODO query minimum value less than 0
             long beginId = 0;
-            long recordsCount = 0;
             for (int i = 0; i < Integer.MAX_VALUE; i++) {
                 preparedStatement.setLong(1, beginId);
                 preparedStatement.setLong(2, shardingSize);
@@ -260,7 +199,6 @@ public final class InventoryTaskSplitter {
                         break;
                     }
                     long endId = resultSet.getLong(1);
-                    recordsCount += resultSet.getLong(2);
                     if (0 == endId) {
                         break;
                     }
@@ -268,7 +206,6 @@ public final class InventoryTaskSplitter {
                     beginId = endId + 1;
                 }
             }
-            jobItemContext.updateInventoryRecordsCount(recordsCount);
             // fix empty table missing inventory task
             if (result.isEmpty()) {
                 result.add(new IntegerPrimaryKeyPosition(0, 0));
@@ -278,19 +215,4 @@ public final class InventoryTaskSplitter {
         }
         return result;
     }
-    
-    private Collection<IngestPosition> getPositionByStringUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
-                                                                         final InventoryDumperConfiguration dumperConfig) {
-        long tableRecordsCount = getTableRecordsCount(jobItemContext, dataSource, dumperConfig);
-        jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
-        Collection<IngestPosition> result = new LinkedList<>();
-        result.add(new StringPrimaryKeyPosition(null, null));
-        return result;
-    }
-    
-    private Collection<IngestPosition> getUnsupportedPosition(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
-        long tableRecordsCount = getTableRecordsCount(jobItemContext, dataSource, dumperConfig);
-        jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
-        return Collections.singletonList(new UnsupportedKeyPosition());
-    }
 }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 52703047d55..9a1325ab135 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -81,6 +81,7 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
                     new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
             TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
+            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
             containerComposer.assertProxyOrderRecordExist("t_order", 10000);
             assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH");
             String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
@@ -92,7 +93,6 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             }
             List<String> lastJobIds = listJobId(containerComposer);
             assertTrue(lastJobIds.isEmpty());
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
             containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
         }
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 4b79eca1492..6b4b716c1f5 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -84,6 +84,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
                     containerComposer.getDatabaseType(), 20));
             TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
+            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
             containerComposer.assertProxyOrderRecordExist(schemaTableName, 10000);
             checkOrderMigration(containerComposer, jobId);
             checkOrderItemMigration(containerComposer);
@@ -92,7 +93,6 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             }
             List<String> lastJobIds = listJobId(containerComposer);
             assertTrue(lastJobIds.isEmpty());
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
             containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1, PipelineContainerComposer.SCHEMA_NAME);
         }
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 72d838da673..d28f37f5fed 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -167,6 +167,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
+                containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
                 containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
                 return null;
             });
@@ -191,6 +192,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
+                containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
                 containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
                 return null;
             });
@@ -216,6 +218,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             byte[] uniqueKey = new byte[]{-1, 0, 1};
             assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
                 insertOneOrder(containerComposer, uniqueKey);
+                containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
                 // TODO Select by byte[] from proxy doesn't work, so unhex function is used for now
                 containerComposer.assertProxyOrderRecordExist(String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
                 return null;
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index e726f02e360..e7dc4789f5f 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -69,11 +69,11 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
             String jobId = listJobId(containerComposer).get(0);
             containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
             containerComposer.sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
+            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
             containerComposer.assertProxyOrderRecordExist("t_order", "a1");
             containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
             assertCheckMigrationSuccess(containerComposer, jobId, "CRC32_MATCH");
             commitMigrationByJobId(containerComposer, jobId);
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
             assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
             List<String> lastJobIds = listJobId(containerComposer);
             assertTrue(lastJobIds.isEmpty());