You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/06/15 01:50:44 UTC

[shardingsphere] branch master updated: Replace proxy data source to ShardingSphereDataSource to query, avoid "REFRESH TABLE METADATA" in process (#26340)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f7731b2b7cc Replace proxy data source to ShardingSphereDataSource to query, avoid "REFRESH TABLE METADATA" in process (#26340)
f7731b2b7cc is described below

commit f7731b2b7ccbf7bb59f78b9174eddf5e3c563671
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Thu Jun 15 09:50:27 2023 +0800

    Replace proxy data source to ShardingSphereDataSource to query, avoid "REFRESH TABLE METADATA" in process (#26340)
    
    * Replace proxy data source to ShardingSphereDataSource to query, avoid "REFRESH TABLE METADATA" in process
    
    * Wait new record with jdbc data source in PostgreSQLMigrationGeneralE2EIT
---
 .../pipeline/cases/PipelineContainerComposer.java  | 43 +++++++++++++++------
 .../general/MySQLMigrationGeneralE2EIT.java        |  7 ++--
 .../general/PostgreSQLMigrationGeneralE2EIT.java   | 12 +++---
 .../migration/general/RulesMigrationE2EIT.java     |  3 +-
 .../primarykey/IndexesMigrationE2EIT.java          | 45 +++++++++++-----------
 .../primarykey/MariaDBMigrationE2EIT.java          | 12 ++++--
 6 files changed, 73 insertions(+), 49 deletions(-)

diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 92f80e1bacf..15151dfbb98 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
@@ -378,10 +379,22 @@ public final class PipelineContainerComposer implements AutoCloseable {
      * @throws RuntimeException runtime exception
      */
     public List<Map<String, Object>> queryForListWithLog(final String sql) {
+        return queryForListWithLog(proxyDataSource, sql);
+    }
+    
+    /**
+     * Query for list with log.
+     *
+     * @param dataSource data source
+     * @param sql SQL
+     * @return query result
+     * @throws RuntimeException runtime exception
+     */
+    public List<Map<String, Object>> queryForListWithLog(final DataSource dataSource, final String sql) {
         log.info("Query SQL: {}", sql);
         int retryNumber = 0;
         while (retryNumber <= 3) {
-            try (Connection connection = proxyDataSource.getConnection()) {
+            try (Connection connection = dataSource.getConnection()) {
                 ResultSet resultSet = connection.createStatement().executeQuery(sql);
                 return transformResultSetToList(resultSet);
                 // CHECKSTYLE:OFF
@@ -457,47 +470,50 @@ public final class PipelineContainerComposer implements AutoCloseable {
     }
     
     /**
-     * Assert proxy order record exist.
+     * Assert order record exists in the given data source.
      *
+     * @param dataSource data source
      * @param tableName table name
      * @param orderId order id
      */
-    public void assertProxyOrderRecordExist(final String tableName, final Object orderId) {
+    public void assertOrderRecordExist(final DataSource dataSource, final String tableName, final Object orderId) {
         String sql;
         if (orderId instanceof String) {
             sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s'", tableName, orderId);
         } else {
             sql = String.format("SELECT 1 FROM %s WHERE order_id = %s", tableName, orderId);
         }
-        assertProxyOrderRecordExist(sql);
+        assertOrderRecordExist(dataSource, sql);
     }
     
     /**
      * Assert proxy order record exist.
      *
+     * @param dataSource data source
      * @param sql SQL
      */
-    public void assertProxyOrderRecordExist(final String sql) {
+    public void assertOrderRecordExist(final DataSource dataSource, final String sql) {
         boolean recordExist = false;
         for (int i = 0; i < 5; i++) {
-            List<Map<String, Object>> result = queryForListWithLog(sql);
+            List<Map<String, Object>> result = queryForListWithLog(dataSource, sql);
             recordExist = !result.isEmpty();
             if (recordExist) {
                 break;
             }
             Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> true);
         }
-        assertTrue(recordExist, "The insert record must exist after the stop");
+        assertTrue(recordExist, "Order record does not exist");
     }
     
     /**
      * Get target table records count.
      *
+     * @param dataSource data source
      * @param tableName table name
      * @return target table records count
      */
-    public int getTargetTableRecordsCount(final String tableName) {
-        List<Map<String, Object>> targetList = queryForListWithLog("SELECT COUNT(1) AS count FROM " + tableName);
+    public int getTargetTableRecordsCount(final DataSource dataSource, final String tableName) {
+        List<Map<String, Object>> targetList = queryForListWithLog(dataSource, "SELECT COUNT(1) AS count FROM " + tableName);
         assertFalse(targetList.isEmpty());
         return ((Number) targetList.get(0).get("count")).intValue();
     }
@@ -505,12 +521,13 @@ public final class PipelineContainerComposer implements AutoCloseable {
     /**
      * Assert greater than order table init rows.
      *
+     * @param dataSource data source
      * @param tableInitRows table init rows
      * @param schema schema
      */
-    public void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) {
+    public void assertGreaterThanOrderTableInitRows(final DataSource dataSource, final int tableInitRows, final String schema) {
         String tableName = Strings.isNullOrEmpty(schema) ? "t_order" : String.format("%s.t_order", schema);
-        int recordsCount = getTargetTableRecordsCount(tableName);
+        int recordsCount = getTargetTableRecordsCount(dataSource, tableName);
         assertTrue(recordsCount > tableInitRows, "actual count " + recordsCount);
     }
     
@@ -522,8 +539,10 @@ public final class PipelineContainerComposer implements AutoCloseable {
      */
     // TODO proxy support for some fields still needs to be optimized, such as binary of MySQL, after these problems are optimized, Proxy dataSource can be used.
     public DataSource generateShardingSphereDataSourceFromProxy() throws SQLException {
-        Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !getYamlRootConfig().getRules().isEmpty());
+        Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> null != getYamlRootConfig().getRules());
         YamlRootConfiguration rootConfig = getYamlRootConfig();
+        ShardingSpherePreconditions.checkNotNull(rootConfig.getDataSources(), () -> new IllegalStateException("dataSources is null"));
+        ShardingSpherePreconditions.checkNotNull(rootConfig.getRules(), () -> new IllegalStateException("rules is null"));
         if (PipelineEnvTypeEnum.DOCKER == PipelineE2EEnvironment.getInstance().getItEnvType()) {
             DockerStorageContainer storageContainer = ((DockerContainerComposer) containerComposer).getStorageContainers().get(0);
             String sourceUrl = String.join(":", storageContainer.getNetworkAliases().get(0), Integer.toString(storageContainer.getExposedPort()));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index b6e0807d441..680a1c16193 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
+import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.List;
@@ -82,8 +83,8 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
                     new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
             TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-            containerComposer.assertProxyOrderRecordExist("t_order", 10000);
+            DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
+            containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order", 10000);
             assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH");
             String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
             assertMigrationSuccessById(containerComposer, orderItemJobId, "DATA_MATCH");
@@ -94,7 +95,7 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             }
             List<String> lastJobIds = listJobId(containerComposer);
             assertTrue(lastJobIds.isEmpty());
-            containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
+            containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
         }
     }
     
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index b80f7d33350..a8a2ea75ab2 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
+import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.List;
@@ -85,8 +86,8 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
                     containerComposer.getDatabaseType(), 20));
             TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-            containerComposer.assertProxyOrderRecordExist(schemaTableName, 10000);
+            DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
+            containerComposer.assertOrderRecordExist(jdbcDataSource, schemaTableName, 10000);
             checkOrderMigration(containerComposer, jobId);
             checkOrderItemMigration(containerComposer);
             for (String each : listJobId(containerComposer)) {
@@ -94,7 +95,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             }
             List<String> lastJobIds = listJobId(containerComposer);
             assertTrue(lastJobIds.isEmpty());
-            containerComposer.assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1, PipelineContainerComposer.SCHEMA_NAME);
+            containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1, PipelineContainerComposer.SCHEMA_NAME);
         }
     }
     
@@ -105,9 +106,8 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')",
                 String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
         startMigrationByJobId(containerComposer, jobId);
-        Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
-                String.format("SELECT * FROM %s WHERE order_id = %s", String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId)).isEmpty());
-        containerComposer.assertProxyOrderRecordExist(String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
+        DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
+        containerComposer.assertOrderRecordExist(jdbcDataSource, String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
     }
     
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 00048c42375..7ded08bd0e6 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -89,8 +89,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
         containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
         commitMigrationByJobId(containerComposer, jobId);
-        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-        assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
+        assertThat(containerComposer.getTargetTableRecordsCount(containerComposer.getProxyDataSource(), SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
     }
     
     private static boolean isEnabled() {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 56ab40e2583..ae62f923a2a 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -22,6 +22,7 @@ import org.apache.commons.codec.binary.Hex;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
@@ -37,11 +38,12 @@ import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.function.Consumer;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -88,7 +90,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             }
             KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
             // TODO PostgreSQL update delete events not support if table without unique keys at increment task.
-            final Callable<Void> incrementalTaskFn = () -> {
+            final Consumer<DataSource> incrementalTaskFn = dataSource -> {
                 Object orderId = keyGenerateAlgorithm.generateKey();
                 insertOneOrder(containerComposer, orderId);
                 if (containerComposer.getDatabaseType() instanceof MySQLDatabaseType) {
@@ -96,7 +98,6 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
                     deleteOneOrder(containerComposer, orderId, "updated");
                     insertOneOrder(containerComposer, keyGenerateAlgorithm.generateKey());
                 }
-                return null;
             };
             assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
         }
@@ -110,7 +111,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         deleteOneOrder(containerComposer, orderId, updatedStatus);
     }
     
-    private void insertOneOrder(final PipelineContainerComposer containerComposer, final Object uniqueKey) throws SQLException {
+    private void insertOneOrder(final PipelineContainerComposer containerComposer, final Object uniqueKey) {
         try (
                 Connection connection = containerComposer.getSourceDataSource().getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
@@ -119,10 +120,12 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             preparedStatement.setObject(3, "OK");
             int actualCount = preparedStatement.executeUpdate();
             assertThat(actualCount, is(1));
+        } catch (final SQLException ex) {
+            throw new SQLWrapperException(ex);
         }
     }
     
-    private void updateOneOrder(final PipelineContainerComposer containerComposer, final Object uniqueKey, final String updatedStatus) throws SQLException {
+    private void updateOneOrder(final PipelineContainerComposer containerComposer, final Object uniqueKey, final String updatedStatus) {
         try (
                 Connection connection = containerComposer.getSourceDataSource().getConnection();
                 PreparedStatement preparedStatement = connection
@@ -133,10 +136,12 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             preparedStatement.setObject(4, "OK");
             int actualCount = preparedStatement.executeUpdate();
             assertThat(actualCount, is(1));
+        } catch (final SQLException ex) {
+            throw new SQLWrapperException(ex);
         }
     }
     
-    private void deleteOneOrder(final PipelineContainerComposer containerComposer, final Object uniqueKey, final String updatedStatus) throws SQLException {
+    private void deleteOneOrder(final PipelineContainerComposer containerComposer, final Object uniqueKey, final String updatedStatus) {
         try (
                 Connection connection = containerComposer.getSourceDataSource().getConnection();
                 PreparedStatement preparedStatement = connection
@@ -146,6 +151,8 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             preparedStatement.setObject(3, updatedStatus);
             int actualCount = preparedStatement.executeUpdate();
             assertThat(actualCount, is(1));
+        } catch (final SQLException ex) {
+            throw new SQLWrapperException(ex);
         }
     }
     
@@ -164,12 +171,10 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             }
             KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
             Object uniqueKey = keyGenerateAlgorithm.generateKey();
-            assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+            assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
-                containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-                containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
-                return null;
+                containerComposer.assertOrderRecordExist(dataSource, "t_order", uniqueKey);
             });
         }
     }
