You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zi...@apache.org on 2023/03/15 22:20:45 UTC

[shardingsphere] branch master updated: Add PipelineE2ETestCaseArgumentsProvider (#24633)

This is an automated email from the ASF dual-hosted git repository.

zichaowang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 165e49ea540 Add PipelineE2ETestCaseArgumentsProvider (#24633)
165e49ea540 is described below

commit 165e49ea5406c8a761e63b7a6cdaf1067494a450
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Thu Mar 16 06:20:27 2023 +0800

    Add PipelineE2ETestCaseArgumentsProvider (#24633)
    
    * Add PipelineE2ETestCaseArgumentsProvider
    
    * Add PipelineE2ETestCaseArgumentsProvider
---
 .../pipeline/cases/base/BaseIncrementTask.java     |  1 -
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 37 ++-------
 .../createtable/CreateTableSQLGeneratorIT.java     | 42 ++--------
 .../general/MySQLMigrationGeneralE2EIT.java        | 28 ++-----
 .../general/PostgreSQLMigrationGeneralE2EIT.java   | 32 ++------
 .../migration/general/RulesMigrationE2EIT.java     | 25 ++----
 .../primarykey/IndexesMigrationE2EIT.java          | 40 +++------
 .../primarykey/MariaDBMigrationE2EIT.java          | 23 ++----
 .../primarykey/TextPrimaryKeyMigrationE2EIT.java   | 38 +++------
 .../param}/PipelineE2ECondition.java               |  2 +-
 .../framework/param/PipelineE2ESettings.java       | 63 ++++++++++++++
 .../PipelineE2ETestCaseArgumentsProvider.java      | 63 ++++++++++++++
 .../framework/watcher/PipelineWatcher.java         | 95 ----------------------
 13 files changed, 193 insertions(+), 296 deletions(-)

diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/BaseIncrementTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/BaseIncrementTask.java
index 1cff3748270..671a5ed9b07 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/BaseIncrementTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/BaseIncrementTask.java
@@ -19,5 +19,4 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.base;
 
 // TODO remove later
 public abstract class BaseIncrementTask implements Runnable {
-    
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 620fd09dd11..8c34fef523a 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -35,23 +35,21 @@ import org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTabl
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.DataMatchDataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.util.StorageContainerUtil;
 import org.junit.jupiter.api.condition.EnabledIf;
-import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
@@ -62,9 +60,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -72,13 +68,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * CDC E2E IT.
  */
+@PipelineE2ESettings(database = {
+        @PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/scenario/general/mysql.xml"),
+        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/postgresql.xml")})
 @Slf4j
 public final class CDCE2EIT {
     
@@ -95,7 +93,7 @@ public final class CDCE2EIT {
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
         if (TimeZone.getDefault() != TimeZone.getTimeZone("UTC") && PipelineEnvTypeEnum.DOCKER == PipelineE2EEnvironment.getInstance().getItEnvType()) {
             // make sure the time zone of locally running program same with the database server at CI.
@@ -198,25 +196,8 @@ public final class CDCE2EIT {
     }
     
     private static boolean isEnabled() {
-        // TODO fix later
-        // return PipelineE2ECondition.isEnabled();
+        // TODO xinze fix it
+        // return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new OpenGaussDatabaseType());
         return false;
     }
-    
-    private static class TestCaseArgumentsProvider implements ArgumentsProvider {
-        
-        @Override
-        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
-            Collection<Arguments> result = new LinkedList<>();
-            MySQLDatabaseType mysqlDatabaseType = new MySQLDatabaseType();
-            for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(mysqlDatabaseType)) {
-                result.add(Arguments.of(new PipelineTestParameter(mysqlDatabaseType, each, "env/scenario/general/mysql.xml")));
-            }
-            OpenGaussDatabaseType openGaussDatabaseType = new OpenGaussDatabaseType();
-            for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(openGaussDatabaseType)) {
-                result.add(Arguments.of(new PipelineTestParameter(openGaussDatabaseType, each, "env/scenario/general/postgresql.xml")));
-            }
-            return result.stream();
-        }
-    }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java
index b7efcfd1703..5a2a904a1f8 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/createtable/CreateTableSQLGeneratorIT.java
@@ -23,11 +23,13 @@ import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineE2ECondition;
 import org.apache.shardingsphere.test.e2e.data.pipeline.entity.CreateTableSQLGeneratorAssertionEntity;
 import org.apache.shardingsphere.test.e2e.data.pipeline.entity.CreateTableSQLGeneratorAssertionsRootEntity;
 import org.apache.shardingsphere.test.e2e.data.pipeline.entity.CreateTableSQLGeneratorOutputEntity;
-import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ECondition;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.DockerImageVersion;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.storage.DockerStorageContainer;
@@ -39,10 +41,7 @@ import org.apache.shardingsphere.test.e2e.env.container.atomic.storage.impl.MySQ
 import org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtil;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.condition.EnabledIf;
-import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import javax.sql.DataSource;
@@ -57,21 +56,16 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Objects;
 import java.util.regex.Pattern;
-import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@PipelineE2ESettings(database = {
+        @PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/scenario/create-table-generator/mysql/create-table-sql-generator.xml"),
+        @PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles = "env/scenario/create-table-generator/postgresql/create-table-sql-generator.xml"),
+        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/create-table-generator/opengauss/create-table-sql-generator.xml")})
 public final class CreateTableSQLGeneratorIT {
     
-    private static final String POSTGRES_CASE_FILE_PATH = "postgresql/create-table-sql-generator.xml";
-    
-    private static final String MYSQL_CASE_FILE_PATH = "mysql/create-table-sql-generator.xml";
-    
-    private static final String OPEN_GAUSS_CASE_FILE_PATH = "opengauss/create-table-sql-generator.xml";
-    
-    private static final String PARENT_PATH = "env/scenario/create-table-generator";
-    
     private static final String DEFAULT_SCHEMA = "public";
     
     private static final String DEFAULT_DATABASE = "pipeline_it_0";
@@ -87,7 +81,7 @@ public final class CreateTableSQLGeneratorIT {
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertGenerateCreateTableSQL(final PipelineTestParameter testParam) throws SQLException {
         startStorageContainer(testParam.getDatabaseType(), testParam.getStorageContainerImage());
         CreateTableSQLGeneratorAssertionsRootEntity rootEntity = JAXB.unmarshal(
@@ -144,22 +138,4 @@ public final class CreateTableSQLGeneratorIT {
     private static boolean isEnabled() {
         return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new PostgreSQLDatabaseType(), new OpenGaussDatabaseType());
     }
-    
-    private static class TestCaseArgumentsProvider implements ArgumentsProvider {
-        
-        @Override
-        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
-            Collection<PipelineTestParameter> result = new LinkedList<>();
-            for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType())) {
-                result.add(new PipelineTestParameter(new MySQLDatabaseType(), each, String.join("/", PARENT_PATH, MYSQL_CASE_FILE_PATH)));
-            }
-            for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType())) {
-                result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), each, String.join("/", PARENT_PATH, POSTGRES_CASE_FILE_PATH)));
-            }
-            for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new OpenGaussDatabaseType())) {
-                result.add(new PipelineTestParameter(new OpenGaussDatabaseType(), each, String.join("/", PARENT_PATH, OPEN_GAUSS_CASE_FILE_PATH)));
-            }
-            return result.stream().map(Arguments::of);
-        }
-    }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index b95fddfc186..dc2abb4354d 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -24,33 +24,30 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineE2ECondition;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
-import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ECondition;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
 import org.junit.jupiter.api.condition.EnabledIf;
-import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.sql.SQLException;
 import java.time.LocalDateTime;
-import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@PipelineE2ESettings(database = @PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/scenario/general/mysql.xml"))
 @Slf4j
 public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
@@ -60,7 +57,7 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             addMigrationProcessConfig(containerComposer);
@@ -109,17 +106,4 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     private static boolean isEnabled() {
         return PipelineE2ECondition.isEnabled(new MySQLDatabaseType());
     }
-    
-    private static class TestCaseArgumentsProvider implements ArgumentsProvider {
-        
-        @Override
-        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
-            Collection<Arguments> result = new LinkedList<>();
-            MySQLDatabaseType databaseType = new MySQLDatabaseType();
-            for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(databaseType)) {
-                result.add(Arguments.of(new PipelineTestParameter(databaseType, each, "env/scenario/general/mysql.xml")));
-            }
-            return result.stream();
-        }
-    }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 5a88515b90f..266ef8ac0df 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -24,31 +24,30 @@ import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseTy
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineE2ECondition;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
-import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ECondition;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
 import org.junit.jupiter.api.condition.EnabledIf;
-import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import java.sql.SQLException;
 import java.time.LocalDateTime;
-import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@PipelineE2ESettings(database = {
+        @PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles = "env/scenario/general/postgresql.xml"),
+        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/postgresql.xml")})
 @Slf4j
 public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
@@ -58,7 +57,7 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             addMigrationProcessConfig(containerComposer);
@@ -118,19 +117,4 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
     private static boolean isEnabled() {
         return PipelineE2ECondition.isEnabled(new PostgreSQLDatabaseType(), new OpenGaussDatabaseType());
     }
-    
-    private static class TestCaseArgumentsProvider implements ArgumentsProvider {
-        
-        @Override
-        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
-            Collection<Arguments> result = new LinkedList<>();
-            for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType())) {
-                result.add(Arguments.of(new PipelineTestParameter(new PostgreSQLDatabaseType(), each, "env/scenario/general/postgresql.xml")));
-            }
-            for (String each : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new OpenGaussDatabaseType())) {
-                result.add(Arguments.of(new PipelineTestParameter(new OpenGaussDatabaseType(), each, "env/scenario/general/postgresql.xml")));
-            }
-            return result.stream();
-        }
-    }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 05f9ea88ae7..5be6538fd7f 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -21,22 +21,19 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobTy
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineE2ECondition;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
-import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ECondition;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.junit.jupiter.api.condition.EnabledIf;
-import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.sql.Connection;
-import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.stream.Stream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -46,6 +43,7 @@ import static org.hamcrest.Matchers.is;
  * 1) no any rule.
  * 2) only encrypt rule.
  */
