You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/02/21 03:46:44 UTC

[shardingsphere] branch master updated: Support branch database type in pipeline job (#24264)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new beaf2594998 Support branch database type in pipeline job (#24264)
beaf2594998 is described below

commit beaf25949986e9926a2330c888f1f471b0abc5c1
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Feb 21 11:46:34 2023 +0800

    Support branch database type in pipeline job (#24264)
    
    * Remove DefaultPipelineSQLBuilder
    
    * Update 01-initdb.sql
    
    * Add PipelineTypedSPILoader
    
    * Replace TypedSPILoader.findService with PipelineTypedSPILoader.findDatabaseTypedService
    
    * Replace TypedSPILoader.getService with PipelineTypedSPILoader.getDatabaseTypedService
    
    * Check sharding rule exists in metadata before migration
    
    * Improve PipelineTypedSPILoader to get default service
    
    * Remove DefaultPositionInitializer
    
    * Revert "Improve PipelineTypedSPILoader to get default service"
    
    This reverts commit cbbbb9dc8133c004c2f44f9d23a4bc20cb706052.
    
    * Refactor DefaultColumnValueReader to BasicColumnValueReader
    
    * Improve PipelineTypedSPILoader
    
    * Make remaining snippets compatible with branch database
    
    * Make unit tests compatible with branch database
---
 ...rdingSpherePipelineDataSourceConfiguration.java |  4 +-
 .../StandardPipelineDataSourceConfiguration.java   |  4 +-
 .../datasource/JdbcQueryPropertiesExtension.java   |  2 +-
 .../pipeline/util/spi/PipelineTypedSPILoader.java  | 64 ++++++++++++++++++++++
 ...RC32MatchDataConsistencyCalculateAlgorithm.java |  9 +--
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  7 ++-
 .../datasource/AbstractDataSourceChecker.java      |  4 +-
 .../H2JdbcQueryPropertiesExtension.java}           | 30 +++++-----
 .../pipeline/core/importer/DataSourceImporter.java |  4 +-
 ...alueReader.java => BasicColumnValueReader.java} | 13 +++--
 .../core/ingest/dumper/InventoryDumper.java        |  7 ++-
 ...YamlJobItemIncrementalTasksProgressSwapper.java |  4 +-
 .../metadata/generator/PipelineDDLGenerator.java   |  4 +-
 .../core/prepare/InventoryTaskSplitter.java        |  6 +-
 .../core/prepare/PipelineJobPreparerUtils.java     | 23 +++++---
 .../datasource/AbstractDataSourcePreparer.java     |  4 +-
 .../data/pipeline/core/task/IncrementalTask.java   |  3 +-
 .../data/pipeline/core/util/DatabaseTypeUtil.java  | 52 ++++++++++++++++++
 .../pipeline/core/util/JDBCStreamQueryUtil.java    | 23 ++++++--
 ...ne.spi.datasource.JdbcQueryPropertiesExtension} |  2 +-
 ...ipeline.spi.ingest.position.PositionInitializer | 18 ------
 ...data.pipeline.spi.sqlbuilder.PipelineSQLBuilder | 18 ------
 .../pipeline/core/util/DatabaseTypeUtilTest.java}  | 36 +++++-------
 .../migration/api/impl/MigrationJobAPI.java        | 11 +++-
 .../migration/prepare/MigrationJobPreparer.java    |  4 +-
 .../src/test/resources/env/mysql/01-initdb.sql     |  5 ++
 .../test/resources/env/postgresql/01-initdb.sql    |  7 +++
 .../core/util/spi/PipelineTypedSPILoaderTest.java  | 62 +++++++++++++++++++++
 28 files changed, 306 insertions(+), 124 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index 5ffc2c94e0d..23f115a4629 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -25,10 +25,10 @@ import lombok.NoArgsConstructor;
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
@@ -77,7 +77,7 @@ public final class ShardingSpherePipelineDataSourceConfiguration implements Pipe
     }
     
     private void appendJdbcQueryProperties(final String databaseType) {
-        Optional<JdbcQueryPropertiesExtension> extension = TypedSPILoader.findService(JdbcQueryPropertiesExtension.class, databaseType);
+        Optional<JdbcQueryPropertiesExtension> extension = PipelineTypedSPILoader.findDatabaseTypedService(JdbcQueryPropertiesExtension.class, databaseType);
         if (!extension.isPresent()) {
             return;
         }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index 4fbaeabca1e..712dd2a2ea2 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -22,11 +22,11 @@ import lombok.Getter;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
 import org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 
@@ -98,7 +98,7 @@ public final class StandardPipelineDataSourceConfiguration implements PipelineDa
     }
     
     private void appendJdbcQueryProperties(final String databaseType, final Map<String, Object> yamlConfig) {
-        Optional<JdbcQueryPropertiesExtension> extension = TypedSPILoader.findService(JdbcQueryPropertiesExtension.class, databaseType);
+        Optional<JdbcQueryPropertiesExtension> extension = PipelineTypedSPILoader.findDatabaseTypedService(JdbcQueryPropertiesExtension.class, databaseType);
         if (!extension.isPresent()) {
             return;
         }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
index 6697b100158..0704e3595ef 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/datasource/JdbcQueryPropertiesExtension.java
@@ -31,7 +31,7 @@ public interface JdbcQueryPropertiesExtension extends TypedSPI {
     /**
      * Extend query properties.
      *
-     * @return JDBC query properties for extension
+     * @return JDBC query properties for extension; must not be null
      */
     Properties extendQueryProperties();
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java
new file mode 100644
index 00000000000..3235905cbae
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.util.spi;
+
+import org.apache.shardingsphere.infra.database.type.BranchDatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.exception.ServiceProviderNotFoundServerException;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+
+import java.util.Optional;
+
+/**
+ * Pipeline typed SPI loader.
+ */
+public final class PipelineTypedSPILoader {
+    
+    /**
+     * Find database typed service.
+     *
+     * @param spiClass typed SPI class
+     * @param databaseType database type
+     * @param <T> SPI class type
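+     * @return service of the database type, or of its trunk database type when it is a branch database type; empty if none is found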
+     */
+    public static <T extends TypedSPI> Optional<T> findDatabaseTypedService(final Class<T> spiClass, final String databaseType) {
+        Optional<T> result = TypedSPILoader.findService(spiClass, databaseType);
+        if (result.isPresent()) {
+            return result;
+        }
+        Optional<DatabaseType> type = TypedSPILoader.findService(DatabaseType.class, databaseType);
+        if (type.isPresent() && type.get() instanceof BranchDatabaseType) {
+            return TypedSPILoader.findService(spiClass, ((BranchDatabaseType) type.get()).getTrunkDatabaseType().getType());
+        }
+        return result;
+    }
+    
+    /**
+     * Get database typed service.
+     *
+     * @param spiClass typed SPI class
+     * @param databaseType database type
+     * @param <T> SPI class type
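+     * @return service found as in findDatabaseTypedService; ServiceProviderNotFoundServerException is thrown when none is found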
+     */
+    public static <T extends TypedSPI> T getDatabaseTypedService(final Class<T> spiClass, final String databaseType) {
+        return findDatabaseTypedService(spiClass, databaseType).orElseThrow(() -> new ServiceProviderNotFoundServerException(spiClass));
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index cc872eda2d0..4a9f08a9e9b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -24,11 +24,12 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32DataConsistencyCalculateAlgorithmException;
+import org.apache.shardingsphere.data.pipeline.core.util.DatabaseTypeUtil;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -47,11 +48,11 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
     
-    private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
+    private static final Collection<String> SUPPORTED_DATABASE_TYPES = DatabaseTypeUtil.getTrunkAndBranchDatabaseTypes(Collections.singleton(new MySQLDatabaseType().getType()));
     
     @Override
     public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter param) {
-        PipelineSQLBuilder sqlBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, param.getDatabaseType());
+        PipelineSQLBuilder sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, param.getDatabaseType());
         List<CalculatedItem> calculatedItems = param.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, param, each)).collect(Collectors.toList());
         return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index c7206ac7a30..482beb4b306 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -30,10 +30,12 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.BasicColumnValueReader;
 import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.JDBCStreamQueryUtil;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
@@ -91,7 +93,8 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         try {
             Collection<Collection<Object>> records = new LinkedList<>();
             Object maxUniqueKeyValue = null;
-            ColumnValueReader columnValueReader = TypedSPILoader.getService(ColumnValueReader.class, param.getDatabaseType());
+            ColumnValueReader columnValueReader = PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class, param.getDatabaseType())
+                    .orElseGet(() -> new BasicColumnValueReader(param.getDatabaseType()));
             ResultSet resultSet = calculationContext.getResultSet();
             while (resultSet.next()) {
                 if (isCanceling()) {
@@ -171,7 +174,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     }
     
     private String getQuerySQL(final DataConsistencyCalculateParameter param) {
-        PipelineSQLBuilder sqlBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, param.getDatabaseType());
+        PipelineSQLBuilder sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, param.getDatabaseType());
         String logicTableName = param.getLogicTableName();
         String schemaName = param.getSchemaName();
         String uniqueKey = param.getUniqueKey().getName();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
index 418cfc74c82..636bfba2bab 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -65,7 +65,7 @@ public abstract class AbstractDataSourceChecker implements DataSourceChecker {
     }
     
     private boolean checkEmpty(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
-        String sql = TypedSPILoader.getService(PipelineSQLBuilder.class, getDatabaseType()).buildCheckEmptySQL(schemaName, tableName);
+        String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, getDatabaseType()).buildCheckEmptySQL(schemaName, tableName);
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(sql);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/DefaultPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/H2JdbcQueryPropertiesExtension.java
similarity index 59%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/DefaultPipelineSQLBuilder.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/H2JdbcQueryPropertiesExtension.java
index 78730503675..ba14a9ded9f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/DefaultPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/H2JdbcQueryPropertiesExtension.java
@@ -15,30 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.datasource;
+
+import org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
+
+import java.util.Properties;
 
 /**
- * Default pipeline SQL builder.
+ * H2 JDBC query properties extension.
+ *
+ * <p>H2 is a branch database of MySQL, but its JDBC URL isn't compatible with MySQL's.</p>
  */
-public final class DefaultPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
-    
-    @Override
-    public boolean isDefault() {
-        return true;
-    }
+public final class H2JdbcQueryPropertiesExtension implements JdbcQueryPropertiesExtension {
     
-    @Override
-    protected boolean isKeyword(final String item) {
-        return false;
-    }
+    private final Properties queryProps = new Properties();
     
     @Override
-    protected String getLeftIdentifierQuoteString() {
-        return "";
+    public Properties extendQueryProperties() {
+        return queryProps;
     }
     
     @Override
-    protected String getRightIdentifierQuoteString() {
-        return "";
+    public String getType() {
+        return "H2";
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 6874778e804..db2da8610d2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -41,8 +41,8 @@ import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -86,7 +86,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
         rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
         this.dataSourceManager = (PipelineDataSourceManager) importerConnector.getConnector();
         this.channel = channel;
-        pipelineSqlBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, importerConfig.getDataSourceConfig().getDatabaseType().getType());
+        pipelineSqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, importerConfig.getDataSourceConfig().getDatabaseType().getType());
         this.jobProgressListener = jobProgressListener;
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultColumnValueReader.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
similarity index 81%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultColumnValueReader.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
index 37c03115492..17f312fdffd 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultColumnValueReader.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
@@ -17,14 +17,19 @@
 
 package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
 
+import lombok.RequiredArgsConstructor;
+
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 
 /**
- * Default column value reader.
+ * Basic column value reader.
  */
-public final class DefaultColumnValueReader extends AbstractColumnValueReader {
+@RequiredArgsConstructor
+public final class BasicColumnValueReader extends AbstractColumnValueReader {
+    
+    private final String databaseType;
     
     @Override
     protected Object doReadValue(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
@@ -32,7 +37,7 @@ public final class DefaultColumnValueReader extends AbstractColumnValueReader {
     }
     
     @Override
-    public boolean isDefault() {
-        return true;
+    public String getType() {
+        return databaseType;
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 395dc244917..277ab2084c2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -44,11 +44,11 @@ import org.apache.shardingsphere.data.pipeline.core.util.JDBCStreamQueryUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -85,8 +85,9 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         this.dumperConfig = dumperConfig;
         this.channel = channel;
         this.dataSource = dataSource;
-        sqlBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType());
-        columnValueReader = TypedSPILoader.getService(ColumnValueReader.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType());
+        String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
+        sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, databaseType);
+        columnValueReader = PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class, databaseType).orElseGet(() -> new BasicColumnValueReader(databaseType));
         this.metaDataLoader = metaDataLoader;
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index 26536ec0e51..e2c5b84ebe1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 
 /**
  * YAML job item incremental tasks progress swapper.
@@ -60,7 +60,7 @@ public final class YamlJobItemIncrementalTasksProgressSwapper {
         }
         IncrementalTaskProgress taskProgress = new IncrementalTaskProgress();
         // TODO consider to remove parameter databaseType
-        PositionInitializer positionInitializer = TypedSPILoader.getService(PositionInitializer.class, databaseType);
+        PositionInitializer positionInitializer = PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class, databaseType);
         taskProgress.setPosition(positionInitializer.init(yamlProgress.getPosition()));
         taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
         return new JobItemIncrementalTasksProgress(taskProgress);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index c7a103d356c..36f1ab11531 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.generator;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGenerator;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.binder.QueryContext;
 import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
@@ -32,7 +33,6 @@ import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.database.schema.util.IndexMetaDataUtil;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.SQLSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.constraint.ConstraintSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.index.IndexSegment;
@@ -75,7 +75,7 @@ public final class PipelineDDLGenerator {
                                    final String schemaName, final String sourceTableName, final String targetTableName, final ShardingSphereSQLParserEngine parserEngine) throws SQLException {
         long startTimeMillis = System.currentTimeMillis();
         StringBuilder result = new StringBuilder();
-        for (String each : TypedSPILoader.getService(CreateTableSQLGenerator.class, databaseType.getType()).generate(sourceDataSource, schemaName, sourceTableName)) {
+        for (String each : PipelineTypedSPILoader.getDatabaseTypedService(CreateTableSQLGenerator.class, databaseType.getType()).generate(sourceDataSource, schemaName, sourceTableName)) {
             Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, targetTableName, parserEngine, each);
             queryContext.ifPresent(optional -> result.append(optional).append(DELIMITER).append(System.lineSeparator()));
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index b1c8433e99e..090e131a527 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -43,7 +43,7 @@ import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -172,7 +172,7 @@ public final class InventoryTaskSplitter {
         String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
         String actualTableName = dumperConfig.getActualTableName();
         // TODO with a large amount of data, count the full table will have performance problem
-        String sql = TypedSPILoader.getService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType()).buildCountSQL(schemaName, actualTableName);
+        String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType()).buildCountSQL(schemaName, actualTableName);
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
@@ -191,7 +191,7 @@ public final class InventoryTaskSplitter {
         Collection<IngestPosition<?>> result = new LinkedList<>();
         PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
-        String sql = TypedSPILoader.getService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType())
+        String sql = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType())
                 .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), uniqueKey);
         int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
         try (
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 4fc429ffe49..a0dabdec187 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -35,13 +35,14 @@ import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTa
 import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.database.type.BranchDatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 
@@ -63,7 +64,7 @@ public final class PipelineJobPreparerUtils {
      * @return true if supported, otherwise false
      */
     public static boolean isIncrementalSupported(final String databaseType) {
-        return TypedSPILoader.contains(IncrementalDumperCreator.class, databaseType);
+        return PipelineTypedSPILoader.findDatabaseTypedService(IncrementalDumperCreator.class, databaseType).isPresent();
     }
     
     /**
@@ -74,7 +75,7 @@ public final class PipelineJobPreparerUtils {
      * @throws SQLException if prepare target schema fail
      */
     public static void prepareTargetSchema(final String databaseType, final PrepareTargetSchemasParameter prepareTargetSchemasParam) throws SQLException {
-        Optional<DataSourcePreparer> dataSourcePreparer = TypedSPILoader.findService(DataSourcePreparer.class, databaseType);
+        Optional<DataSourcePreparer> dataSourcePreparer = PipelineTypedSPILoader.findDatabaseTypedService(DataSourcePreparer.class, databaseType);
         if (!dataSourcePreparer.isPresent()) {
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
@@ -91,7 +92,11 @@ public final class PipelineJobPreparerUtils {
     public static ShardingSphereSQLParserEngine getSQLParserEngine(final String targetDatabaseName) {
         ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
         ShardingSphereDatabase database = metaData.getDatabase(targetDatabaseName);
-        return metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(database.getProtocolType().getType());
+        DatabaseType databaseType = database.getProtocolType();
+        if (databaseType instanceof BranchDatabaseType) {
+            databaseType = ((BranchDatabaseType) databaseType).getTrunkDatabaseType();
+        }
+        return metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(databaseType.getType());
     }
     
     /**
@@ -102,7 +107,7 @@ public final class PipelineJobPreparerUtils {
      * @throws SQLException SQL exception
      */
     public static void prepareTargetTables(final String databaseType, final PrepareTargetTablesParameter prepareTargetTablesParam) throws SQLException {
-        Optional<DataSourcePreparer> dataSourcePreparer = TypedSPILoader.findService(DataSourcePreparer.class, databaseType);
+        Optional<DataSourcePreparer> dataSourcePreparer = PipelineTypedSPILoader.findDatabaseTypedService(DataSourcePreparer.class, databaseType);
         if (!dataSourcePreparer.isPresent()) {
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
@@ -131,7 +136,7 @@ public final class PipelineJobPreparerUtils {
         }
         String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
         DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-        return TypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperConfig.getJobId());
+        return PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class, databaseType).init(dataSource, dumperConfig.getJobId());
     }
     
     /**
@@ -144,7 +149,7 @@ public final class PipelineJobPreparerUtils {
         if (dataSources.isEmpty()) {
             return;
         }
-        DataSourceChecker dataSourceChecker = TypedSPILoader.findService(DataSourceChecker.class, databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
+        DataSourceChecker dataSourceChecker = PipelineTypedSPILoader.findDatabaseTypedService(DataSourceChecker.class, databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
         dataSourceChecker.checkConnection(dataSources);
         dataSourceChecker.checkPrivilege(dataSources);
         dataSourceChecker.checkVariable(dataSources);
@@ -158,7 +163,7 @@ public final class PipelineJobPreparerUtils {
      * @param targetDataSources target data sources
      */
     public static void checkTargetDataSource(final String databaseType, final ImporterConfiguration importerConfig, final Collection<? extends DataSource> targetDataSources) {
-        DataSourceChecker dataSourceChecker = TypedSPILoader.findService(DataSourceChecker.class, databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
+        DataSourceChecker dataSourceChecker = PipelineTypedSPILoader.findDatabaseTypedService(DataSourceChecker.class, databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
         if (null == targetDataSources || targetDataSources.isEmpty()) {
             log.info("target data source is empty, skip check");
             return;
@@ -176,7 +181,7 @@ public final class PipelineJobPreparerUtils {
      */
     public static void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
         DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
-        PositionInitializer positionInitializer = TypedSPILoader.getService(PositionInitializer.class, databaseType.getType());
+        PositionInitializer positionInitializer = PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class, databaseType.getType());
         final long startTimeMillis = System.currentTimeMillis();
         log.info("Cleanup database type:{}, data source type:{}", databaseType.getType(), pipelineDataSourceConfig.getType());
         if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 5e87e2630a1..f29b894c9e7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -25,10 +25,10 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -57,7 +57,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
         }
         CreateTableConfiguration createTableConfig = param.getCreateTableConfig();
         String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(targetDatabaseType).orElse(null);
-        PipelineSQLBuilder sqlBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, targetDatabaseType.getType());
+        PipelineSQLBuilder sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, targetDatabaseType.getType());
         Collection<String> createdSchemaNames = new HashSet<>();
         for (CreateTableEntry each : createTableConfig.getCreateTableEntries()) {
             String targetSchemaName = each.getTargetName().getSchemaName().getOriginal();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 97474b8fb6a..b279c1704fe 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -40,6 +40,7 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import java.util.Collection;
@@ -78,7 +79,7 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
         IngestPosition<?> position = dumperConfig.getPosition();
         taskProgress = createIncrementalTaskProgress(position, jobItemContext.getInitProgress());
         channel = createChannel(concurrency, pipelineChannelCreator, taskProgress);
-        dumper = TypedSPILoader.getService(
+        dumper = PipelineTypedSPILoader.getDatabaseTypedService(
                 IncrementalDumperCreator.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType()).createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
         importers = createImporters(concurrency, importerConfig, importerConnector, channel, jobItemContext);
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtil.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtil.java
new file mode 100644
index 00000000000..e04578920e7
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.database.type.BranchDatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Set;
+
+/**
+ * Utility for resolving trunk database type names together with the names of their branch database types.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DatabaseTypeUtil {
+    
+    /**
+     * Get the names of all loaded database types that are either one of the given trunk types or a branch of one of them.
+     *
+     * @param trunkDatabaseTypes trunk database type names; matched against each type's own name and, for a branch type, its trunk's name
+     * @return names of the matching database types among those returned by {@code ShardingSphereServiceLoader} for {@code DatabaseType}
+     */
+    public static Collection<String> getTrunkAndBranchDatabaseTypes(final Set<String> trunkDatabaseTypes) {
+        Collection<String> result = new LinkedList<>();
+        for (DatabaseType each : ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class)) {
+            if (trunkDatabaseTypes.contains(each.getType())
+                    || (each instanceof BranchDatabaseType && trunkDatabaseTypes.contains(((BranchDatabaseType) each).getTrunkDatabaseType().getType()))) {
+                result.add(each.getType());
+            }
+        }
+        return result;
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/JDBCStreamQueryUtil.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/JDBCStreamQueryUtil.java
index 36dc07c879d..56299033424 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/JDBCStreamQueryUtil.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/JDBCStreamQueryUtil.java
@@ -18,7 +18,9 @@
 package org.apache.shardingsphere.data.pipeline.core.util;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.BranchDatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
@@ -45,24 +47,35 @@ public final class JDBCStreamQueryUtil {
      */
     public static PreparedStatement generateStreamQueryPreparedStatement(final DatabaseType databaseType, final Connection connection, final String sql) throws SQLException {
         if (databaseType instanceof MySQLDatabaseType) {
-            return generateMySQLStreamQueryPreparedStatement(connection, sql);
+            return generateForMySQL(connection, sql);
         }
         if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
-            return generatePostgreSQLStreamQueryPreparedStatement(connection, sql);
+            return generateForPostgreSQL(connection, sql);
+        }
+        if (databaseType instanceof H2DatabaseType) {
+            return generateByDefault(connection, sql);
+        }
+        if (databaseType instanceof BranchDatabaseType) {
+            return generateStreamQueryPreparedStatement(((BranchDatabaseType) databaseType).getTrunkDatabaseType(), connection, sql);
         }
         log.warn("not support {} streaming query now, pay attention to memory usage", databaseType.getType());
-        return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+        return generateByDefault(connection, sql);
     }
     
-    private static PreparedStatement generateMySQLStreamQueryPreparedStatement(final Connection connection, final String sql) throws SQLException {
+    // TODO Consider using SPI
+    private static PreparedStatement generateForMySQL(final Connection connection, final String sql) throws SQLException {
         PreparedStatement result = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
         result.setFetchSize(Integer.MIN_VALUE);
         return result;
     }
     
-    private static PreparedStatement generatePostgreSQLStreamQueryPreparedStatement(final Connection connection, final String sql) throws SQLException {
+    private static PreparedStatement generateForPostgreSQL(final Connection connection, final String sql) throws SQLException {
         PreparedStatement result = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.CLOSE_CURSORS_AT_COMMIT);
         connection.setAutoCommit(false);
         return result;
     }
+    
+    private static PreparedStatement generateByDefault(final Connection connection, final String sql) throws SQLException {
+        return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
similarity index 90%
rename from kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
rename to kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
index 3ec8c078164..b185951d3aa 100644
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
+++ b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DefaultColumnValueReader
+org.apache.shardingsphere.data.pipeline.core.datasource.H2JdbcQueryPropertiesExtension
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
deleted file mode 100644
index 7cc67f99c2e..00000000000
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.core.ingest.position.DefaultPositionInitializer
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
deleted file mode 100644
index 3ca94cb5097..00000000000
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.core.sqlbuilder.DefaultPipelineSQLBuilder
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DefaultPositionInitializer.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtilTest.java
similarity index 52%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DefaultPositionInitializer.java
rename to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtilTest.java
index 393ac30e83a..977d1716c42 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DefaultPositionInitializer.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DatabaseTypeUtilTest.java
@@ -15,30 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.position;
+package org.apache.shardingsphere.data.pipeline.core.util;
 
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.junit.Test;
 
-import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
 
-/**
- * Default position initializer.
- */
-public final class DefaultPositionInitializer implements PositionInitializer {
-    
-    @Override
-    public IngestPosition<?> init(final DataSource dataSource, final String slotNameSuffix) {
-        return null;
-    }
-    
-    @Override
-    public IngestPosition<?> init(final String data) {
-        return null;
-    }
+import static org.junit.Assert.assertTrue;
+
+public final class DatabaseTypeUtilTest {
     
-    @Override
-    public boolean isDefault() {
-        return true;
+    @Test
+    public void assertGetBranchDatabaseTypes() {
+        Set<String> trunkDatabaseTypes = Collections.singleton(new MySQLDatabaseType().getType());
+        Collection<String> actual = DatabaseTypeUtil.getTrunkAndBranchDatabaseTypes(trunkDatabaseTypes);
+        assertTrue("MySQL not present", actual.contains("MySQL"));
+        assertTrue("MariaDB not present", actual.contains("MariaDB"));
     }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 7f914dee3c8..4361e59e62e 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -70,6 +70,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.context.Migrat
 import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.data.pipeline.spi.job.JobTypeFactory;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -88,6 +89,8 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
+import org.apache.shardingsphere.sharding.exception.metadata.ShardingRuleNotFoundException;
 
 import javax.sql.DataSource;
 import java.nio.charset.StandardCharsets;
@@ -199,7 +202,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
     
     private CreateTableConfiguration buildCreateTableConfiguration(final MigrationJobConfiguration jobConfig) {
         String sourceSchemaName = jobConfig.getSourceSchemaName();
-        String targetSchemaName = TypedSPILoader.getService(DatabaseType.class, jobConfig.getTargetDatabaseType()).isSchemaAvailable() ? sourceSchemaName : null;
+        String targetSchemaName = PipelineTypedSPILoader.getDatabaseTypedService(DatabaseType.class, jobConfig.getTargetDatabaseType()).isSchemaAvailable() ? sourceSchemaName : null;
         CreateTableEntry createTableEntry = new CreateTableEntry(
                 jobConfig.getSource(), new SchemaTableName(new SchemaName(sourceSchemaName), new TableName(jobConfig.getSourceTableName())),
                 jobConfig.getTarget(), new SchemaTableName(new SchemaName(targetSchemaName), new TableName(jobConfig.getTargetTableName())));
@@ -307,7 +310,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
         String targetTableName = jobConfig.getTargetTableName();
         // TODO use jobConfig.targetSchemaName
         String targetSchemaName = jobConfig.getSourceSchemaName();
-        PipelineSQLBuilder pipelineSQLBuilder = TypedSPILoader.getService(PipelineSQLBuilder.class, jobConfig.getTargetDatabaseType());
+        PipelineSQLBuilder pipelineSQLBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, jobConfig.getTargetDatabaseType());
         try (
                 PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
                 Connection connection = dataSource.getConnection()) {
@@ -447,6 +450,10 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
     }
     
     private YamlRootConfiguration getYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
+        // TODO Check rules empty. Sharding rule could be empty
+        if (rules.stream().noneMatch(each -> each instanceof ShardingRuleConfiguration)) {
+            throw new ShardingRuleNotFoundException();
+        }
         YamlRootConfiguration result = new YamlRootConfiguration();
         result.setDatabaseName(databaseName);
         result.setDataSources(yamlDataSources);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 1cb2c4114e3..0460ac79997 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -40,11 +40,11 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.Migra
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
 
 import java.sql.SQLException;
@@ -140,7 +140,7 @@ public final class MigrationJobPreparer {
         CreateTableConfiguration createTableConfig = jobItemContext.getTaskConfig().getCreateTableConfig();
         PipelineDataSourceManager dataSourceManager = (PipelineDataSourceManager) jobItemContext.getImporterConnector().getConnector();
         PrepareTargetSchemasParameter prepareTargetSchemasParam = new PrepareTargetSchemasParameter(
-                TypedSPILoader.getService(DatabaseType.class, targetDatabaseType), createTableConfig, dataSourceManager);
+                PipelineTypedSPILoader.getDatabaseTypedService(DatabaseType.class, targetDatabaseType), createTableConfig, dataSourceManager);
         PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParam);
         ShardingSphereSQLParserEngine sqlParserEngine = PipelineJobPreparerUtils.getSQLParserEngine(jobConfig.getTargetDatabaseName());
         PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, new PrepareTargetTablesParameter(createTableConfig, dataSourceManager, sqlParserEngine));
diff --git a/test/e2e/pipeline/src/test/resources/env/mysql/01-initdb.sql b/test/e2e/pipeline/src/test/resources/env/mysql/01-initdb.sql
index 540d9d65ff7..4cb199d48f9 100644
--- a/test/e2e/pipeline/src/test/resources/env/mysql/01-initdb.sql
+++ b/test/e2e/pipeline/src/test/resources/env/mysql/01-initdb.sql
@@ -17,6 +17,11 @@
 
 REVOKE ALL PRIVILEGES ON *.* FROM 'test_user'@'%';
 
+DROP DATABASE IF EXISTS  pipeline_it_0;
+DROP DATABASE IF EXISTS  pipeline_it_1;
+DROP DATABASE IF EXISTS  pipeline_it_2;
+DROP DATABASE IF EXISTS  pipeline_it_3;
+DROP DATABASE IF EXISTS  pipeline_it_4;
 CREATE DATABASE pipeline_it_0;
 CREATE DATABASE pipeline_it_1;
 CREATE DATABASE pipeline_it_2;
diff --git a/test/e2e/pipeline/src/test/resources/env/postgresql/01-initdb.sql b/test/e2e/pipeline/src/test/resources/env/postgresql/01-initdb.sql
index 0ca196ab765..75035c5463e 100644
--- a/test/e2e/pipeline/src/test/resources/env/postgresql/01-initdb.sql
+++ b/test/e2e/pipeline/src/test/resources/env/postgresql/01-initdb.sql
@@ -16,11 +16,18 @@
 
 ALTER USER test_user NOSUPERUSER;
 ALTER USER test_user REPLICATION;
+
+DROP DATABASE IF EXISTS  pipeline_it_0;
+DROP DATABASE IF EXISTS  pipeline_it_1;
+DROP DATABASE IF EXISTS  pipeline_it_2;
+DROP DATABASE IF EXISTS  pipeline_it_3;
+DROP DATABASE IF EXISTS  pipeline_it_4;
 CREATE DATABASE pipeline_it_0;
 CREATE DATABASE pipeline_it_1;
 CREATE DATABASE pipeline_it_2;
 CREATE DATABASE pipeline_it_3;
 CREATE DATABASE pipeline_it_4;
+
 -- TODO remove unnecessary permissions
 GRANT CREATE, CONNECT ON DATABASE pipeline_it_0 TO test_user;
 GRANT CREATE, CONNECT ON DATABASE pipeline_it_1 TO test_user;
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java
new file mode 100644
index 00000000000..0d94a5a70a6
--- /dev/null
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.data.pipeline.core.util.spi;
+
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLColumnValueReader;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
+import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public final class PipelineTypedSPILoaderTest {
+    
+    @Test
+    public void assertFindDatabaseTypedService() {
+        Optional<PipelineSQLBuilder> actualSQLBuilder = PipelineTypedSPILoader.findDatabaseTypedService(PipelineSQLBuilder.class, "MariaDB");
+        assertTrue(actualSQLBuilder.isPresent());
+        assertThat(actualSQLBuilder.get().getType(), is("MySQL"));
+    }
+    
+    @Test
+    public void assertGetPipelineSQLBuilder() {
+        PipelineSQLBuilder actualSQLBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, "MariaDB");
+        assertNotNull(actualSQLBuilder);
+        assertThat(actualSQLBuilder.getType(), is("MySQL"));
+    }
+    
+    @Test
+    public void assertFindColumnValueReaderByUnknown() {
+        // The lookup for the "Unknown" database type is expected to yield an empty Optional.
+        assertFalse(PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class, "Unknown").isPresent());
+    }
+    
+    @Test
+    public void assertGetColumnValueReaderByBranchDB() {
+        ColumnValueReader actualReader = PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class, "MariaDB");
+        assertNotNull(actualReader);
+        assertThat(actualReader.getClass().getName(), is(MySQLColumnValueReader.class.getName()));
+    }
+}