@@ -189,12 +194,10 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             }
             KeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
             Object uniqueKey = keyGenerateAlgorithm.generateKey();
-            assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+            assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
-                containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-                containerComposer.assertProxyOrderRecordExist("t_order", uniqueKey);
-                return null;
+                containerComposer.assertOrderRecordExist(dataSource, "t_order", uniqueKey);
             });
         }
     }
@@ -216,18 +219,16 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
             // TODO Insert binary string in VARBINARY column. But KeyGenerateAlgorithm.generateKey() require returning Comparable, and byte[] is not Comparable
             byte[] uniqueKey = new byte[]{-1, 0, 1};
-            assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
+            assertMigrationSuccess(containerComposer, sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
-                containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
                 // TODO Select by byte[] from proxy doesn't work, so unhex function is used for now
-                containerComposer.assertProxyOrderRecordExist(String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
-                return null;
+                containerComposer.assertOrderRecordExist(dataSource, String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
             });
         }
     }
     
     private void assertMigrationSuccess(final PipelineContainerComposer containerComposer, final String sqlPattern, final String shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm,
-                                        final String consistencyCheckAlgorithmType, final Callable<Void> incrementalTaskFn) throws Exception {
+                                        final String consistencyCheckAlgorithmType, final Consumer<DataSource> incrementalTaskFn) throws Exception {
         containerComposer.sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_NAME));
         try (Connection connection = containerComposer.getSourceDataSource().getConnection()) {
             PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, SOURCE_TABLE_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
@@ -240,14 +241,14 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         startMigration(containerComposer, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
         String jobId = listJobId(containerComposer).get(0);
         containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        incrementalTaskFn.call();
+        DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
+        incrementalTaskFn.accept(jdbcDataSource);
         containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         if (null != consistencyCheckAlgorithmType) {
             assertCheckMigrationSuccess(containerComposer, jobId, consistencyCheckAlgorithmType);
         }
         commitMigrationByJobId(containerComposer, jobId);
-        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-        assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
+        assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
         List<String> lastJobIds = listJobId(containerComposer);
         assertTrue(lastJobIds.isEmpty());
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index 972f60576b7..5936fa974f7 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -30,10 +30,12 @@ import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.Pipeline
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
@@ -42,6 +44,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+// TODO Use MariaDB docker image
+@Disabled
 @PipelineE2ESettings(fetchSingle = true, database = @PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/common/none.xml"))
 @Slf4j
 class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
@@ -53,7 +57,7 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
-    void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
+    void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLException {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             containerComposer.sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_NAME));
@@ -70,12 +74,12 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
             String jobId = listJobId(containerComposer).get(0);
             containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
             containerComposer.sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
-            containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-            containerComposer.assertProxyOrderRecordExist("t_order", "a1");
+            DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
+            containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order", "a1");
             containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
             assertCheckMigrationSuccess(containerComposer, jobId, "CRC32_MATCH");
             commitMigrationByJobId(containerComposer, jobId);
-            assertThat(containerComposer.getTargetTableRecordsCount(SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
+            assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
             List<String> lastJobIds = listJobId(containerComposer);
             assertTrue(lastJobIds.isEmpty());
         }