+@PipelineE2ESettings(fetchSingle = true, database = @PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/scenario/primary_key/text_primary_key/mysql.xml"))
 public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     private static final String SOURCE_TABLE_NAME = "t_order";
@@ -54,7 +52,7 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertNoRuleMigrationSuccess(final PipelineTestParameter testParam) throws Exception {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             assertMigrationSuccess(containerComposer, null);
@@ -63,7 +61,7 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertOnlyEncryptRuleMigrationSuccess(final PipelineTestParameter testParam) throws Exception {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             assertMigrationSuccess(containerComposer, () -> {
@@ -96,13 +94,4 @@ public final class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     private static boolean isEnabled() {
         return PipelineE2ECondition.isEnabled(new MySQLDatabaseType());
     }
-    
-    private static class TestCaseArgumentsProvider implements ArgumentsProvider {
-        
-        @Override
-        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
-            List<String> versions = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType());
-            return Stream.of(Arguments.of(new PipelineTestParameter(new MySQLDatabaseType(), versions.get(0), "env/scenario/primary_key/text_primary_key/mysql.xml")));
-        }
-    }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 7c0f1311aab..f33deeef6c1 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -25,26 +25,22 @@ import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateA
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineE2ECondition;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
-import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ECondition;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.junit.jupiter.api.condition.EnabledIf;
-import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.stream.Stream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -57,6 +53,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * 3) multiple columns primary key, first column type is VARCHAR.
  * 4) multiple columns unique key, first column type is BIGINT.
  */
+@PipelineE2ESettings(fetchSingle = true, database = {
+        @PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/common/none.xml"),
+        @PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles = "env/common/none.xml")})
 public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     private static final String ORDER_TABLE_SHARDING_RULE_FORMAT = "CREATE SHARDING TABLE RULE t_order(\n"
@@ -71,7 +70,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertNoUniqueKeyMigrationSuccess(final PipelineTestParameter testParam) throws Exception {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             String sql;
@@ -108,7 +107,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertMultiPrimaryKeyMigrationSuccess(final PipelineTestParameter testParam) throws Exception {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             String sql;
@@ -131,7 +130,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertMultiUniqueKeyMigrationSuccess(final PipelineTestParameter testParam) throws Exception {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             String sql;
@@ -154,7 +153,7 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess(final PipelineTestParameter testParam) throws Exception {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             String sql;
@@ -206,21 +205,4 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     private static boolean isEnabled() {
         return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new PostgreSQLDatabaseType());
     }
-    
-    private static class TestCaseArgumentsProvider implements ArgumentsProvider {
-        
-        @Override
-        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
-            Collection<Arguments> result = new LinkedList<>();
-            List<String> mysqlVersion = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType());
-            if (!mysqlVersion.isEmpty()) {
-                result.add(Arguments.of(new PipelineTestParameter(new MySQLDatabaseType(), mysqlVersion.get(0), "env/common/none.xml")));
-            }
-            List<String> postgresqlVersion = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType());
-            if (!postgresqlVersion.isEmpty()) {
-                result.add(Arguments.of(new PipelineTestParameter(new PostgreSQLDatabaseType(), postgresqlVersion.get(0), "env/common/none.xml")));
-            }
-            return result.stream();
-        }
-    }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index d716a245b70..40bd432a662 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -23,27 +23,26 @@ import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineE2ECondition;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
-import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ECondition;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.junit.jupiter.api.condition.EnabledIf;
-import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.List;
-import java.util.stream.Stream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@PipelineE2ESettings(fetchSingle = true, database = @PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/common/none.xml"))
 @Slf4j
 public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     
@@ -53,7 +52,7 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertMigrationSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
@@ -84,14 +83,4 @@ public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     private static boolean isEnabled() {
         return PipelineE2ECondition.isEnabled(new MySQLDatabaseType());
     }
-    
-    private static class TestCaseArgumentsProvider implements ArgumentsProvider {
-        
-        @Override
-        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
-            List<String> versions = PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType());
-            // TODO use MariaDBDatabaseType
-            return Stream.of(Arguments.of(new PipelineTestParameter(new MySQLDatabaseType(), versions.get(0), "env/common/none.xml")));
-        }
-    }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index 7137b242b61..df860d51323 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -23,35 +23,35 @@ import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseTy
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
-import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineE2ECondition;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
-import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ECondition;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtil;
 import org.junit.jupiter.api.condition.EnabledIf;
