You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2023/03/14 15:45:13 UTC

[shardingsphere] branch master updated: Remove PipelineContainerComposer.getTargetTableOrderName() (#24601)

This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 75d2e307e96 Remove PipelineContainerComposer.getTargetTableOrderName() (#24601)
75d2e307e96 is described below

commit 75d2e307e963eb14e9fb2c0bff7fef96c37d9ceb
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Mar 14 23:44:54 2023 +0800

    Remove PipelineContainerComposer.getTargetTableOrderName() (#24601)
    
    * Remove PipelineContainerComposer.getTargetTableOrderName()
---
 .../pipeline/cases/PipelineContainerComposer.java  | 120 +++++++++------------
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |   2 +-
 .../cases/migration/AbstractMigrationE2EIT.java    |   8 +-
 .../general/MySQLMigrationGeneralE2EIT.java        |   4 +-
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |   4 +-
 .../migration/general/RulesMigrationE2EIT.java     |   4 +-
 .../primarykey/IndexesMigrationE2EIT.java          |   4 +-
 .../primarykey/MariaDBMigrationE2EIT.java          |   4 +-
 .../primarykey/TextPrimaryKeyMigrationE2EIT.java   |   4 +-
 9 files changed, 75 insertions(+), 79 deletions(-)

diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 2ded13bc995..cb6ec673ffc 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -79,8 +79,6 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     public static final String SCHEMA_NAME = "test";
     
-    public static final String PROXY_DATABASE = "sharding_db";
-    
     public static final String DS_0 = "pipeline_it_0";
     
     public static final String DS_1 = "pipeline_it_1";
@@ -93,6 +91,8 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     public static final int TABLE_INIT_ROW_COUNT = 3000;
     
+    private static final String PROXY_DATABASE = "sharding_db";
+    
     private final BaseContainerComposer containerComposer;
     
     private final ExtraSQLCommand extraSQLCommand;
@@ -103,9 +103,9 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     private final String password;
     
-    private DataSource sourceDataSource;
+    private final DataSource sourceDataSource;
     
-    private DataSource proxyDataSource;
+    private final DataSource proxyDataSource;
     
     private Thread increaseTaskThread;
     
@@ -124,19 +124,15 @@ public final class PipelineContainerComposer implements AutoCloseable {
         }
         extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineContainerComposer.class.getClassLoader().getResource(testParam.getScenario())), ExtraSQLCommand.class);
         containerComposer.start();
+        sourceDataSource = StorageContainerUtil.generateDataSource(appendExtraParameter(getActualJdbcUrlTemplate(DS_0, false)), username, password);
+        proxyDataSource = StorageContainerUtil.generateDataSource(
+                appendExtraParameter(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
         init(jobType);
     }
     
     @SneakyThrows(SQLException.class)
     private void init(final JobType jobType) {
-        sourceDataSource = StorageContainerUtil.generateDataSource(appendExtraParam(getActualJdbcUrlTemplate(DS_0, false)), username, password);
-        proxyDataSource = StorageContainerUtil.generateDataSource(appendExtraParam(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)),
-                ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
-        String defaultDatabaseName = "";
-        if (DatabaseTypeUtil.isPostgreSQL(databaseType) || DatabaseTypeUtil.isOpenGauss(databaseType)) {
-            defaultDatabaseName = "postgres";
-        }
-        String jdbcUrl = containerComposer.getProxyJdbcUrl(defaultDatabaseName);
+        String jdbcUrl = containerComposer.getProxyJdbcUrl(DatabaseTypeUtil.isPostgreSQL(databaseType) || DatabaseTypeUtil.isOpenGauss(databaseType) ? "postgres" : "");
         try (Connection connection = DriverManager.getConnection(jdbcUrl, ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
             cleanUpPipelineJobs(connection, jobType);
             cleanUpProxyDatabase(connection);
@@ -145,21 +141,30 @@ public final class PipelineContainerComposer implements AutoCloseable {
         cleanUpDataSource();
     }
     
-    /**
-     * Append extra parameter.
-     * 
-     * @param jdbcUrl JDBC URL
-     * @return appended JDBC URL
-     */
-    public String appendExtraParam(final String jdbcUrl) {
-        String result = jdbcUrl;
-        if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
-            result = new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("rewriteBatchedStatements", Boolean.TRUE.toString())));
+    private void cleanUpPipelineJobs(final Connection connection, final JobType jobType) throws SQLException {
+        if (PipelineEnvTypeEnum.NATIVE != PipelineE2EEnvironment.getInstance().getItEnvType()) {
+            return;
+        }
+        String jobTypeName = jobType.getTypeName();
+        for (Map<String, Object> each : queryJobs(connection, jobTypeName)) {
+            String jobId = each.get("id").toString();
+            Map<String, Object> jobInfo = queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName, jobId)).get(0);
+            String status = jobInfo.get("status").toString();
+            if (JobStatus.FINISHED.name().equals(status)) {
+                connection.createStatement().execute(String.format("COMMIT %s '%s'", jobTypeName, jobId));
+            } else {
+                connection.createStatement().execute(String.format("ROLLBACK %s '%s'", jobTypeName, jobId));
+            }
         }
-        if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType()) || DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
-            result = new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified")));
+    }
+    
+    private List<Map<String, Object>> queryJobs(final Connection connection, final String jobTypeName) {
+        try {
+            return transformResultSetToList(connection.createStatement().executeQuery(String.format("SHOW %s LIST", jobTypeName)));
+        } catch (final SQLException ex) {
+            log.warn("{} execute failed, message {}", String.format("SHOW %s LIST", jobTypeName), ex.getMessage());
+            return Collections.emptyList();
         }
-        return result;
     }
     
     private void cleanUpProxyDatabase(final Connection connection) {
@@ -174,32 +179,11 @@ public final class PipelineContainerComposer implements AutoCloseable {
         }
     }
     
-    private void cleanUpPipelineJobs(final Connection connection, final JobType jobType) throws SQLException {
-        if (PipelineEnvTypeEnum.NATIVE != PipelineE2EEnvironment.getInstance().getItEnvType()) {
-            return;
-        }
-        String jobTypeName = jobType.getTypeName();
-        List<Map<String, Object>> jobList;
-        try {
-            ResultSet resultSet = connection.createStatement().executeQuery(String.format("SHOW %s LIST", jobTypeName));
-            jobList = transformResultSetToList(resultSet);
-        } catch (final SQLException ex) {
-            log.warn("{} execute failed, message {}", String.format("SHOW %s LIST", jobTypeName), ex.getMessage());
-            return;
-        }
-        if (jobList.isEmpty()) {
-            return;
-        }
-        for (Map<String, Object> each : jobList) {
-            String jobId = each.get("id").toString();
-            Map<String, Object> jobInfo = queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName, jobId)).get(0);
-            String status = jobInfo.get("status").toString();
-            if (JobStatus.FINISHED.name().equals(status)) {
-                connection.createStatement().execute(String.format("COMMIT %s '%s'", jobTypeName, jobId));
-            } else {
-                connection.createStatement().execute(String.format("ROLLBACK %s '%s'", jobTypeName, jobId));
-            }
-        }
+    private void createProxyDatabase(final Connection connection) throws SQLException {
+        String sql = String.format("CREATE DATABASE %s", PROXY_DATABASE);
+        log.info("Create proxy database {}", PROXY_DATABASE);
+        connection.createStatement().execute(sql);
+        ThreadUtil.sleep(2, TimeUnit.SECONDS);
     }
     
     private void cleanUpDataSource() {
@@ -211,11 +195,20 @@ public final class PipelineContainerComposer implements AutoCloseable {
         }
     }
     
