You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/01/27 04:12:07 UTC

[shardingsphere] branch master updated: Refactor PipelineSQLBuilder as SPI (#15094)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8759591  Refactor PipelineSQLBuilder as SPI (#15094)
8759591 is described below

commit 8759591d7655bc84d10050176c3e54122e16a8f3
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Thu Jan 27 12:11:10 2022 +0800

    Refactor PipelineSQLBuilder as SPI (#15094)
    
    * Move and rename PipelineSQLBuilderFactory
    
    * Refactor PipelineSQLBuilder as SPI
    
    * Unit test
    
    * Rename PipelineSQLBuilderFactory newInstance to getSQLBuilder
---
 .../consistency/DataConsistencyCheckerImpl.java    |  4 ++--
 .../DataMatchSingleTableDataCalculator.java        |  4 ++--
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     | 11 ++++++++--
 .../sqlbuilder/PipelineSQLBuilderFactory.java}     | 24 +++++++++-------------
 .../rulealtered/prepare/InventoryTaskSplitter.java |  4 ++--
 .../job/environment/ScalingEnvironmentManager.java |  4 ++--
 .../scaling/core/spi/ScalingEntry.java             |  8 --------
 .../data/pipeline/mysql/MySQLScalingEntry.java     |  6 ------
 .../CRC32MatchMySQLSingleTableDataCalculator.java  |  7 ++++---
 .../check/datasource/MySQLDataSourceChecker.java   |  7 ++++---
 .../datasource/MySQLDataSourcePreparer.java        |  6 +++---
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  |  8 ++++++++
 ...ipeline.mysql.ingest.column.value.ValueHandler} |  0
 ...ata.pipeline.spi.sqlbuilder.PipelineSQLBuilder} |  6 +-----
 .../pipeline/opengauss/OpenGaussScalingEntry.java  |  6 ------
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    | 10 ++++++++-
 ...ata.pipeline.spi.sqlbuilder.PipelineSQLBuilder} |  6 +-----
 .../postgresql/PostgreSQLScalingEntry.java         |  6 ------
 .../datasource/PostgreSQLDataSourceChecker.java    |  5 ++---
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   |  8 ++++++++
 ...ata.pipeline.spi.sqlbuilder.PipelineSQLBuilder} |  6 +-----
 .../PostgreSQLPipelineSQLBuilderTest.java          |  5 ++---
 .../spi/sqlbuilder/PipelineSQLBuilder.java         | 15 +++++++++-----
 .../core/fixture/FixturePipelineSQLBuilder.java    |  8 ++++++++
 .../pipeline/core/fixture/FixtureScalingEntry.java |  6 ------
 .../spi/sqlbuilder/PipelineSQLBuilderTest.java     |  2 +-
 ...data.pipeline.spi.sqlbuilder.PipelineSQLBuilder |  6 +-----
 27 files changed, 90 insertions(+), 98 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index abe863f..9a5693c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