-import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
+@PipelineE2ESettings(fetchSingle = true, database = {
+        @PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/scenario/primary_key/text_primary_key/mysql.xml"),
+        @PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles = "env/scenario/primary_key/text_primary_key/postgresql.xml"),
+        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/primary_key/text_primary_key/postgresql.xml")})
+public final class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
     
     private static final String TARGET_TABLE_NAME = "t_order";
     
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
-    @ArgumentsSource(TestCaseArgumentsProvider.class)
+    @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     public void assertTextPrimaryMigrationSuccess(final PipelineTestParameter testParam) throws SQLException, InterruptedException {
         try (PipelineContainerComposer containerComposer = new PipelineContainerComposer(testParam, new MigrationJobType())) {
             containerComposer.createSourceOrderTable(getSourceTableName(containerComposer));
@@ -82,22 +82,4 @@ public class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
     private static boolean isEnabled() {
         return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new PostgreSQLDatabaseType(), new OpenGaussDatabaseType());
     }
-    
-    private static class TestCaseArgumentsProvider implements ArgumentsProvider {
-        
-        @Override
-        public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
-            Collection<Arguments> result = new LinkedList<>();
-            for (String version : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new MySQLDatabaseType())) {
-                result.add(Arguments.of(new PipelineTestParameter(new MySQLDatabaseType(), version, "env/scenario/primary_key/text_primary_key/mysql.xml")));
-            }
-            for (String version : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new PostgreSQLDatabaseType())) {
-                result.add(Arguments.of(new PipelineTestParameter(new PostgreSQLDatabaseType(), version, "env/scenario/primary_key/text_primary_key/postgresql.xml")));
-            }
-            for (String version : PipelineE2EEnvironment.getInstance().listStorageContainerImages(new OpenGaussDatabaseType())) {
-                result.add(Arguments.of(new PipelineTestParameter(new OpenGaussDatabaseType(), version, "env/scenario/primary_key/text_primary_key/postgresql.xml")));
-            }
-            return result.stream();
-        }
-    }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineE2ECondition.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineE2ECondition.java
similarity index 96%
rename from test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineE2ECondition.java
rename to test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineE2ECondition.java
index 73120babd1e..b4d995db225 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineE2ECondition.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineE2ECondition.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.test.e2e.data.pipeline.cases;
+package org.apache.shardingsphere.test.e2e.data.pipeline.framework.param;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineE2ESettings.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineE2ESettings.java
new file mode 100644
index 00000000000..62f7ca25a98
--- /dev/null
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineE2ESettings.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.framework.param;
+
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Pipeline E2E settings, read from the test class by {@code PipelineE2ETestCaseArgumentsProvider} to build test parameters.
+ */
+@Inherited
+@Retention(RetentionPolicy.RUNTIME)
+public @interface PipelineE2ESettings {
+    
+    /**
+     * Whether to use only the first storage container image listed for each database type.
+     * 
+     * @return true to use only the first listed image, false to use every listed image
+     */
+    boolean fetchSingle() default false;
+    
+    /**
+     * Get settings of each database to be tested.
+     * 
+     * @return database settings
+     */
+    PipelineE2EDatabaseSettings[] database();
+    
+    @Inherited
+    @Retention(RetentionPolicy.RUNTIME)
+    @interface PipelineE2EDatabaseSettings {
+        
+        /**
+         * Get database type name, resolved to a {@code DatabaseType} through {@code TypedSPILoader}.
+         * 
+         * @return database type
+         */
+        String type();
+        
+        /**
+         * Get scenario file paths, one test parameter is created per scenario file for each selected storage container image.
+         * 
+         * @return scenario files
+         */
+        String[] scenarioFiles();
+    }
+}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineE2ETestCaseArgumentsProvider.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineE2ETestCaseArgumentsProvider.java
new file mode 100644
index 00000000000..16ed342724d
--- /dev/null
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineE2ETestCaseArgumentsProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.framework.param;
+
+import com.google.common.base.Preconditions;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Pipeline E2E test case arguments provider, builds {@link PipelineTestParameter} arguments from the {@link PipelineE2ESettings} annotation of the test class.
+ */
+public final class PipelineE2ETestCaseArgumentsProvider implements ArgumentsProvider {
+    
+    @Override
+    public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) {
+        Collection<Arguments> result = new LinkedList<>();
+        PipelineE2ESettings settings = extensionContext.getRequiredTestClass().getAnnotation(PipelineE2ESettings.class);
+        Preconditions.checkNotNull(settings, "Annotation PipelineE2ESettings is required.");
+        for (PipelineE2EDatabaseSettings each : settings.database()) {
+            result.addAll(provideArguments(settings, each));
+        }
+        return result.stream();
+    }
+    
+    private Collection<Arguments> provideArguments(final PipelineE2ESettings settings, final PipelineE2EDatabaseSettings databaseSettings) {
+        DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, databaseSettings.type());
+        List<String> storageContainerImages = PipelineE2EEnvironment.getInstance().listStorageContainerImages(databaseType);
+        return settings.fetchSingle() && !storageContainerImages.isEmpty()
+                ? provideArguments(databaseSettings.scenarioFiles(), databaseType, storageContainerImages.get(0))
+                : storageContainerImages.stream().flatMap(each -> provideArguments(databaseSettings.scenarioFiles(), databaseType, each).stream()).collect(Collectors.toList());
+    }
+    
+    private Collection<Arguments> provideArguments(final String[] scenarioFiles, final DatabaseType databaseType, final String storageContainerImage) {
+        return Arrays.stream(scenarioFiles).map(each -> Arguments.of(new PipelineTestParameter(databaseType, storageContainerImage, each))).collect(Collectors.toList());
+    }
+}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/watcher/PipelineWatcher.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/watcher/PipelineWatcher.java
deleted file mode 100644
index fb7cff65d87..00000000000
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/watcher/PipelineWatcher.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.e2e.data.pipeline.framework.watcher;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-import org.apache.shardingsphere.mode.repository.cluster.zookeeper.ZookeeperRepository;
-import org.apache.shardingsphere.test.e2e.data.pipeline.framework.container.compose.BaseContainerComposer;
-import org.apache.shardingsphere.test.e2e.data.pipeline.framework.container.compose.DockerContainerComposer;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-@RequiredArgsConstructor
-@Slf4j
-public class PipelineWatcher extends TestWatcher {
-    
-    private final BaseContainerComposer containerComposer;
-    
-    // TODO now the meta data mistake is not reproduce, but keep this method, it may be used again
-    private void outputZookeeperData() {
-        if (!(containerComposer instanceof DockerContainerComposer)) {
-            return;
-        }
-        DockerContainerComposer dockerContainerComposer = (DockerContainerComposer) containerComposer;
-        DatabaseType databaseType = dockerContainerComposer.getStorageContainers().get(0).getDatabaseType();
-        String namespace = "it_db_" + databaseType.getType().toLowerCase();
-        ClusterPersistRepositoryConfiguration config = new ClusterPersistRepositoryConfiguration("ZooKeeper", namespace,
-                dockerContainerComposer.getGovernanceContainer().getServerLists(), new Properties());
-        ClusterPersistRepository zookeeperRepository = new ZookeeperRepository();
-        zookeeperRepository.init(config);
-        List<String> childrenKeys = zookeeperRepository.getChildrenKeys("/");
-        for (String each : childrenKeys) {
-            if (!"pipeline".equals(each)) {
-                continue;
-            }
-            Map<String, String> nodeMap = new LinkedHashMap<>();
-            addZookeeperData(each, "", zookeeperRepository, nodeMap);
-            log.warn("zookeeper data, node:{}, data:{}", each, nodeMap);
-        }
-    }
-    
-    private void addZookeeperData(final String key, final String parentPath, final ClusterPersistRepository zookeeperRepository, final Map<String, String> nodeMap) {
-        String path = String.join("/", parentPath, key);
-        String data = zookeeperRepository.getDirectly(path);
-        nodeMap.put(path, data);
-        List<String> childrenKeys = zookeeperRepository.getChildrenKeys(path);
-        for (String each : childrenKeys) {
-            addZookeeperData(each, path, zookeeperRepository, nodeMap);
-        }
-    }
-    
-    @Override
-    protected void starting(final Description description) {
-        log.info("pipeline E2E starting: {}", description);
-        containerComposer.start();
-    }
-    
-    @Override
-    protected void succeeded(final Description description) {
-        log.info("pipeline E2E succeeded: {}", description);
-    }
-    
-    @Override
-    protected void failed(final Throwable ex, final Description description) {
-        log.info("pipeline E2E failed: {}", description, ex);
-    }
-    
-    @Override
-    protected void finished(final Description description) {
-        containerComposer.close();
-    }
-}