-    private void createProxyDatabase(final Connection connection) throws SQLException {
-        String sql = String.format("CREATE DATABASE %s", PROXY_DATABASE);
-        log.info("create proxy database {}", PROXY_DATABASE);
-        connection.createStatement().execute(sql);
-        ThreadUtil.sleep(2, TimeUnit.SECONDS);
+    /**
+     * Append extra parameter.
+     * 
+     * @param jdbcUrl JDBC URL
+     * @return appended JDBC URL
+     */
+    public String appendExtraParameter(final String jdbcUrl) {
+        if (DatabaseTypeUtil.isMySQL(databaseType)) {
+            return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("rewriteBatchedStatements", Boolean.TRUE.toString())));
+        }
+        if (DatabaseTypeUtil.isPostgreSQL(databaseType) || DatabaseTypeUtil.isOpenGauss(databaseType)) {
+            return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified")));
+        }
+        return jdbcUrl;
     }
     
     /**
@@ -228,7 +221,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
         String registerStorageUnitTemplate = "REGISTER STORAGE UNIT ${ds} ( URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}", storageUnitName)
                 .replace("${user}", getUsername())
                 .replace("${password}", getPassword())
-                .replace("${url}", appendExtraParam(getActualJdbcUrlTemplate(storageUnitName, true)));
+                .replace("${url}", appendExtraParameter(getActualJdbcUrlTemplate(storageUnitName, true)));
         proxyExecuteWithLog(registerStorageUnitTemplate, 2);
     }
     
@@ -272,15 +265,6 @@ public final class PipelineContainerComposer implements AutoCloseable {
         return getActualJdbcUrlTemplate(databaseName, isInContainer, 0);
     }
     
-    /**
-     * Get target table order name.
-     * 
-     * @return target table order name
-     */
-    public String getTargetTableOrderName() {
-        return "t_order";
-    }
-    
     /**
      * Create schema.
      * 
@@ -399,12 +383,12 @@ public final class PipelineContainerComposer implements AutoCloseable {
                 ResultSet resultSet = connection.createStatement().executeQuery(sql);
                 return transformResultSetToList(resultSet);
             } catch (final SQLException ex) {
-                log.error("data access error", ex);
+                log.error("Data access error.", ex);
             }
             ThreadUtil.sleep(3, TimeUnit.SECONDS);
             retryNumber++;
         }
-        throw new RuntimeException("can't get result from proxy");
+        throw new RuntimeException("Can not get result from proxy.");
     }
     
     /**
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 2d673cbf4f0..6326cfc5abf 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -186,7 +186,7 @@ public final class CDCE2EIT {
     }
     
     private void startCDCClient() {
-        ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter(containerComposer.appendExtraParam(
+        ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter(containerComposer.appendExtraParameter(
                 containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false, 0)), containerComposer.getUsername(), containerComposer.getPassword());
         StartCDCClientParameter parameter = new StartCDCClientParameter(importDataSourceParam);
         parameter.setAddress("localhost");
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 1990027d006..cd8cf6e4685 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -73,16 +73,16 @@ public abstract class AbstractMigrationE2EIT {
         }
         String addSourceResource = migrationDistSQLCommand.getRegisterMigrationSourceStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
                 .replace("${password}", containerComposer.getPassword())
-                .replace("${ds0}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_0, true)));
+                .replace("${ds0}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_0, true)));
         containerComposer.addResource(addSourceResource);
     }
     
     protected void addMigrationTargetResource() throws SQLException {
         String addTargetResource = migrationDistSQLCommand.getRegisterMigrationTargetStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
                 .replace("${password}", containerComposer.getPassword())
-                .replace("${ds2}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2, true)))
-                .replace("${ds3}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true)))
-                .replace("${ds4}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true)));
+                .replace("${ds2}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2, true)))
+                .replace("${ds3}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true)))
+                .replace("${ds4}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true)));
         containerComposer.addResource(addTargetResource);
         List<Map<String, Object>> resources = containerComposer.queryForListWithLog("SHOW STORAGE UNITS from sharding_db");
         assertThat(resources.size(), is(3));
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 837137243e9..9b762db9da6 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -54,6 +54,8 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
     private static final String SOURCE_TABLE_ORDER_NAME = "t_order_copy";
     
+    private static final String TARGET_TABLE_ORDER_NAME = "t_order";
+    
     public MySQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
         super(testParam, new MigrationJobType());
     }
@@ -86,7 +88,7 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_ORDER_NAME), dataPair.getLeft());
         DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
         log.info("init data end: {}", LocalDateTime.now());
-        startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
+        startMigration(SOURCE_TABLE_ORDER_NAME, TARGET_TABLE_ORDER_NAME);
         startMigration("t_order_item", "t_order_item");
         String orderJobId = getJobIdByTableName("ds_0." + SOURCE_TABLE_ORDER_NAME);
         getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 6f425f4f4a3..031ba2f4103 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -52,6 +52,8 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
     
     private static final String SOURCE_TABLE_ORDER_NAME = "t_order_copy";
     
+    private static final String TARGET_TABLE_ORDER_NAME = "t_order";
+    
     public PostgreSQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
         super(testParam, new MigrationJobType());
     }
@@ -115,7 +117,7 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
         startMigrationByJobId(jobId);
         // must refresh firstly, otherwise proxy can't get schema and table info
         getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
-        getContainerComposer().assertProxyOrderRecordExist(String.join(".", PipelineContainerComposer.SCHEMA_NAME, getContainerComposer().getTargetTableOrderName()), recordId);
+        getContainerComposer().assertProxyOrderRecordExist(String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_ORDER_NAME), recordId);
         assertCheckMigrationSuccess(jobId, "DATA_MATCH");
     }
     
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 0e5d24adc19..0ac8a5d5126 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -52,6 +52,8 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
     
+    private static final String TARGET_TABLE_ORDER_NAME = "t_order";
+    
     public RulesMigrationE2EIT(final PipelineTestParameter testParam) {
         super(testParam, new MigrationJobType());
     }
@@ -93,7 +95,7 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
         if (null != addRuleFn) {
             addRuleFn.call();
         }
-        startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
+        startMigration(SOURCE_TABLE_ORDER_NAME, TARGET_TABLE_ORDER_NAME);
         String jobId = listJobId().get(0);
         getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 90bae9da72f..421f91f2f9b 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -67,6 +67,8 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
     
+    private static final String TARGET_TABLE_ORDER_NAME = "t_order";
+    
     public IndexesMigrationE2EIT(final PipelineTestParameter testParam) {
         super(testParam, new MigrationJobType());
     }
@@ -191,7 +193,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         addMigrationSourceResource();
         addMigrationTargetResource();
         getContainerComposer().proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT, shardingColumn), 2);
-        startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
+        startMigration(SOURCE_TABLE_ORDER_NAME, TARGET_TABLE_ORDER_NAME);
         String jobId = listJobId().get(0);
         getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         incrementalTaskFn.call();
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index c327c3f1c5b..3913483748a 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -49,6 +49,8 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     
     private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
     
+    private static final String TARGET_TABLE_ORDER_NAME = "t_order";
+    
     public MariaDBMigrationE2EIT(final PipelineTestParameter testParam) {
         super(testParam, new MigrationJobType());
     }
@@ -80,7 +82,7 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
         addMigrationSourceResource();
         addMigrationTargetResource();
         createTargetOrderTableRule();
-        startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
+        startMigration(SOURCE_TABLE_ORDER_NAME, TARGET_TABLE_ORDER_NAME);
         String jobId = listJobId().get(0);
         getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         getContainerComposer().sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index c2b652ce7fa..d257521895d 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -47,6 +47,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Slf4j
 public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
     
+    private static final String TARGET_TABLE_ORDER_NAME = "t_order";
+    
     public TextPrimaryKeyMigrationE2EIT(final PipelineTestParameter testParam) {
         super(testParam, new MigrationJobType());
     }
@@ -80,7 +82,7 @@ public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
         addMigrationSourceResource();
         addMigrationTargetResource();
         createTargetOrderTableRule();
-        startMigration(getSourceTableOrderName(), getContainerComposer().getTargetTableOrderName());
+        startMigration(getSourceTableOrderName(), TARGET_TABLE_ORDER_NAME);
         String jobId = listJobId().get(0);
         getContainerComposer().sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableOrderName(), "1000000000", 1, "afterStop"));
         getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));