Posted to notifications@shardingsphere.apache.org by ji...@apache.org on 2023/03/13 16:23:40 UTC

[shardingsphere] branch master updated: Add PipelineContainerComposer (#24582)

This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 94062ee1c2d Add PipelineContainerComposer (#24582)
94062ee1c2d is described below

commit 94062ee1c2d4e0b9d66f8e9d23996feb816168b8
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Mar 14 00:23:21 2023 +0800

    Add PipelineContainerComposer (#24582)
    
    * Add PipelineContainerComposer
    
    * Add PipelineContainerComposer
---
 ...seE2EIT.java => PipelineContainerComposer.java} | 280 ++++++++++++++++-----
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 100 ++++----
 .../cases/migration/AbstractMigrationE2EIT.java    |  77 +++---
 .../general/MySQLMigrationGeneralE2EIT.java        |  44 ++--
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  64 +++--
 .../migration/general/RulesMigrationE2EIT.java     |  32 ++-
 .../primarykey/IndexesMigrationE2EIT.java          |  56 ++---
 .../primarykey/MariaDBMigrationE2EIT.java          |  36 ++-
 .../primarykey/TextPrimaryKeyMigrationE2EIT.java   |  42 ++--
 9 files changed, 432 insertions(+), 299 deletions(-)
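
In short, this commit dissolves the abstract base class PipelineBaseE2EIT into a final PipelineContainerComposer that tests hold by composition: the composer's constructor starts the containers, and the class implements AutoCloseable so each test can stop them in an @After hook. A minimal sketch of the resulting test shape, assuming the JUnit 4 Parameterized runner that the tests in this diff use (the class name and test body are illustrative only):

    import java.util.Collection;
    import java.util.LinkedList;

    import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
    import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
    import org.junit.After;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public final class ExamplePipelineE2EIT {

        private final PipelineContainerComposer containerComposer;

        public ExamplePipelineE2EIT(final PipelineTestParameter testParam) {
            // The composer's constructor starts the Docker or native containers.
            containerComposer = new PipelineContainerComposer(testParam);
        }

        @Parameters(name = "{0}")
        public static Collection<PipelineTestParameter> getTestParameters() {
            // The real tests populate this from PipelineE2EEnvironment; empty here.
            return new LinkedList<>();
        }

        @After
        public void tearDown() {
            // PipelineContainerComposer implements AutoCloseable; close() stops the containers.
            containerComposer.close();
        }

        @Test
        public void assertExample() throws Exception {
            containerComposer.registerStorageUnit(PipelineContainerComposer.DS_0);
        }
    }

The CDCE2EIT and migration diffs below show this same shape with real test bodies.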

diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
similarity index 67%
rename from test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
rename to test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 5ae6eef95e1..90e9398f1ba 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.test.e2e.data.pipeline.cases.base;
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases;
 
 import com.google.common.base.Strings;
-import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -29,6 +28,7 @@ import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
 import org.apache.shardingsphere.test.e2e.data.pipeline.command.ExtraSQLCommand;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
@@ -36,7 +36,6 @@ import org.apache.shardingsphere.test.e2e.data.pipeline.framework.container.comp
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.container.compose.DockerContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.container.compose.NativeContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
-import org.apache.shardingsphere.test.e2e.data.pipeline.framework.watcher.PipelineWatcher;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.storage.DockerStorageContainer;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtil;
@@ -44,7 +43,6 @@ import org.apache.shardingsphere.test.e2e.env.container.atomic.util.StorageConta
 import org.apache.shardingsphere.test.e2e.env.runtime.DataSourceEnvironment;
 import org.apache.shardingsphere.test.util.PropertiesBuilder;
 import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
-import org.junit.Rule;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import javax.sql.DataSource;
@@ -71,33 +69,28 @@ import java.util.stream.Collectors;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@Getter(AccessLevel.PROTECTED)
+/**
+ * Pipeline container composer.
+ */
+@Getter
 @Slf4j
-public abstract class PipelineBaseE2EIT {
-    
-    protected static final PipelineE2EEnvironment ENV = PipelineE2EEnvironment.getInstance();
-    
-    protected static final String SCHEMA_NAME = "test";
-    
-    protected static final String PROXY_DATABASE = "sharding_db";
+public final class PipelineContainerComposer implements AutoCloseable {
     
-    protected static final String DS_0 = "pipeline_it_0";
+    public static final String SCHEMA_NAME = "test";
     
-    protected static final String DS_1 = "pipeline_it_1";
+    public static final String PROXY_DATABASE = "sharding_db";
     
-    protected static final String DS_2 = "pipeline_it_2";
+    public static final String DS_0 = "pipeline_it_0";
     
-    protected static final String DS_3 = "pipeline_it_3";
+    public static final String DS_1 = "pipeline_it_1";
     
-    protected static final String DS_4 = "pipeline_it_4";
+    public static final String DS_2 = "pipeline_it_2";
     
-    protected static final int TABLE_INIT_ROW_COUNT = 3000;
+    public static final String DS_3 = "pipeline_it_3";
     
-    private static final String REGISTER_STORAGE_UNIT_SQL = "REGISTER STORAGE UNIT ${ds} ( URL='${url}', USER='${user}', PASSWORD='${password}')";
+    public static final String DS_4 = "pipeline_it_4";
     
-    @Rule
-    @Getter(AccessLevel.NONE)
-    public PipelineWatcher pipelineWatcher;
+    public static final int TABLE_INIT_ROW_COUNT = 3000;
     
     private final BaseContainerComposer containerComposer;
     
@@ -115,24 +108,31 @@ public abstract class PipelineBaseE2EIT {
     
     private Thread increaseTaskThread;
     
-    public PipelineBaseE2EIT(final PipelineTestParameter testParam) {
+    public PipelineContainerComposer(final PipelineTestParameter testParam) {
         databaseType = testParam.getDatabaseType();
-        containerComposer = ENV.getItEnvType() == PipelineEnvTypeEnum.DOCKER
+        containerComposer = PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.DOCKER
                 ? new DockerContainerComposer(testParam.getDatabaseType(), testParam.getStorageContainerImage(), testParam.getStorageContainerCount())
                 : new NativeContainerComposer(testParam.getDatabaseType());
-        if (ENV.getItEnvType() == PipelineEnvTypeEnum.DOCKER) {
+        if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.DOCKER) {
             DockerStorageContainer storageContainer = ((DockerContainerComposer) containerComposer).getStorageContainers().get(0);
             username = storageContainer.getUsername();
             password = storageContainer.getPassword();
         } else {
-            username = ENV.getActualDataSourceUsername(databaseType);
-            password = ENV.getActualDataSourcePassword(databaseType);
+            username = PipelineE2EEnvironment.getInstance().getActualDataSourceUsername(databaseType);
+            password = PipelineE2EEnvironment.getInstance().getActualDataSourcePassword(databaseType);
         }
-        extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineBaseE2EIT.class.getClassLoader().getResource(testParam.getScenario())), ExtraSQLCommand.class);
-        pipelineWatcher = new PipelineWatcher(containerComposer);
+        extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineContainerComposer.class.getClassLoader().getResource(testParam.getScenario())), ExtraSQLCommand.class);
+        containerComposer.start();
     }
     
-    protected void initEnvironment(final DatabaseType databaseType, final JobType jobType) throws SQLException {
+    /**
+     * Initialize environment.
+     * 
+     * @param databaseType database type
+     * @param jobType job type
+     * @throws SQLException SQL exception
+     */
+    public void initEnvironment(final DatabaseType databaseType, final JobType jobType) throws SQLException {
         sourceDataSource = StorageContainerUtil.generateDataSource(appendExtraParam(getActualJdbcUrlTemplate(DS_0, false)), username, password);
         proxyDataSource = StorageContainerUtil.generateDataSource(appendExtraParam(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)),
                 ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
@@ -149,7 +149,13 @@ public abstract class PipelineBaseE2EIT {
         cleanUpDataSource();
     }
     
-    protected String appendExtraParam(final String jdbcUrl) {
+    /**
+     * Append extra parameter.
+     * 
+     * @param jdbcUrl JDBC URL
+     * @return appended JDBC URL
+     */
+    public String appendExtraParam(final String jdbcUrl) {
         String result = jdbcUrl;
         if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
             result = new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("rewriteBatchedStatements", Boolean.TRUE.toString())));
@@ -161,7 +167,7 @@ public abstract class PipelineBaseE2EIT {
     }
     
     private void cleanUpProxyDatabase(final Connection connection) {
-        if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+        if (PipelineEnvTypeEnum.NATIVE != PipelineE2EEnvironment.getInstance().getItEnvType()) {
             return;
         }
         try {
@@ -173,7 +179,7 @@ public abstract class PipelineBaseE2EIT {
     }
     
     private void cleanUpPipelineJobs(final Connection connection, final JobType jobType) throws SQLException {
-        if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+        if (PipelineEnvTypeEnum.NATIVE != PipelineE2EEnvironment.getInstance().getItEnvType()) {
             return;
         }
         String jobTypeName = jobType.getTypeName();
@@ -201,7 +207,7 @@ public abstract class PipelineBaseE2EIT {
     }
     
     private void cleanUpDataSource() {
-        if (PipelineEnvTypeEnum.NATIVE != ENV.getItEnvType()) {
+        if (PipelineEnvTypeEnum.NATIVE != PipelineE2EEnvironment.getInstance().getItEnvType()) {
             return;
         }
         for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
@@ -216,40 +222,77 @@ public abstract class PipelineBaseE2EIT {
         ThreadUtil.sleep(2, TimeUnit.SECONDS);
     }
     
-    protected void registerStorageUnit(final String storageUnitName) throws SQLException {
-        String registerStorageUnitTemplate = REGISTER_STORAGE_UNIT_SQL.replace("${ds}", storageUnitName)
+    /**
+     * Register storage unit.
+     * 
+     * @param storageUnitName storage unit name
+     * @throws SQLException SQL exception
+     */
+    public void registerStorageUnit(final String storageUnitName) throws SQLException {
+        String registerStorageUnitTemplate = "REGISTER STORAGE UNIT ${ds} ( URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}", storageUnitName)
                 .replace("${user}", getUsername())
                 .replace("${password}", getPassword())
                 .replace("${url}", appendExtraParam(getActualJdbcUrlTemplate(storageUnitName, true)));
         proxyExecuteWithLog(registerStorageUnitTemplate, 2);
     }
     
+    /**
+     * Add resource.
+     * 
+     * @param distSQL dist SQL
+     * @throws SQLException SQL exception
+     */
     // TODO Use registerStorageUnit instead, and remove the method
-    protected void addResource(final String distSQL) throws SQLException {
+    public void addResource(final String distSQL) throws SQLException {
         proxyExecuteWithLog(distSQL, 2);
     }
     
-    protected String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer, final int storageContainerIndex) {
-        if (PipelineEnvTypeEnum.DOCKER == ENV.getItEnvType()) {
+    /**
+     * Get actual JDBC URL template.
+     * 
+     * @param databaseName database name
+     * @param isInContainer is in container
+     * @param storageContainerIndex storage container index
+     * @return actual JDBC URL template
+     */
+    public String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer, final int storageContainerIndex) {
+        if (PipelineEnvTypeEnum.DOCKER == PipelineE2EEnvironment.getInstance().getItEnvType()) {
             DockerStorageContainer storageContainer = ((DockerContainerComposer) containerComposer).getStorageContainers().get(storageContainerIndex);
             return isInContainer
                     ? DataSourceEnvironment.getURL(getDatabaseType(), storageContainer.getNetworkAliases().get(0), storageContainer.getExposedPort(), databaseName)
                     : storageContainer.getJdbcUrl(databaseName);
         }
-        return DataSourceEnvironment.getURL(getDatabaseType(), "127.0.0.1", ENV.getActualDataSourceDefaultPort(databaseType), databaseName);
+        return DataSourceEnvironment.getURL(getDatabaseType(), "127.0.0.1", PipelineE2EEnvironment.getInstance().getActualDataSourceDefaultPort(databaseType), databaseName);
     }
     
-    protected String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer) {
+    /**
+     * Get actual JDBC URL template.
+     * 
+     * @param databaseName database name
+     * @param isInContainer is in container
+     * @return actual JDBC URL template
+     */
+    public String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer) {
         return getActualJdbcUrlTemplate(databaseName, isInContainer, 0);
     }
     
-    protected abstract String getSourceTableOrderName();
-    
-    protected String getTargetTableOrderName() {
+    /**
+     * Get target table order name.
+     * 
+     * @return target table order name
+     */
+    public String getTargetTableOrderName() {
         return "t_order";
     }
     
-    protected void createSchema(final Connection connection, final int sleepSeconds) throws SQLException {
+    /**
+     * Create schema.
+     * 
+     * @param connection connection
+     * @param sleepSeconds sleep seconds
+     * @throws SQLException SQL exception
+     */
+    public void createSchema(final Connection connection, final int sleepSeconds) throws SQLException {
         if (!getDatabaseType().isSchemaAvailable()) {
             return;
         }
@@ -259,34 +302,72 @@ public abstract class PipelineBaseE2EIT {
         }
     }
     
-    protected void createSourceOrderTable() throws SQLException {
-        sourceExecuteWithLog(getExtraSQLCommand().getCreateTableOrder(getSourceTableOrderName()));
+    /**
+     * Create source order table.
+     * 
+     * @param sourceTableOrderName source table order name 
+     * @throws SQLException SQL exception
+     */
+    public void createSourceOrderTable(final String sourceTableOrderName) throws SQLException {
+        sourceExecuteWithLog(getExtraSQLCommand().getCreateTableOrder(sourceTableOrderName));
     }
     
-    protected void createSourceTableIndexList(final String schema) throws SQLException {
+    /**
+     * Create source table index list.
+     * 
+     * @param schema schema
+     * @param sourceTableOrderName source table order name
+     * @throws SQLException SQL exception
+     */
+    public void createSourceTableIndexList(final String schema, final String sourceTableOrderName) throws SQLException {
         if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
-            sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS idx_user_id ON %s.%s ( user_id )", schema, getSourceTableOrderName()));
+            sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS idx_user_id ON %s.%s ( user_id )", schema, sourceTableOrderName));
         } else if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
-            sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON %s.%s ( user_id )", schema, getSourceTableOrderName()));
+            sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON %s.%s ( user_id )", schema, sourceTableOrderName));
         }
     }
     
-    protected void createSourceCommentOnList(final String schema) throws SQLException {
-        sourceExecuteWithLog(String.format("COMMENT ON COLUMN %s.%s.user_id IS 'user id'", schema, getSourceTableOrderName()));
+    /**
+     * Create source comment on list.
+     * 
+     * @param schema schema
+     * @param sourceTableOrderName source table order name
+     * @throws SQLException SQL exception
+     */
+    public void createSourceCommentOnList(final String schema, final String sourceTableOrderName) throws SQLException {
+        sourceExecuteWithLog(String.format("COMMENT ON COLUMN %s.%s.user_id IS 'user id'", schema, sourceTableOrderName));
     }
     
-    protected void createSourceOrderItemTable() throws SQLException {
+    /**
+     * Create source order item table.
+     * 
+     * @throws SQLException SQL exception
+     */
+    public void createSourceOrderItemTable() throws SQLException {
         sourceExecuteWithLog(extraSQLCommand.getCreateTableOrderItem());
     }
     
-    protected void sourceExecuteWithLog(final String sql) throws SQLException {
+    /**
+     * Source execute with log.
+     * 
+     * @param sql SQL
+     * @throws SQLException SQL exception
+     */
+    public void sourceExecuteWithLog(final String sql) throws SQLException {
         log.info("source execute :{}", sql);
         try (Connection connection = sourceDataSource.getConnection()) {
             connection.createStatement().execute(sql);
         }
     }
     
-    protected void proxyExecuteWithLog(final String sql, final int sleepSeconds) throws SQLException {
+    /**
+     * Proxy execute with log.
+     * 
+     * @param sql SQL
+     * @param sleepSeconds sleep seconds
+     * @throws SQLException SQL exception
+     */
+    public void proxyExecuteWithLog(final String sql, final int sleepSeconds) throws SQLException {
         log.info("proxy execute :{}", sql);
         try (Connection connection = proxyDataSource.getConnection()) {
             connection.createStatement().execute(sql);
@@ -294,7 +375,12 @@ public abstract class PipelineBaseE2EIT {
         ThreadUtil.sleep(Math.max(sleepSeconds, 0), TimeUnit.SECONDS);
     }
     
-    protected void waitJobPrepareSuccess(final String distSQL) {
+    /**
+     * Wait job prepare success.
+     * 
+     * @param distSQL dist SQL
+     */
+    public void waitJobPrepareSuccess(final String distSQL) {
         for (int i = 0; i < 5; i++) {
             List<Map<String, Object>> jobStatus = queryForListWithLog(distSQL);
             Set<String> statusSet = jobStatus.stream().map(each -> String.valueOf(each.get("status"))).collect(Collectors.toSet());
@@ -304,7 +390,13 @@ public abstract class PipelineBaseE2EIT {
         }
     }
     
-    protected List<Map<String, Object>> queryForListWithLog(final String sql) {
+    /**
+     * Query for list with log.
+     * 
+     * @param sql SQL
+     * @return query result
+     */
+    public List<Map<String, Object>> queryForListWithLog(final String sql) {
         int retryNumber = 0;
         while (retryNumber <= 3) {
             try (Connection connection = proxyDataSource.getConnection()) {
@@ -319,7 +411,14 @@ public abstract class PipelineBaseE2EIT {
         throw new RuntimeException("can't get result from proxy");
     }
     
-    protected List<Map<String, Object>> transformResultSetToList(final ResultSet resultSet) throws SQLException {
+    /**
+     * Transform result set to list.
+     * 
+     * @param resultSet result set
+     * @return transformed result
+     * @throws SQLException SQL exception
+     */
+    public List<Map<String, Object>> transformResultSetToList(final ResultSet resultSet) throws SQLException {
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
         int columns = resultSetMetaData.getColumnCount();
         List<Map<String, Object>> result = new ArrayList<>();
@@ -333,13 +432,25 @@ public abstract class PipelineBaseE2EIT {
         return result;
     }
     
-    protected void startIncrementTask(final BaseIncrementTask baseIncrementTask) {
+    /**
+     * Start increment task.
+     * 
+     * @param baseIncrementTask base increment task
+     */
+    public void startIncrementTask(final BaseIncrementTask baseIncrementTask) {
         increaseTaskThread = new Thread(baseIncrementTask);
         increaseTaskThread.start();
     }
     
+    /**
+     * Wait increment task finished.
+     * 
+     * @param distSQL dist SQL
+     * @return result
+     * @throws InterruptedException interrupted exception
+     */
     // TODO use DAO to query via DistSQL
-    protected List<Map<String, Object>> waitIncrementTaskFinished(final String distSQL) throws InterruptedException {
+    public List<Map<String, Object>> waitIncrementTaskFinished(final String distSQL) throws InterruptedException {
         if (null != increaseTaskThread) {
             TimeUnit.SECONDS.timedJoin(increaseTaskThread, 30);
         }
@@ -368,7 +479,13 @@ public abstract class PipelineBaseE2EIT {
         return Collections.emptyList();
     }
     
-    protected void assertProxyOrderRecordExist(final String tableName, final Object orderId) {
+    /**
+     * Assert proxy order record exist.
+     * 
+     * @param tableName table name
+     * @param orderId order id
+     */
+    public void assertProxyOrderRecordExist(final String tableName, final Object orderId) {
         String sql;
         if (orderId instanceof String) {
             sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s'", tableName, orderId);
@@ -378,7 +495,12 @@ public abstract class PipelineBaseE2EIT {
         assertProxyOrderRecordExist(sql);
     }
     
-    protected void assertProxyOrderRecordExist(final String sql) {
+    /**
+     * Assert proxy order record exist.
+     * 
+     * @param sql SQL
+     */
+    public void assertProxyOrderRecordExist(final String sql) {
         boolean recordExist = false;
         for (int i = 0; i < 5; i++) {
             List<Map<String, Object>> result = queryForListWithLog(sql);
@@ -391,23 +513,41 @@ public abstract class PipelineBaseE2EIT {
         assertTrue(recordExist, "The insert record must exist after the stop");
     }
     
-    protected int getTargetTableRecordsCount(final String tableName) {
+    /**
+     * Get target table records count.
+     * 
+     * @param tableName table name
+     * @return target table records count
+     */
+    public int getTargetTableRecordsCount(final String tableName) {
         List<Map<String, Object>> targetList = queryForListWithLog("SELECT COUNT(1) AS count FROM " + tableName);
         assertFalse(targetList.isEmpty());
         return ((Number) targetList.get(0).get("count")).intValue();
     }
     
-    protected void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) {
+    /**
+     * Assert greater than order table init rows.
+     * 
+     * @param tableInitRows table init rows
+     * @param schema schema
+     */
+    public void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) {
         String tableName = Strings.isNullOrEmpty(schema) ? "t_order" : String.format("%s.t_order", schema);
         int recordsCount = getTargetTableRecordsCount(tableName);
         assertTrue(recordsCount > tableInitRows, "actual count " + recordsCount);
     }
     
+    /**
+     * Generate ShardingSphere data source from proxy.
+     * 
+     * @return ShardingSphere data source
+     * @throws SQLException SQL exception
+     */
     // TODO proxy support for some fields still needs to be optimized, such as binary of MySQL, after these problems are optimized, Proxy dataSource can be used.
-    protected DataSource generateShardingSphereDataSourceFromProxy() throws SQLException {
+    public DataSource generateShardingSphereDataSourceFromProxy() throws SQLException {
         Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !getYamlRootConfig().getRules().isEmpty());
         YamlRootConfiguration rootConfig = getYamlRootConfig();
-        if (PipelineEnvTypeEnum.DOCKER == ENV.getItEnvType()) {
+        if (PipelineEnvTypeEnum.DOCKER == PipelineE2EEnvironment.getInstance().getItEnvType()) {
             DockerStorageContainer storageContainer = ((DockerContainerComposer) containerComposer).getStorageContainers().get(0);
             String sourceUrl = String.join(":", storageContainer.getNetworkAliases().get(0), Integer.toString(storageContainer.getExposedPort()));
             String targetUrl = String.join(":", storageContainer.getHost(), Integer.toString(storageContainer.getMappedPort()));
@@ -422,7 +562,11 @@ public abstract class PipelineBaseE2EIT {
     }
     
     private YamlRootConfiguration getYamlRootConfig() {
-        String result = queryForListWithLog("EXPORT DATABASE CONFIGURATION").get(0).get("result").toString();
-        return YamlEngine.unmarshal(result, YamlRootConfiguration.class);
+        return YamlEngine.unmarshal(queryForListWithLog("EXPORT DATABASE CONFIGURATION").get(0).get("result").toString(), YamlRootConfiguration.class);
+    }
+    
+    @Override
+    public void close() {
+        containerComposer.stop();
     }
 }
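
Because the new class implements AutoCloseable (its close() delegates to containerComposer.stop(), as the tail of the file above shows), a caller that manages its own lifecycle could in principle scope it with try-with-resources rather than an @After hook. A hypothetical sketch under that assumption, not a pattern used by the tests in this commit (MigrationJobType comes from org.apache.shardingsphere.data.pipeline.scenario.migration):

    try (PipelineContainerComposer composer = new PipelineContainerComposer(testParam)) {
        composer.initEnvironment(composer.getDatabaseType(), new MigrationJobType());
        composer.registerStorageUnit(PipelineContainerComposer.DS_0);
        // ... exercise composer.getProxyDataSource() and assert on the results ...
    }
    // Leaving the try block calls close(), which stops the underlying containers.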
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 9cf6cd9164d..436bf08d945 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -39,14 +39,16 @@ import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.util.StorageContainerUtil;
+import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -74,87 +76,93 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * MySQL CDC E2E IT.
+ * CDC E2E IT.
  */
 @RunWith(Parameterized.class)
 @Slf4j
-public final class CDCE2EIT extends PipelineBaseE2EIT {
+public final class CDCE2EIT {
     
     private static final String CREATE_SHARDING_RULE_SQL = String.format("CREATE SHARDING TABLE RULE t_order("
             + "STORAGE_UNITS(%s,%s),"
             + "SHARDING_COLUMN=user_id,"
             + "TYPE(NAME='hash_mod',PROPERTIES('sharding-count'='4')),"
             + "KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME='snowflake'))"
-            + ")", DS_0, DS_1);
+            + ")", PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1);
+    
+    private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
+    
+    private final PipelineContainerComposer containerComposer;
     
     private final ExecutorService executor = Executors.newSingleThreadExecutor();
     
     public CDCE2EIT(final PipelineTestParameter testParam) {
-        super(testParam);
+        containerComposer = new PipelineContainerComposer(testParam);
     }
     
     @Parameters(name = "{0}")
     public static Collection<PipelineTestParameter> getTestParameters() {
         Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+        if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
             return result;
         }
         MySQLDatabaseType mysqlDatabaseType = new MySQLDatabaseType();
-        for (String each : PipelineBaseE2EIT.ENV.listStorageContainerImages(mysqlDatabaseType)) {
+        for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(mysqlDatabaseType)) {
             result.add(new PipelineTestParameter(mysqlDatabaseType, each, "env/scenario/general/mysql.xml"));
         }
         OpenGaussDatabaseType openGaussDatabaseType = new OpenGaussDatabaseType();
-        for (String each : PipelineBaseE2EIT.ENV.listStorageContainerImages(openGaussDatabaseType)) {
+        for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(openGaussDatabaseType)) {
             result.add(new PipelineTestParameter(openGaussDatabaseType, each, "env/scenario/general/postgresql.xml"));
         }
         return result;
     }
     
-    @Override
-    protected String getSourceTableOrderName() {
-        return "t_order";
+    @After
+    public void tearDown() {
+        containerComposer.close();
     }
     
     @Test
     public void assertCDCDataImportSuccess() throws SQLException, InterruptedException {
         // make sure the program time zone same with the database server at CI.
         TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
-        initEnvironment(getDatabaseType(), new CDCJobType());
-        for (String each : Arrays.asList(DS_0, DS_1)) {
-            registerStorageUnit(each);
+        containerComposer.initEnvironment(containerComposer.getDatabaseType(), new CDCJobType());
+        for (String each : Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)) {
+            containerComposer.registerStorageUnit(each);
         }
         createOrderTableRule();
-        try (Connection connection = getProxyDataSource().getConnection()) {
+        try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
             initSchemaAndTable(connection, 2);
         }
-        DataSource jdbcDataSource = generateShardingSphereDataSourceFromProxy();
-        Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(getDatabaseType(), 20);
+        DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
+        Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), 20);
         log.info("init data begin: {}", LocalDateTime.now());
-        DataSourceExecuteUtil.execute(jdbcDataSource, getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
+        DataSourceExecuteUtil.execute(jdbcDataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_ORDER_NAME), dataPair.getLeft());
         log.info("init data end: {}", LocalDateTime.now());
-        try (Connection connection = DriverManager.getConnection(getActualJdbcUrlTemplate(DS_4, false), getUsername(), getPassword())) {
+        try (Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
+                containerComposer.getUsername(), containerComposer.getPassword())) {
             initSchemaAndTable(connection, 0);
         }
         startCDCClient();
-        Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !queryForListWithLog("SHOW STREAMING LIST").isEmpty());
-        String jobId = queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
-        waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
-        startIncrementTask(new E2EIncrementalTask(jdbcDataSource, getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), getDatabaseType(), 20));
-        getIncreaseTaskThread().join(10000);
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
+        String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
+        containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
+        containerComposer.startIncrementTask(new E2EIncrementalTask(jdbcDataSource, SOURCE_TABLE_ORDER_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
+        containerComposer.getIncreaseTaskThread().join(10000L);
         List<Map<String, Object>> actualProxyList;
         try (Connection connection = jdbcDataSource.getConnection()) {
             ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", getOrderTableNameWithSchema()));
-            actualProxyList = transformResultSetToList(resultSet);
+            actualProxyList = containerComposer.transformResultSetToList(resultSet);
         }
         Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(2, TimeUnit.SECONDS).until(() -> listOrderRecords(getOrderTableNameWithSchema()).size() == actualProxyList.size());
-        SchemaTableName schemaTableName = getDatabaseType().isSchemaAvailable()
-                ? new SchemaTableName(new SchemaName(PipelineBaseE2EIT.SCHEMA_NAME), new TableName(getSourceTableOrderName()))
-                : new SchemaTableName(new SchemaName(null), new TableName(getSourceTableOrderName()));
-        PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(StorageContainerUtil.generateDataSource(getActualJdbcUrlTemplate(DS_4, false), getUsername(), getPassword()),
-                getDatabaseType());
-        PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, getDatabaseType());
+        SchemaTableName schemaTableName = containerComposer.getDatabaseType().isSchemaAvailable()
+                ? new SchemaTableName(new SchemaName(PipelineContainerComposer.SCHEMA_NAME), new TableName(SOURCE_TABLE_ORDER_NAME))
+                : new SchemaTableName(new SchemaName(null), new TableName(SOURCE_TABLE_ORDER_NAME));
+        PipelineDataSourceWrapper targetDataSource = new PipelineDataSourceWrapper(StorageContainerUtil.generateDataSource(
+                containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
+                containerComposer.getUsername(), containerComposer.getPassword()), containerComposer.getDatabaseType());
+        PipelineDataSourceWrapper sourceDataSource = new PipelineDataSourceWrapper(jdbcDataSource, containerComposer.getDatabaseType());
         StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(targetDataSource);
-        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(PipelineBaseE2EIT.SCHEMA_NAME, "t_order");
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(PipelineContainerComposer.SCHEMA_NAME, "t_order");
         PipelineColumnMetaData primaryKeyMetaData = tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
         ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0);
         SingleTableInventoryDataConsistencyChecker checker = new SingleTableInventoryDataConsistencyChecker("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
@@ -164,12 +172,12 @@ public final class CDCE2EIT extends PipelineBaseE2EIT {
     }
     
     private void createOrderTableRule() throws SQLException {
-        proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 2);
+        containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 2);
     }
     
     private void initSchemaAndTable(final Connection connection, final int sleepSeconds) throws SQLException {
-        createSchema(connection, sleepSeconds);
-        String sql = getExtraSQLCommand().getCreateTableOrder(getSourceTableOrderName());
+        containerComposer.createSchema(connection, sleepSeconds);
+        String sql = containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_ORDER_NAME);
         log.info("create table sql: {}", sql);
         connection.createStatement().execute(sql);
         if (sleepSeconds > 0) {
@@ -178,18 +186,19 @@ public final class CDCE2EIT extends PipelineBaseE2EIT {
     }
     
     private void startCDCClient() {
-        ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter(appendExtraParam(getActualJdbcUrlTemplate(DS_4, false, 0)), getUsername(), getPassword());
+        ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter(containerComposer.appendExtraParam(
+                containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false, 0)), containerComposer.getUsername(), containerComposer.getPassword());
         StartCDCClientParameter parameter = new StartCDCClientParameter(importDataSourceParam);
         parameter.setAddress("localhost");
-        parameter.setPort(getContainerComposer().getProxyCDCPort());
+        parameter.setPort(containerComposer.getContainerComposer().getProxyCDCPort());
         parameter.setUsername(ProxyContainerConstants.USERNAME);
         parameter.setPassword(ProxyContainerConstants.PASSWORD);
         parameter.setDatabase("sharding_db");
         // TODO add full=false test case later
         parameter.setFull(true);
-        String schema = getDatabaseType().isSchemaAvailable() ? "test" : "";
-        parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable(getSourceTableOrderName()).setSchema(schema).build()));
-        parameter.setDatabaseType(getDatabaseType().getType());
+        String schema = containerComposer.getDatabaseType().isSchemaAvailable() ? "test" : "";
+        parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_ORDER_NAME).setSchema(schema).build()));
+        parameter.setDatabaseType(containerComposer.getDatabaseType().getType());
         CompletableFuture.runAsync(() -> new CDCClient(parameter).start(), executor).whenComplete((unused, throwable) -> {
             if (null != throwable) {
                 log.error("cdc client sync failed, ", throwable);
@@ -198,17 +207,14 @@ public final class CDCE2EIT extends PipelineBaseE2EIT {
     }
     
     private List<Map<String, Object>> listOrderRecords(final String tableNameWithSchema) throws SQLException {
-        try (Connection connection = DriverManager.getConnection(getActualJdbcUrlTemplate(DS_4, false), getUsername(), getPassword())) {
+        try (Connection connection = DriverManager.getConnection(
+                containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false), containerComposer.getUsername(), containerComposer.getPassword())) {
             ResultSet resultSet = connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER BY order_id ASC", tableNameWithSchema));
-            return transformResultSetToList(resultSet);
+            return containerComposer.transformResultSetToList(resultSet);
         }
     }
     
     private String getOrderTableNameWithSchema() {
-        if (getDatabaseType().isSchemaAvailable()) {
-            return String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getSourceTableOrderName());
-        } else {
-            return getSourceTableOrderName();
-        }
+        return containerComposer.getDatabaseType().isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_ORDER_NAME) : SOURCE_TABLE_ORDER_NAME;
     }
 }
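
The change visible above in CDCE2EIT recurs in the migration tests that follow: the getSourceTableOrderName() template-method hook from the old base class becomes a plain constant that each test passes to the composer explicitly. Schematically, from the hunks in this diff:

    // Before: each test subclass overrode a hook on PipelineBaseE2EIT.
    @Override
    protected String getSourceTableOrderName() {
        return "t_order";
    }

    // After: a constant local to the test, handed to the composer's methods.
    private static final String SOURCE_TABLE_ORDER_NAME = "t_order";

    containerComposer.createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);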
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index eb1fae5cb3f..39b5d7003c1 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -21,11 +21,13 @@ import com.google.common.base.Strings;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.command.MigrationDistSQLCommand;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtil;
+import org.junit.After;
 import org.opengauss.util.PSQLException;
 
 import javax.xml.bind.JAXB;
@@ -44,48 +46,55 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Getter
 @Slf4j
-public abstract class AbstractMigrationE2EIT extends PipelineBaseE2EIT {
+public abstract class AbstractMigrationE2EIT {
     
     private final MigrationDistSQLCommand migrationDistSQLCommand;
     
+    private final PipelineContainerComposer containerComposer;
+    
     public AbstractMigrationE2EIT(final PipelineTestParameter testParam) {
-        super(testParam);
-        migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineBaseE2EIT.class.getClassLoader().getResource("env/common/migration-command.xml")), MigrationDistSQLCommand.class);
+        containerComposer = new PipelineContainerComposer(testParam);
+        migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(AbstractMigrationE2EIT.class.getClassLoader().getResource("env/common/migration-command.xml")), MigrationDistSQLCommand.class);
+    }
+    
+    @After
+    public final void tearDown() {
+        containerComposer.close();
     }
     
     protected void addMigrationSourceResource() throws SQLException {
-        if (PipelineEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
+        if (PipelineEnvTypeEnum.NATIVE == PipelineE2EEnvironment.getInstance().getItEnvType()) {
             try {
-                proxyExecuteWithLog("UNREGISTER MIGRATION SOURCE STORAGE UNIT ds_0", 2);
+                containerComposer.proxyExecuteWithLog("UNREGISTER MIGRATION SOURCE STORAGE UNIT ds_0", 2);
             } catch (final SQLException ex) {
                 log.warn("Drop sharding_db failed, maybe it's not exist. error msg={}", ex.getMessage());
             }
         }
-        String addSourceResource = migrationDistSQLCommand.getRegisterMigrationSourceStorageUnitTemplate().replace("${user}", getUsername())
-                .replace("${password}", getPassword())
-                .replace("${ds0}", appendExtraParam(getActualJdbcUrlTemplate(DS_0, true)));
-        addResource(addSourceResource);
+        String addSourceResource = migrationDistSQLCommand.getRegisterMigrationSourceStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
+                .replace("${password}", containerComposer.getPassword())
+                .replace("${ds0}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_0, true)));
+        containerComposer.addResource(addSourceResource);
     }
     
     protected void addMigrationTargetResource() throws SQLException {
-        String addTargetResource = migrationDistSQLCommand.getRegisterMigrationTargetStorageUnitTemplate().replace("${user}", getUsername())
-                .replace("${password}", getPassword())
-                .replace("${ds2}", appendExtraParam(getActualJdbcUrlTemplate(DS_2, true)))
-                .replace("${ds3}", appendExtraParam(getActualJdbcUrlTemplate(DS_3, true)))
-                .replace("${ds4}", appendExtraParam(getActualJdbcUrlTemplate(DS_4, true)));
-        addResource(addTargetResource);
-        List<Map<String, Object>> resources = queryForListWithLog("SHOW STORAGE UNITS from sharding_db");
+        String addTargetResource = migrationDistSQLCommand.getRegisterMigrationTargetStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
+                .replace("${password}", containerComposer.getPassword())
+                .replace("${ds2}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2, true)))
+                .replace("${ds3}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true)))
+                .replace("${ds4}", containerComposer.appendExtraParam(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true)));
+        containerComposer.addResource(addTargetResource);
+        List<Map<String, Object>> resources = containerComposer.queryForListWithLog("SHOW STORAGE UNITS from sharding_db");
         assertThat(resources.size(), is(3));
     }
     
     protected void createSourceSchema(final String schemaName) throws SQLException {
-        if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
-            sourceExecuteWithLog(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
+        if (DatabaseTypeUtil.isPostgreSQL(containerComposer.getDatabaseType())) {
+            containerComposer.sourceExecuteWithLog(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
             return;
         }
-        if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
+        if (DatabaseTypeUtil.isOpenGauss(containerComposer.getDatabaseType())) {
             try {
-                sourceExecuteWithLog(String.format("CREATE SCHEMA %s", schemaName));
+                containerComposer.sourceExecuteWithLog(String.format("CREATE SCHEMA %s", schemaName));
             } catch (final SQLException ex) {
                 // only used for native mode.
                 if (ex instanceof PSQLException && "42P06".equals(ex.getSQLState())) {
@@ -98,57 +107,57 @@ public abstract class AbstractMigrationE2EIT extends PipelineBaseE2EIT {
     }
     
     protected void createTargetOrderTableRule() throws SQLException {
-        proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableRule(), 2);
+        containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableRule(), 2);
     }
     
     protected void createTargetOrderTableEncryptRule() throws SQLException {
-        proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableEncryptRule(), 2);
+        containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableEncryptRule(), 2);
     }
     
     protected void createTargetOrderItemTableRule() throws SQLException {
-        proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(), 2);
+        containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(), 2);
     }
     
     protected void startMigration(final String sourceTableName, final String targetTableName) throws SQLException {
-        proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTable(sourceTableName, targetTableName), 5);
+        containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTable(sourceTableName, targetTableName), 5);
     }
     
     protected void startMigrationWithSchema(final String sourceTableName, final String targetTableName) throws SQLException {
-        proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTableWithSchema(sourceTableName, targetTableName), 5);
+        containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getMigrationSingleTableWithSchema(sourceTableName, targetTableName), 5);
     }
     
     protected void addMigrationProcessConfig() throws SQLException {
-        proxyExecuteWithLog(migrationDistSQLCommand.getAlterMigrationRule(), 0);
+        containerComposer.proxyExecuteWithLog(migrationDistSQLCommand.getAlterMigrationRule(), 0);
     }
     
     protected void stopMigrationByJobId(final String jobId) throws SQLException {
-        proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
+        containerComposer.proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
     }
     
     protected void startMigrationByJobId(final String jobId) throws SQLException {
-        proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 1);
+        containerComposer.proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 1);
     }
     
     protected void commitMigrationByJobId(final String jobId) throws SQLException {
-        proxyExecuteWithLog(String.format("COMMIT MIGRATION '%s'", jobId), 1);
+        containerComposer.proxyExecuteWithLog(String.format("COMMIT MIGRATION '%s'", jobId), 1);
     }
     
     protected List<String> listJobId() {
-        List<Map<String, Object>> jobList = queryForListWithLog("SHOW MIGRATION LIST");
+        List<Map<String, Object>> jobList = containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
         return jobList.stream().map(a -> a.get("id").toString()).collect(Collectors.toList());
     }
     
     protected String getJobIdByTableName(final String tableName) {
-        List<Map<String, Object>> jobList = queryForListWithLog("SHOW MIGRATION LIST");
+        List<Map<String, Object>> jobList = containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
         return jobList.stream().filter(a -> a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new RuntimeException("not find " + tableName + " table")).get("id").toString();
     }
     
     protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) throws SQLException {
-        proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
+        containerComposer.proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
         // TODO Need to add after the stop then to start, can continue the consistency check from the previous progress
         List<Map<String, Object>> resultList = Collections.emptyList();
         for (int i = 0; i < 10; i++) {
-            resultList = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
+            resultList = containerComposer.queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
             if (resultList.isEmpty()) {
                 ThreadUtil.sleep(3, TimeUnit.SECONDS);
                 continue;
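
One detail worth noting before the subclass diffs: AbstractMigrationE2EIT keeps the composer in a private final field, and the class-level Lombok @Getter above is what generates the getContainerComposer() accessor the subclasses call. Roughly, Lombok emits the equivalent of:

    public PipelineContainerComposer getContainerComposer() {
        return containerComposer;
    }

so a subclass line such as the following (from MySQLMigrationGeneralE2EIT below) resolves against the base class:

    getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());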
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index a3a685f991c..fc25f0bb2ac 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -23,9 +23,10 @@ import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -51,53 +52,47 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Slf4j
 public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
-    private final PipelineTestParameter testParam;
+    private static final String SOURCE_TABLE_ORDER_NAME = "t_order_copy";
     
     public MySQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
         super(testParam);
-        this.testParam = testParam;
     }
     
     @Parameters(name = "{0}")
     public static Collection<PipelineTestParameter> getTestParameters() {
         Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+        if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
             return result;
         }
         MySQLDatabaseType databaseType = new MySQLDatabaseType();
-        for (String each : PipelineBaseE2EIT.ENV.listStorageContainerImages(databaseType)) {
+        for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(databaseType)) {
             result.add(new PipelineTestParameter(databaseType, each, "env/scenario/general/mysql.xml"));
         }
         return result;
     }
     
-    @Override
-    protected String getSourceTableOrderName() {
-        return "t_order_copy";
-    }
-    
     @Test
     public void assertMigrationSuccess() throws SQLException, InterruptedException {
-        log.info("assertMigrationSuccess testParam:{}", testParam);
-        initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
+        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
         addMigrationProcessConfig();
-        createSourceOrderTable();
-        createSourceOrderItemTable();
+        getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
+        getContainerComposer().createSourceOrderItemTable();
         addMigrationSourceResource();
         addMigrationTargetResource();
         createTargetOrderTableRule();
         createTargetOrderTableEncryptRule();
         createTargetOrderItemTableRule();
-        Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(testParam.getDatabaseType(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+        Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(getContainerComposer().getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
         log.info("init data begin: {}", LocalDateTime.now());
-        DataSourceExecuteUtil.execute(getSourceDataSource(), getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
-        DataSourceExecuteUtil.execute(getSourceDataSource(), getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+        DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_ORDER_NAME), dataPair.getLeft());
+        DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
         log.info("init data end: {}", LocalDateTime.now());
-        startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+        startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
         startMigration("t_order_item", "t_order_item");
-        String orderJobId = getJobIdByTableName("ds_0." + getSourceTableOrderName());
-        waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
-        startIncrementTask(new E2EIncrementalTask(getSourceDataSource(), getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), getDatabaseType(), 30));
+        String orderJobId = getJobIdByTableName("ds_0." + SOURCE_TABLE_ORDER_NAME);
+        getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
+        getContainerComposer().startIncrementTask(
+                new E2EIncrementalTask(getContainerComposer().getSourceDataSource(), SOURCE_TABLE_ORDER_NAME, new SnowflakeKeyGenerateAlgorithm(), getContainerComposer().getDatabaseType(), 30));
         assertMigrationSuccessById(orderJobId, "DATA_MATCH");
         String orderItemJobId = getJobIdByTableName("ds_0.t_order_item");
         assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
@@ -108,13 +103,12 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         }
         List<String> lastJobIds = listJobId();
         assertTrue(lastJobIds.isEmpty());
-        proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
-        assertGreaterThanOrderTableInitRows(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT, "");
-        log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
+        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
+        getContainerComposer().assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
     }
     
     private void assertMigrationSuccessById(final String jobId, final String algorithmType) throws SQLException, InterruptedException {
-        List<Map<String, Object>> jobStatus = waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        List<Map<String, Object>> jobStatus = getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         for (Map<String, Object> each : jobStatus) {
             assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
             assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()), is(100));
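
The change that repeats across all nine files is visible above: the tests no longer extend a PipelineBaseE2EIT base class, they hold a PipelineContainerComposer and route every helper call through getContainerComposer(). A minimal, self-contained sketch of that composition shape follows; the Composer stub is hypothetical and stands in for the much larger real class:

    import java.sql.SQLException;
    
    // Hypothetical stand-in for PipelineContainerComposer, reduced to one helper.
    final class Composer {
        
        void sourceExecuteWithLog(final String sql) throws SQLException {
            System.out.println("execute on source: " + sql);
        }
    }
    
    // Composition style: the test owns a composer instead of inheriting helpers.
    public final class ComposerStyleIT {
        
        private final Composer containerComposer = new Composer();
        
        private Composer getContainerComposer() {
            return containerComposer;
        }
        
        public void assertMigration() throws SQLException {
            // Every helper call goes through the composer instead of `this`.
            getContainerComposer().sourceExecuteWithLog("SELECT 1");
        }
        
        public static void main(final String[] args) throws SQLException {
            new ComposerStyleIT().assertMigration();
        }
    }
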
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 2b2c0ea2fa2..118778126bb 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -23,9 +23,10 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -49,28 +50,22 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Slf4j
 public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
-    private final PipelineTestParameter testParam;
+    private static final String SOURCE_TABLE_ORDER_NAME = "t_order_copy";
     
     public PostgreSQLMigrationGeneralE2EIT(final PipelineTestParameter testParam) {
         super(testParam);
-        this.testParam = testParam;
-    }
-    
-    @Override
-    protected String getSourceTableOrderName() {
-        return "t_order_copy";
     }
     
     @Parameters(name = "{0}")
     public static Collection<PipelineTestParameter> getTestParameters() {
         Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+        if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
             return result;
         }
-        for (String each : PipelineBaseE2EIT.ENV.listStorageContainerImages(new PostgreSQLDatabaseType())) {
+        for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType())) {
             result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), each, "env/scenario/general/postgresql.xml"));
         }
-        for (String each : PipelineBaseE2EIT.ENV.listStorageContainerImages(new OpenGaussDatabaseType())) {
+        for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new OpenGaussDatabaseType())) {
             result.add(new PipelineTestParameter(new OpenGaussDatabaseType(), each, "env/scenario/general/postgresql.xml"));
         }
         return result;
@@ -78,29 +73,29 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
     
     @Test
     public void assertMigrationSuccess() throws SQLException, InterruptedException {
-        log.info("assertMigrationSuccess testParam:{}", testParam);
-        initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
+        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
         addMigrationProcessConfig();
-        createSourceSchema(PipelineBaseE2EIT.SCHEMA_NAME);
-        createSourceOrderTable();
-        createSourceOrderItemTable();
-        createSourceTableIndexList(PipelineBaseE2EIT.SCHEMA_NAME);
-        createSourceCommentOnList(PipelineBaseE2EIT.SCHEMA_NAME);
+        createSourceSchema(PipelineContainerComposer.SCHEMA_NAME);
+        getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
+        getContainerComposer().createSourceOrderItemTable();
+        getContainerComposer().createSourceTableIndexList(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_ORDER_NAME);
+        getContainerComposer().createSourceCommentOnList(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_ORDER_NAME);
         addMigrationSourceResource();
         addMigrationTargetResource();
         createTargetOrderTableRule();
         createTargetOrderItemTableRule();
-        Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(testParam.getDatabaseType(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+        Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(getContainerComposer().getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
         log.info("init data begin: {}", LocalDateTime.now());
-        DataSourceExecuteUtil.execute(getSourceDataSource(), getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
-        DataSourceExecuteUtil.execute(getSourceDataSource(), getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+        DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_ORDER_NAME), dataPair.getLeft());
+        DataSourceExecuteUtil.execute(getContainerComposer().getSourceDataSource(), getContainerComposer().getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
         log.info("init data end: {}", LocalDateTime.now());
-        startMigrationWithSchema(getSourceTableOrderName(), "t_order");
+        startMigrationWithSchema(SOURCE_TABLE_ORDER_NAME, "t_order");
         Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> listJobId().size() > 0);
-        String jobId = getJobIdByTableName("ds_0.test." + getSourceTableOrderName());
-        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        startIncrementTask(new E2EIncrementalTask(getSourceDataSource(), String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getSourceTableOrderName()),
-                new SnowflakeKeyGenerateAlgorithm(), getDatabaseType(), 20));
+        String jobId = getJobIdByTableName("ds_0.test." + SOURCE_TABLE_ORDER_NAME);
+        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        getContainerComposer().startIncrementTask(new E2EIncrementalTask(
+                getContainerComposer().getSourceDataSource(), String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_ORDER_NAME),
+                new SnowflakeKeyGenerateAlgorithm(), getContainerComposer().getDatabaseType(), 20));
         checkOrderMigration(jobId);
         checkOrderItemMigration();
         for (String each : listJobId()) {
@@ -108,28 +103,27 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
         }
         List<String> lastJobIds = listJobId();
         assertTrue(lastJobIds.isEmpty());
-        proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
-        assertGreaterThanOrderTableInitRows(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1, PipelineBaseE2EIT.SCHEMA_NAME);
-        log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
+        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
+        getContainerComposer().assertGreaterThanOrderTableInitRows(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1, PipelineContainerComposer.SCHEMA_NAME);
     }
     
     private void checkOrderMigration(final String jobId) throws SQLException, InterruptedException {
-        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         stopMigrationByJobId(jobId);
         long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
-        sourceExecuteWithLog(
-                String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getSourceTableOrderName()), recordId, 1, "afterStop"));
+        getContainerComposer().sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')",
+                String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_ORDER_NAME), recordId, 1, "afterStop"));
         startMigrationByJobId(jobId);
         // must refresh first, otherwise the proxy can't get schema and table info
-        proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
-        assertProxyOrderRecordExist(String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getTargetTableOrderName()), recordId);
+        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
+        getContainerComposer().assertProxyOrderRecordExist(String.join(".", PipelineContainerComposer.SCHEMA_NAME, getContainerComposer().getTargetTableOrderName()), recordId);
         assertCheckMigrationSuccess(jobId, "DATA_MATCH");
     }
     
     private void checkOrderItemMigration() throws SQLException, InterruptedException {
         startMigrationWithSchema("t_order_item", "t_order_item");
         String jobId = getJobIdByTableName("ds_0.test.t_order_item");
-        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(jobId, "DATA_MATCH");
     }
 }
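
checkOrderMigration above is a resumability check: stop the job, insert a row on the source while it is stopped, restart the job, then assert via the proxy that the row reached the target. A self-contained sketch of that sequence, with the tables and the job reduced to hypothetical in-memory stand-ins:

    import java.util.ArrayList;
    import java.util.List;
    
    // Sketch of the stop/insert/restart sequence in checkOrderMigration.
    public final class ResumeCheckSketch {
        
        private static final List<Long> SOURCE_TABLE = new ArrayList<>();
        
        private static final List<Long> TARGET_TABLE = new ArrayList<>();
        
        private static boolean jobRunning;
        
        public static void main(final String[] args) {
            jobRunning = false;                // stopMigrationByJobId(jobId)
            SOURCE_TABLE.add(1000000000L);     // INSERT on the source while the job is stopped
            jobRunning = true;                 // startMigrationByJobId(jobId)
            if (jobRunning) {
                // the incremental task catches up after the restart
                TARGET_TABLE.addAll(SOURCE_TABLE);
            }
            if (!TARGET_TABLE.contains(1000000000L)) {    // assertProxyOrderRecordExist(...)
                throw new AssertionError("record inserted during stop was not migrated");
            }
        }
    }
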
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 06154ae35d4..bd28b68bed1 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -21,8 +21,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -49,6 +50,8 @@ import static org.hamcrest.Matchers.is;
 @Slf4j
 public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     
+    private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
+    
     public RulesMigrationE2EIT(final PipelineTestParameter testParam) {
         super(testParam);
     }
@@ -56,10 +59,10 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     @Parameters(name = "{0}")
     public static Collection<PipelineTestParameter> getTestParameters() {
         Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+        if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
             return result;
         }
-        List<String> versions = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+        List<String> versions = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType());
         if (versions.isEmpty()) {
             return result;
         }
@@ -67,11 +70,6 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
         return result;
     }
     
-    @Override
-    protected String getSourceTableOrderName() {
-        return "t_order";
-    }
-    
     @Test
     public void assertNoRuleMigrationSuccess() throws Exception {
         assertMigrationSuccess(null);
@@ -86,23 +84,23 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     }
     
     private void assertMigrationSuccess(final Callable<Void> addRuleFn) throws Exception {
-        initEnvironment(getDatabaseType(), new MigrationJobType());
-        createSourceOrderTable();
-        try (Connection connection = getSourceDataSource().getConnection()) {
-            PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, new UUIDKeyGenerateAlgorithm(), getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
+        getContainerComposer().createSourceOrderTable(SOURCE_TABLE_ORDER_NAME);
+        try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
+            PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, new UUIDKeyGenerateAlgorithm(), SOURCE_TABLE_ORDER_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
         }
         addMigrationSourceResource();
         addMigrationTargetResource();
         if (null != addRuleFn) {
             addRuleFn.call();
         }
-        startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+        startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
         String jobId = listJobId().get(0);
-        waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         commitMigrationByJobId(jobId);
-        proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-        assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT));
+        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+        assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_ORDER_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
     }
 }
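
Each getTestParameters method follows the same guard seen here: return an empty collection when the configured environment type is NONE, which skips the whole parameterized class, and otherwise emit one parameter per storage container image. A reduced sketch with hypothetical environment values:

    import java.util.Collection;
    import java.util.LinkedList;
    import java.util.List;
    
    public final class ParamGuardSketch {
        
        private enum EnvType { NONE, DOCKER }
        
        // Hypothetical stand-ins for PipelineE2EEnvironment.getInstance() values.
        private static final EnvType IT_ENV_TYPE = EnvType.DOCKER;
        
        private static final List<String> IMAGES = List.of("mysql:5.7", "mysql:8.0");
        
        public static Collection<String> getTestParameters() {
            Collection<String> result = new LinkedList<>();
            if (EnvType.NONE == IT_ENV_TYPE) {
                return result;    // no parameters -> the parameterized suite runs zero cases
            }
            for (String each : IMAGES) {
                result.add(each + ", env/scenario/general/mysql.xml");
            }
            return result;
        }
        
        public static void main(final String[] args) {
            getTestParameters().forEach(System.out::println);
        }
    }
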
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 31c6f0edaff..fdef91f9a62 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -25,8 +25,9 @@ import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseT
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -64,6 +65,8 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             + "TYPE(NAME=\"hash_mod\",PROPERTIES(\"sharding-count\"=\"6\"))\n"
             + ");";
     
+    private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
+    
     public IndexesMigrationE2EIT(final PipelineTestParameter testParam) {
         super(testParam);
     }
@@ -71,34 +74,29 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     @Parameters(name = "{0}")
     public static Collection<PipelineTestParameter> getTestParameters() {
         Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+        if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
             return result;
         }
-        List<String> mysqlVersion = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+        List<String> mysqlVersion = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType());
         if (!mysqlVersion.isEmpty()) {
             result.add(new PipelineTestParameter(new MySQLDatabaseType(), mysqlVersion.get(0), "env/common/none.xml"));
         }
-        List<String> postgresqlVersion = PipelineBaseE2EIT.ENV.listStorageContainerImages(new PostgreSQLDatabaseType());
+        List<String> postgresqlVersion = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType());
         if (!postgresqlVersion.isEmpty()) {
             result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), postgresqlVersion.get(0), "env/common/none.xml"));
         }
         return result;
     }
     
-    @Override
-    protected String getSourceTableOrderName() {
-        return "t_order";
-    }
-    
     @Test
     public void assertNoUniqueKeyMigrationSuccess() throws Exception {
         String sql;
         String consistencyCheckAlgorithmType;
-        if (getDatabaseType() instanceof MySQLDatabaseType) {
+        if (getContainerComposer().getDatabaseType() instanceof MySQLDatabaseType) {
             sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             // DATA_MATCH isn't supported: records can't be ordered
             consistencyCheckAlgorithmType = "CRC32_MATCH";
-        } else if (getDatabaseType() instanceof PostgreSQLDatabaseType) {
+        } else if (getContainerComposer().getDatabaseType() instanceof PostgreSQLDatabaseType) {
             sql = "CREATE TABLE %s (order_id varchar(255) NOT NULL,user_id int NOT NULL,status varchar(255) NULL)";
             consistencyCheckAlgorithmType = null;
         } else {
@@ -108,13 +106,13 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         Object uniqueKey = keyGenerateAlgorithm.generateKey();
         assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
             insertOneOrder(uniqueKey);
-            assertProxyOrderRecordExist("t_order", uniqueKey);
+            getContainerComposer().assertProxyOrderRecordExist("t_order", uniqueKey);
             return null;
         });
     }
     
     private void insertOneOrder(final Object uniqueKey) throws SQLException {
-        try (PreparedStatement preparedStatement = getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
+        try (PreparedStatement preparedStatement = getContainerComposer().getSourceDataSource().getConnection().prepareStatement("INSERT INTO t_order (order_id,user_id,status) VALUES (?,?,?)")) {
             preparedStatement.setObject(1, uniqueKey);
             preparedStatement.setObject(2, 1);
             preparedStatement.setObject(3, "OK");
@@ -127,7 +125,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     public void assertMultiPrimaryKeyMigrationSuccess() throws Exception {
         String sql;
         String consistencyCheckAlgorithmType;
-        if (getDatabaseType() instanceof MySQLDatabaseType) {
+        if (getContainerComposer().getDatabaseType() instanceof MySQLDatabaseType) {
             sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             consistencyCheckAlgorithmType = "CRC32_MATCH";
         } else {
@@ -137,7 +135,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         Object uniqueKey = keyGenerateAlgorithm.generateKey();
         assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
             insertOneOrder(uniqueKey);
-            assertProxyOrderRecordExist("t_order", uniqueKey);
+            getContainerComposer().assertProxyOrderRecordExist("t_order", uniqueKey);
             return null;
         });
     }
@@ -146,7 +144,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     public void assertMultiUniqueKeyMigrationSuccess() throws Exception {
         String sql;
         String consistencyCheckAlgorithmType;
-        if (getDatabaseType() instanceof MySQLDatabaseType) {
+        if (getContainerComposer().getDatabaseType() instanceof MySQLDatabaseType) {
             sql = "CREATE TABLE `%s` (`order_id` BIGINT NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), UNIQUE KEY (`order_id`,`user_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             consistencyCheckAlgorithmType = "DATA_MATCH";
         } else {
@@ -156,7 +154,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         Object uniqueKey = keyGenerateAlgorithm.generateKey();
         assertMigrationSuccess(sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
             insertOneOrder(uniqueKey);
-            assertProxyOrderRecordExist("t_order", uniqueKey);
+            getContainerComposer().assertProxyOrderRecordExist("t_order", uniqueKey);
             return null;
         });
     }
@@ -165,7 +163,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess() throws Exception {
         String sql;
         String consistencyCheckAlgorithmType;
-        if (getDatabaseType() instanceof MySQLDatabaseType) {
+        if (getContainerComposer().getDatabaseType() instanceof MySQLDatabaseType) {
             sql = "CREATE TABLE `%s` (`order_id` VARBINARY(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             // DATA_MATCH isn't supported: order-by values must implement Comparable
             consistencyCheckAlgorithmType = "CRC32_MATCH";
@@ -178,33 +176,33 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         assertMigrationSuccess(sql, "order_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, () -> {
             insertOneOrder(uniqueKey);
             // TODO Selecting by byte[] through the proxy doesn't work, so the UNHEX function is used for now
-            assertProxyOrderRecordExist(String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
+            getContainerComposer().assertProxyOrderRecordExist(String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", Hex.encodeHexString(uniqueKey)));
             return null;
         });
     }
     
     private void assertMigrationSuccess(final String sqlPattern, final String shardingColumn, final KeyGenerateAlgorithm keyGenerateAlgorithm,
                                         final String consistencyCheckAlgorithmType, final Callable<Void> incrementalTaskFn) throws Exception {
-        initEnvironment(getDatabaseType(), new MigrationJobType());
-        sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
-        try (Connection connection = getSourceDataSource().getConnection()) {
-            PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, getSourceTableOrderName(), TABLE_INIT_ROW_COUNT);
+        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
+        getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_ORDER_NAME));
+        try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
+            PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, SOURCE_TABLE_ORDER_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
         }
         addMigrationProcessConfig();
         addMigrationSourceResource();
         addMigrationTargetResource();
-        proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT, shardingColumn), 2);
-        startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+        getContainerComposer().proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT, shardingColumn), 2);
+        startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
         String jobId = listJobId().get(0);
-        waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         incrementalTaskFn.call();
-        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         if (null != consistencyCheckAlgorithmType) {
             assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
         }
         commitMigrationByJobId(jobId);
-        proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-        assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(TABLE_INIT_ROW_COUNT + 1));
+        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+        assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_ORDER_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
         List<String> lastJobIds = listJobId();
         assertTrue(lastJobIds.isEmpty());
     }
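
assertMigrationSuccess in this class is a template method: a CREATE TABLE pattern, a sharding column, a key generator, an optional consistency-check algorithm (null skips the check, as in the PostgreSQL no-unique-key branch), and a Callable<Void> that injects incremental traffic after the job prepares. A reduced sketch of that control flow; the printed steps are placeholders for the real helpers:

    import java.util.concurrent.Callable;
    
    public final class MigrationTemplateSketch {
        
        // Hypothetical reduction of IndexesMigrationE2EIT.assertMigrationSuccess.
        private static void assertMigrationSuccess(final String consistencyCheckAlgorithmType,
                                                   final Callable<Void> incrementalTaskFn) throws Exception {
            System.out.println("start migration, wait for job prepare");
            incrementalTaskFn.call();    // inject incremental writes while the job runs
            System.out.println("wait for the incremental task to finish");
            if (null != consistencyCheckAlgorithmType) {
                System.out.println("check consistency with " + consistencyCheckAlgorithmType);
            }
            System.out.println("commit job, assert target row count");
        }
        
        public static void main(final String[] args) throws Exception {
            assertMigrationSuccess("CRC32_MATCH", () -> {
                System.out.println("insert one order, assert it is visible via the proxy");
                return null;
            });
        }
    }
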
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index ca97ada559b..3dc39456ea6 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -22,8 +22,9 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -46,6 +47,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Slf4j
 public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     
+    private static final String SOURCE_TABLE_ORDER_NAME = "t_order";
+    
     public MariaDBMigrationE2EIT(final PipelineTestParameter testParam) {
         super(testParam);
     }
@@ -53,10 +56,10 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     @Parameters(name = "{0}")
     public static Collection<PipelineTestParameter> getTestParameters() {
         Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+        if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
             return result;
         }
-        List<String> versions = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+        List<String> versions = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType());
         if (versions.isEmpty()) {
             return result;
         }
@@ -65,34 +68,29 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
         return result;
     }
     
-    @Override
-    protected String getSourceTableOrderName() {
-        return "t_order";
-    }
-    
     @Test
     public void assertMigrationSuccess() throws SQLException, InterruptedException {
-        initEnvironment(getDatabaseType(), new MigrationJobType());
+        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
         String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
-        sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
-        try (Connection connection = getSourceDataSource().getConnection()) {
+        getContainerComposer().sourceExecuteWithLog(String.format(sqlPattern, SOURCE_TABLE_ORDER_NAME));
+        try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
             KeyGenerateAlgorithm generateAlgorithm = new UUIDKeyGenerateAlgorithm();
-            PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+            PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, SOURCE_TABLE_ORDER_NAME, PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
         }
         addMigrationProcessConfig();
         addMigrationSourceResource();
         addMigrationTargetResource();
         createTargetOrderTableRule();
-        startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+        startMigration(SOURCE_TABLE_ORDER_NAME, getContainerComposer().getTargetTableOrderName());
         String jobId = listJobId().get(0);
-        waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
-        assertProxyOrderRecordExist("t_order", "a1");
-        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        getContainerComposer().waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        getContainerComposer().sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
+        getContainerComposer().assertProxyOrderRecordExist("t_order", "a1");
+        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(jobId, "CRC32_MATCH");
         commitMigrationByJobId(jobId);
-        proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-        assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1));
+        getContainerComposer().proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+        assertThat(getContainerComposer().getTargetTableRecordsCount(SOURCE_TABLE_ORDER_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
         List<String> lastJobIds = listJobId();
         assertTrue(lastJobIds.isEmpty());
     }
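
The MariaDB case drives the job lifecycle through DistSQL. Only SHOW MIGRATION STATUS and REFRESH TABLE METADATA appear verbatim above; the other statement shapes in this sketch are assumptions based on ShardingSphere's migration DistSQL and should be verified against the docs for the release in use:

    // Sketch of the DistSQL sequence the migration helpers wrap.
    public final class MigrationLifecycleSketch {
        
        public static void main(final String[] args) {
            String jobId = "<job-id>";    // real ids come from the job list, not hard-coded
            String[] statements = {
                    "SHOW MIGRATION STATUS '" + jobId + "'",    // polled by waitJobPrepareSuccess / waitIncrementTaskFinished
                    "CHECK MIGRATION '" + jobId + "' BY TYPE (NAME='CRC32_MATCH')",
                    "COMMIT MIGRATION '" + jobId + "'",
                    "REFRESH TABLE METADATA"    // so the proxy sees the committed target tables
            };
            for (String each : statements) {
                System.out.println(each);
            }
        }
    }
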
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index 2277d282e04..fd92fb88356 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -23,8 +23,9 @@ import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
@@ -46,34 +47,23 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Slf4j
 public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
     
-    private final PipelineTestParameter testParam;
-    
     public TextPrimaryKeyMigrationE2EIT(final PipelineTestParameter testParam) {
         super(testParam);
-        this.testParam = testParam;
-    }
-    
-    @Override
-    protected String getSourceTableOrderName() {
-        if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
-            return "T_ORDER";
-        }
-        return "t_order";
     }
     
     @Parameters(name = "{0}")
     public static Collection<PipelineTestParameter> getTestParameters() {
         Collection<PipelineTestParameter> result = new LinkedList<>();
-        if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+        if (PipelineE2EEnvironment.getInstance().getItEnvType() == PipelineEnvTypeEnum.NONE) {
             return result;
         }
-        for (String version : PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType())) {
+        for (String version : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType())) {
             result.add(new PipelineTestParameter(new MySQLDatabaseType(), version, "env/scenario/primary_key/text_primary_key/mysql.xml"));
         }
-        for (String version : PipelineBaseE2EIT.ENV.listStorageContainerImages(new PostgreSQLDatabaseType())) {
+        for (String version : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType())) {
             result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), version, "env/scenario/primary_key/text_primary_key/postgresql.xml"));
         }
-        for (String version : PipelineBaseE2EIT.ENV.listStorageContainerImages(new OpenGaussDatabaseType())) {
+        for (String version : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new OpenGaussDatabaseType())) {
             result.add(new PipelineTestParameter(new OpenGaussDatabaseType(), version, "env/scenario/primary_key/text_primary_key/postgresql.xml"));
         }
         return result;
@@ -81,25 +71,27 @@ public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
     
     @Test
     public void assertTextPrimaryMigrationSuccess() throws SQLException, InterruptedException {
-        log.info("assertTextPrimaryMigrationSuccess testParam:{}", testParam);
-        initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
-        createSourceOrderTable();
-        try (Connection connection = getSourceDataSource().getConnection()) {
+        getContainerComposer().initEnvironment(getContainerComposer().getDatabaseType(), new MigrationJobType());
+        getContainerComposer().createSourceOrderTable(getSourceTableOrderName());
+        try (Connection connection = getContainerComposer().getSourceDataSource().getConnection()) {
             UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
-            PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+            PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, keyGenerateAlgorithm, getSourceTableOrderName(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
         }
         addMigrationProcessConfig();
         addMigrationSourceResource();
         addMigrationTargetResource();
         createTargetOrderTableRule();
-        startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+        startMigration(getSourceTableOrderName(), getContainerComposer().getTargetTableOrderName());
         String jobId = listJobId().get(0);
-        sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableOrderName(), "1000000000", 1, "afterStop"));
-        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        getContainerComposer().sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableOrderName(), "1000000000", 1, "afterStop"));
+        getContainerComposer().waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         commitMigrationByJobId(jobId);
         List<String> lastJobIds = listJobId();
         assertTrue(lastJobIds.isEmpty());
-        log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
+    }
+    
+    private String getSourceTableOrderName() {
+        return DatabaseTypeUtil.isMySQL(getContainerComposer().getDatabaseType()) ? "T_ORDER" : "t_order";
     }
 }
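
The new private getSourceTableOrderName() above replaces the old override with a one-line dialect dispatch: an upper-case table name on MySQL (presumably to exercise identifier case handling), a lower-case one elsewhere. A self-contained sketch of the same dispatch, with an enum standing in for the DatabaseType hierarchy:

    // Hypothetical stand-in for the DatabaseType hierarchy.
    enum Dialect { MYSQL, POSTGRESQL, OPENGAUSS }
    
    public final class TableNameDispatchSketch {
        
        // Mirrors TextPrimaryKeyMigrationE2EIT.getSourceTableOrderName().
        static String sourceTableOrderName(final Dialect dialect) {
            return Dialect.MYSQL == dialect ? "T_ORDER" : "t_order";
        }
        
        public static void main(final String[] args) {
            for (Dialect each : Dialect.values()) {
                System.out.println(each + " -> " + sourceTableOrderName(each));
            }
        }
    }
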