@@ -42,7 +43,6 @@ import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFact
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -132,7 +132,7 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
     
     private long count(final DataSource dataSource, final String table, final DatabaseType databaseType) {
         try (Connection connection = dataSource.getConnection();
-             PreparedStatement preparedStatement = connection.prepareStatement(ScalingSQLBuilderFactory.newInstance(databaseType.getName()).buildCountSQL(table));
+             PreparedStatement preparedStatement = connection.prepareStatement(PipelineSQLBuilderFactory.getSQLBuilder(databaseType.getName()).buildCountSQL(table));
              ResultSet resultSet = preparedStatement.executeQuery()) {
             resultSet.next();
             return resultSet.getLong(1);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
index 8c810f5..e34bad5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
@@ -27,9 +27,9 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -82,7 +82,7 @@ public final class DataMatchSingleTableDataCalculator extends AbstractStreamingS
     @Override
     protected Optional<Object> calculateChunk(final DataCalculateParameter dataCalculateParameter) {
         String logicTableName = dataCalculateParameter.getLogicTableName();
-        PipelineSQLBuilder sqlBuilder = ScalingSQLBuilderFactory.newInstance(dataCalculateParameter.getDatabaseType());
+        PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getSQLBuilder(dataCalculateParameter.getDatabaseType());
         String uniqueKey = dataCalculateParameter.getUniqueKey();
         CalculatedResult previousCalculatedResult = (CalculatedResult) dataCalculateParameter.getPreviousCalculatedResult();
         Number startUniqueKeyValue = null != previousCalculatedResult ? previousCalculatedResult.getMaxUniqueKeyValue() : -1;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index af4957f..35f230d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
 import com.google.common.base.Preconditions;
 import lombok.AccessLevel;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
@@ -28,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentMap;
 /**
  * Abstract pipeline SQL builder.
  */
-@RequiredArgsConstructor
 public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     
     private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
@@ -51,6 +50,14 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     @Getter(AccessLevel.PROTECTED)
     private final Map<String, Set<String>> shardingColumnsMap;
     
+    public AbstractPipelineSQLBuilder() {
+        shardingColumnsMap = Collections.emptyMap();
+    }
+    
+    public AbstractPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
+        this.shardingColumnsMap = shardingColumnsMap;
+    }
+    
     /**
      * Get left identifier quote string.
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/sqlbuilder/ScalingSQLBuilderFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
similarity index 60%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/sqlbuilder/ScalingSQLBuilderFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
index fb7796e..e3d9494 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/sqlbuilder/ScalingSQLBuilderFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
@@ -15,33 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.job.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.shardingsphere.spi.exception.ServiceProviderNotFoundException;
+import org.apache.shardingsphere.spi.singleton.TypedSingletonSPIHolder;
 
 /**
- * Scaling SQL builder factory.
+ * Pipeline SQL builder factory.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ScalingSQLBuilderFactory {
+public final class PipelineSQLBuilderFactory {
+    
+    private static final TypedSingletonSPIHolder<PipelineSQLBuilder> SQL_BUILDER_SPI_HOLDER = new TypedSingletonSPIHolder<>(PipelineSQLBuilder.class, false);
     
     /**
-     * New instance of SQL builder.
+     * Get SQL builder instance.
      *
      * @param databaseType database type
      * @return SQL builder
      */
-    @SneakyThrows(ReflectiveOperationException.class)
-    public static PipelineSQLBuilder newInstance(final String databaseType) {
-        ScalingEntry scalingEntry = ScalingEntryLoader.getInstance(databaseType);
-        return scalingEntry.getSQLBuilderClass().getConstructor(Map.class).newInstance(new HashMap<>());
+    public static PipelineSQLBuilder getSQLBuilder(final String databaseType) {
+        return SQL_BUILDER_SPI_HOLDER.get(databaseType).orElseThrow(() -> new ServiceProviderNotFoundException(PipelineSQLBuilder.class, databaseType));
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 31500fa..5a4c81a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineMetaDataManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
@@ -41,7 +42,6 @@ import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
 import org.apache.shardingsphere.infra.yaml.config.pojo.rulealtered.YamlOnRuleAlteredActionConfiguration.YamlInputConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlteredActionConfigurationYamlSwapper.InputConfigurationSwapper;
-import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -180,7 +180,7 @@ public final class InventoryTaskSplitter {
     private Collection<IngestPosition<?>> getPositionByPrimaryKeyRange(final RuleAlteredJobContext jobContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
         Collection<IngestPosition<?>> result = new ArrayList<>();
         JobConfiguration jobConfig = jobContext.getJobConfig();
-        String sql = ScalingSQLBuilderFactory.newInstance(jobConfig.getHandleConfig().getSourceDatabaseType())
+        String sql = PipelineSQLBuilderFactory.getSQLBuilder(jobConfig.getHandleConfig().getSourceDatabaseType())
                 .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getTableName(), dumperConfig.getPrimaryKey());
         try (Connection connection = dataSource.getConnection();
              PreparedStatement ps = connection.prepareStatement(sql)) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index b923d09..70d8d14 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -21,8 +21,8 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -49,7 +49,7 @@ public final class ScalingEnvironmentManager {
         try (PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
              Connection connection = dataSource.getConnection()) {
             for (String each : tables) {
-                String sql = ScalingSQLBuilderFactory.newInstance(jobContext.getJobConfig().getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
+                String sql = PipelineSQLBuilderFactory.getSQLBuilder(jobContext.getJobConfig().getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
                 try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                     preparedStatement.execute();
                 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 8474eb2..06b7d9f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeAwareSPI;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
 
@@ -65,11 +64,4 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
      * @return environment checker type
      */
     Class<? extends EnvironmentChecker> getEnvironmentCheckerClass();
-    
-    /**
-     * Get SQL builder class.
-     *
-     * @return SQL builder type
-     */
-    Class<? extends PipelineSQLBuilder> getSQLBuilderClass();
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
index c4302bb..6bb7609 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.data.pipeline.mysql.importer.MySQLImporter;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLPositionInitializer;
-import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 
 /**
@@ -55,11 +54,6 @@ public final class MySQLScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<MySQLPipelineSQLBuilder> getSQLBuilderClass() {
-        return MySQLPipelineSQLBuilder.class;
-    }
-    
-    @Override
     public String getDatabaseType() {
         return "MySQL";
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculator.java
index 1d20de5..eaad0c9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculator.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalcula
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
 import org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.AbstractSingleTableDataCalculator;
 import org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCheckAlgorithm;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 
@@ -31,7 +32,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -42,6 +42,8 @@ public final class CRC32MatchMySQLSingleTableDataCalculator extends AbstractSing
     
     private static final Collection<String> DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getName());
     
+    private static final MySQLPipelineSQLBuilder SQL_BUILDER = (MySQLPipelineSQLBuilder) PipelineSQLBuilderFactory.getSQLBuilder("MySQL");
+    
     @Override
     public String getAlgorithmType() {
         return CRC32MatchDataConsistencyCheckAlgorithm.TYPE;
@@ -55,9 +57,8 @@ public final class CRC32MatchMySQLSingleTableDataCalculator extends AbstractSing
     @Override
     public Iterable<Object> calculate(final DataCalculateParameter dataCalculateParameter) {
         String logicTableName = dataCalculateParameter.getLogicTableName();
-        MySQLPipelineSQLBuilder scalingSQLBuilder = new MySQLPipelineSQLBuilder(new HashMap<>());
         List<Long> result = dataCalculateParameter.getColumnNames().stream().map(each -> {
-            String sql = scalingSQLBuilder.buildSumCrc32SQL(logicTableName, each);
+            String sql = SQL_BUILDER.buildSumCrc32SQL(logicTableName, each);
             try {
                 return sumCrc32(dataCalculateParameter.getDataSource(), sql);
             } catch (final SQLException ex) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
index 1be1c85..2316a1c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
@@ -19,7 +19,8 @@ package org.apache.shardingsphere.data.pipeline.mysql.check.datasource;
 
 import org.apache.shardingsphere.data.pipeline.core.check.datasource.AbstractDataSourceChecker;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
-import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -113,7 +114,7 @@ public final class MySQLDataSourceChecker extends AbstractDataSourceChecker {
     }
     
     @Override
-    protected MySQLPipelineSQLBuilder getSQLBuilder() {
-        return new MySQLPipelineSQLBuilder(new HashMap<>());
+    protected PipelineSQLBuilder getSQLBuilder() {
+        return PipelineSQLBuilderFactory.getSQLBuilder("MySQL");
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 43b13be..20667db 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
 
 import java.sql.Connection;
@@ -31,7 +32,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.stream.Collectors;
 
 /**
@@ -40,7 +40,7 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
     
-    private final MySQLPipelineSQLBuilder scalingSQLBuilder = new MySQLPipelineSQLBuilder(Collections.emptyMap());
+    private static final MySQLPipelineSQLBuilder SQL_BUILDER = (MySQLPipelineSQLBuilder) PipelineSQLBuilderFactory.getSQLBuilder("MySQL");
     
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
@@ -62,7 +62,7 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
     }
     
     private String getCreateTableSQL(final Connection sourceConnection, final String logicTableName) throws SQLException {
-        String showCreateTableSQL = "SHOW CREATE TABLE " + scalingSQLBuilder.quote(logicTableName);
+        String showCreateTableSQL = "SHOW CREATE TABLE " + SQL_BUILDER.quote(logicTableName);
         try (Statement statement = sourceConnection.createStatement(); ResultSet resultSet = statement.executeQuery(showCreateTableSQL)) {
             if (!resultSet.next()) {
                 throw new PipelineJobPrepareFailedException("show create table has no result, sql: " + showCreateTableSQL);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 3a33557..80d2272 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -29,6 +29,9 @@ import java.util.Set;
  */
 public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
+    public MySQLPipelineSQLBuilder() {
+    }
+    
     public MySQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
         super(shardingColumnsMap);
     }
@@ -77,4 +80,9 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     public String buildSumCrc32SQL(final String tableName, final String column) {
         return String.format("SELECT SUM(CRC32(%s)) AS checksum FROM %s", quote(column), quote(tableName));
     }
+    
+    @Override
+    public String getType() {
+        return "MySQL";
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.mys [...]
similarity index 100%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi [...]
similarity index 63%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index c775757..8fae833 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -15,8 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedTinyintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedSmallintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedMediumintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedIntHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedBigintHandler
+org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
index e7ce005..76eab6e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.opengauss;
 import org.apache.shardingsphere.data.pipeline.opengauss.importer.OpenGaussImporter;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussPositionInitializer;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
-import org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder.OpenGaussPipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 
@@ -55,11 +54,6 @@ public final class OpenGaussScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<OpenGaussPipelineSQLBuilder> getSQLBuilderClass() {
-        return OpenGaussPipelineSQLBuilder.class;
-    }
-    
-    @Override
     public String getDatabaseType() {
         return "openGauss";
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 95f333c..1989ff9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -32,7 +32,10 @@ import java.util.Set;
  * OpenGauss pipeline SQL builder.
  */
 public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
-
+    
+    public OpenGaussPipelineSQLBuilder() {
+    }
+    
     public OpenGaussPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
         super(shardingColumnsMap);
     }
@@ -68,4 +71,9 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
         // there need return ON DUPLICATE KEY UPDATE NOTHING after support this syntax.
         return "";
     }
+    
+    @Override
+    public String getType() {
+        return "openGauss";
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline [...]
similarity index 63%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index c775757..3698bcc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -15,8 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedTinyintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedSmallintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedMediumintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedIntHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedBigintHandler
+org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder.OpenGaussPipelineSQLBuilder
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
index c609192..2467f57 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.data.pipeline.postgresql.importer.PostgreSQLImp
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLPositionInitializer;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
-import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 
 /**
@@ -55,11 +54,6 @@ public final class PostgreSQLScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<PostgreSQLPipelineSQLBuilder> getSQLBuilderClass() {
-        return PostgreSQLPipelineSQLBuilder.class;
-    }
-    
-    @Override
     public String getDatabaseType() {
         return "PostgreSQL";
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
index b015728..beb09f6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.check.datasource;
 
 import org.apache.shardingsphere.data.pipeline.core.check.datasource.AbstractDataSourceChecker;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
-import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 
 import javax.sql.DataSource;
@@ -28,7 +28,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.HashMap;
 
 /**
  * PostgreSQL Data source checker.
@@ -68,6 +67,6 @@ public final class PostgreSQLDataSourceChecker extends AbstractDataSourceChecker
     
     @Override
     protected PipelineSQLBuilder getSQLBuilder() {
-        return new PostgreSQLPipelineSQLBuilder(new HashMap<>());
+        return PipelineSQLBuilderFactory.getSQLBuilder("PostgreSQL");
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 29715bd..c4d0851 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -30,6 +30,9 @@ import java.util.Set;
  */
 public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
+    public PostgreSQLPipelineSQLBuilder() {
+    }
+    
     public PostgreSQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
         super(shardingColumnsMap);
     }
@@ -58,4 +61,9 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
         result.append(") DO NOTHING");
         return result.toString();
     }
+    
+    @Override
+    public String getType() {
+        return "PostgreSQL";
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipelin [...]
similarity index 63%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index c775757..fe4298e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -15,8 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedTinyintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedSmallintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedMediumintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedIntHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedBigintHandler
+org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index e875008..5c01ac5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -19,13 +19,12 @@ package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.junit.Test;
 import org.postgresql.replication.LogSequenceNumber;
 
-import java.util.Collections;
-
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
@@ -33,7 +32,7 @@ public final class PostgreSQLPipelineSQLBuilderTest {
     
     @Test
     public void assertBuildInsertSQL() {
-        String actual = new PostgreSQLPipelineSQLBuilder(Collections.emptyMap()).buildInsertSQL(mockDataRecord());
+        String actual = PipelineSQLBuilderFactory.getSQLBuilder("PostgreSQL").buildInsertSQL(mockDataRecord());
         assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index b889b94..13a6f1a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -19,17 +19,21 @@ package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.spi.singleton.SingletonSPI;
+import org.apache.shardingsphere.spi.typed.TypedSPI;
 
 import java.util.Collection;
 import java.util.List;
 
 /**
  * Pipeline SQL builder.
+ * It is a singleton when used as an SPI; otherwise it is not.
  */
-public interface PipelineSQLBuilder {
+public interface PipelineSQLBuilder extends TypedSPI, SingletonSPI {
     
     /**
      * Build insert SQL.
+     * Used in {@linkplain org.apache.shardingsphere.data.pipeline.spi.importer.Importer}.
      *
      * @param dataRecord data record
      * @return insert SQL
@@ -46,11 +50,12 @@ public interface PipelineSQLBuilder {
     String buildUpdateSQL(DataRecord dataRecord, Collection<Column> conditionColumns);
     
     /**
-     * Extract need updated columns.
+     * Extract updated columns.
+     * Used in {@linkplain org.apache.shardingsphere.data.pipeline.spi.importer.Importer}.
      *
-     * @param columns the input columns
-     * @param record the input datarecord
-     * @return the filtered columns.
+     * @param columns columns
+     * @param record data record
+     * @return filtered columns
      */
     List<Column> extractUpdatedColumns(Collection<Column> columns, DataRecord record);
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
index 990d834..70fd920 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
@@ -24,6 +24,9 @@ import java.util.Set;
 
 public final class FixturePipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
+    public FixturePipelineSQLBuilder() {
+    }
+    
     public FixturePipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
         super(shardingColumnsMap);
     }
@@ -37,4 +40,9 @@ public final class FixturePipelineSQLBuilder extends AbstractPipelineSQLBuilder
     protected String getRightIdentifierQuoteString() {
         return "`";
     }
+    
+    @Override
+    public String getType() {
+        return "H2";
+    }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureScalingEntry.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureScalingEntry.java
index f8de7d3..454616e 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureScalingEntry.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureScalingEntry.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 
@@ -53,11 +52,6 @@ public final class FixtureScalingEntry implements ScalingEntry {
     }
     
     @Override
-    public Class<? extends PipelineSQLBuilder> getSQLBuilderClass() {
-        return FixturePipelineSQLBuilder.class;
-    }
-    
-    @Override
     public String getDatabaseType() {
         return "H2";
     }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
index 86ab11f..e93f4d2 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
@@ -32,7 +32,7 @@ import static org.junit.Assert.assertThat;
 
 public final class PipelineSQLBuilderTest {
     
-    private final PipelineSQLBuilder pipelineSQLBuilder = new FixturePipelineSQLBuilder(Collections.emptyMap());
+    private final PipelineSQLBuilder pipelineSQLBuilder = new FixturePipelineSQLBuilder();
     
     @Test
     public void assertBuildInsertSQL() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
similarity index 63%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index c775757..3d09e4d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.cdc.mysql.org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.ValueHandler
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -15,8 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedTinyintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedSmallintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedMediumintHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedIntHandler
-org.apache.shardingsphere.data.pipeline.mysql.ingest.column.value.UnsignedBigintHandler
+org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineSQLBuilder