You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/30 11:35:39 UTC

[shardingsphere] branch master updated: Refactor Importer and PipelineSQLBuilder (#17232)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a56b13a705 Refactor Importer and PipelineSQLBuilder (#17232)
6a56b13a705 is described below

commit 6a56b13a70554ff965eaea6ac527dfa8fb3bf1b4
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sat Apr 30 19:35:22 2022 +0800

    Refactor Importer and PipelineSQLBuilder (#17232)
    
    * Refactor Importer and PipelineSQLBuilder, use shardingColumnsMap on demand
    
    * Mark PipelineSQLBuilder as stateful since sqlCacheMap
    
    * Unit test
---
 .../spi/sqlbuilder/PipelineSQLBuilder.java         | 20 +++----
 .../pipeline/core/importer/AbstractImporter.java   | 18 ++----
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     | 22 ++------
 .../core/sqlbuilder/PipelineSQLBuilderFactory.java |  2 +-
 .../fixture/FixturePipelineSQLBuilder.java         |  8 ++-
 .../pipeline/mysql/importer/MySQLImporter.java     |  9 ---
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  | 14 ++---
 .../pipeline/mysql/importer/MySQLImporterTest.java | 66 ----------------------
 .../sqlbuilder/MySQLPipelineSQLBuilderTest.java    |  9 ++-
 .../opengauss/importer/OpenGaussImporter.java      | 10 ----
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    | 22 +++-----
 .../postgresql/importer/PostgreSQLImporter.java    | 10 ----
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   | 14 ++---
 .../importer/PostgreSQLImporterTest.java           | 41 --------------
 .../PostgreSQLPipelineSQLBuilderTest.java          | 14 ++++-
 .../core/fixture/FixturePipelineSQLBuilder.java    | 10 ----
 .../core/importer/AbstractImporterTest.java        | 38 +++----------
 .../spi/sqlbuilder/PipelineSQLBuilderTest.java     | 13 +++--
 18 files changed, 75 insertions(+), 265 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 015ee23813b..75f6dd4744d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -19,46 +19,46 @@ package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.spi.type.typed.StatelessTypedSPI;
+import org.apache.shardingsphere.spi.type.typed.StatefulTypedSPI;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * Pipeline SQL builder.
- * It's singleton when it's used as SPI, else not.
  */
-// TODO refactor to stateless for SPI usage, it's confusing now
-public interface PipelineSQLBuilder extends StatelessTypedSPI {
+public interface PipelineSQLBuilder extends StatefulTypedSPI {
     
     /**
      * Build insert SQL.
-     * Used in {@linkplain org.apache.shardingsphere.data.pipeline.spi.importer.Importer}.
      *
      * @param dataRecord data record
+     * @param shardingColumnsMap sharding columns map
      * @return insert SQL
      */
-    String buildInsertSQL(DataRecord dataRecord);
+    String buildInsertSQL(DataRecord dataRecord, Map<String, Set<String>> shardingColumnsMap);
     
     /**
      * Build update SQL.
      *
      * @param dataRecord data record
      * @param conditionColumns condition columns
+     * @param shardingColumnsMap sharding columns map
      * @return update SQL
      */
-    String buildUpdateSQL(DataRecord dataRecord, Collection<Column> conditionColumns);
+    String buildUpdateSQL(DataRecord dataRecord, Collection<Column> conditionColumns, Map<String, Set<String>> shardingColumnsMap);
     
     /**
      * Extract updated columns.
-     * Used in {@linkplain org.apache.shardingsphere.data.pipeline.spi.importer.Importer}.
      *
-     * @param columns columns
      * @param record data record
+     * @param shardingColumnsMap sharding columns map
      * @return filtered columns
      */
-    List<Column> extractUpdatedColumns(Collection<Column> columns, DataRecord record);
+    List<Column> extractUpdatedColumns(DataRecord record, Map<String, Set<String>> shardingColumnsMap);
     
     /**
      * Build delete SQL.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index 187e2f93d0f..4b1ec0a6efa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
+import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
@@ -39,7 +40,6 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -63,17 +63,9 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
         this.importerConfig = importerConfig;
         this.dataSourceManager = dataSourceManager;
         this.channel = channel;
-        pipelineSqlBuilder = createSQLBuilder(importerConfig.getShardingColumnsMap());
+        pipelineSqlBuilder = PipelineSQLBuilderFactory.newInstance(importerConfig.getDataSourceConfig().getDatabaseType().getName());
     }
     
-    /**
-     * Create SQL builder.
-     *
-     * @param shardingColumnsMap sharding columns map
-     * @return SQL builder
-     */
-    protected abstract PipelineSQLBuilder createSQLBuilder(Map<String, Set<String>> shardingColumnsMap);
-    
     @Override
     protected void doStart() {
         write();
@@ -158,7 +150,7 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
     }
     
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
-        String insertSql = pipelineSqlBuilder.buildInsertSQL(dataRecords.get(0));
+        String insertSql = pipelineSqlBuilder.buildInsertSQL(dataRecords.get(0), importerConfig.getShardingColumnsMap());
         try (PreparedStatement ps = connection.prepareStatement(insertSql)) {
             ps.setQueryTimeout(30);
             for (DataRecord each : dataRecords) {
@@ -183,8 +175,8 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
             log.error("executeUpdate, could not get shardingColumns, tableName={}, shardingColumnsMap.keySet={}", record.getTableName(), importerConfig.getShardingColumnsMap().keySet());
         }
         List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, shardingColumns);
-        List<Column> updatedColumns = pipelineSqlBuilder.extractUpdatedColumns(record.getColumns(), record);
-        String updateSql = pipelineSqlBuilder.buildUpdateSQL(record, conditionColumns);
+        List<Column> updatedColumns = pipelineSqlBuilder.extractUpdatedColumns(record, importerConfig.getShardingColumnsMap());
+        String updateSql = pipelineSqlBuilder.buildUpdateSQL(record, conditionColumns, importerConfig.getShardingColumnsMap());
         try (PreparedStatement ps = connection.prepareStatement(updateSql)) {
             for (int i = 0; i < updatedColumns.size(); i++) {
                 ps.setObject(i + 1, updatedColumns.get(i).getValue());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 45f2a6f8188..1e61d39f0d2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -18,8 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
 
 import com.google.common.base.Preconditions;
-import lombok.AccessLevel;
-import lombok.Getter;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
@@ -27,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -47,17 +44,6 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     
     private final ConcurrentMap<String, String> sqlCacheMap = new ConcurrentHashMap<>();
     
-    @Getter(AccessLevel.PROTECTED)
-    private final Map<String, Set<String>> shardingColumnsMap;
-    
-    public AbstractPipelineSQLBuilder() {
-        shardingColumnsMap = Collections.emptyMap();
-    }
-    
-    public AbstractPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        this.shardingColumnsMap = shardingColumnsMap;
-    }
-    
     /**
      * Get left identifier quote string.
      *
@@ -83,7 +69,7 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public String buildInsertSQL(final DataRecord dataRecord) {
+    public String buildInsertSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
         String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
             sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(dataRecord.getTableName(), dataRecord.getColumns()));
@@ -109,13 +95,13 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
+    public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns, final Map<String, Set<String>> shardingColumnsMap) {
         String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
             sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord.getTableName(), conditionColumns));
         }
         StringBuilder updatedColumnString = new StringBuilder();
-        for (Column each : extractUpdatedColumns(dataRecord.getColumns(), dataRecord)) {
+        for (Column each : extractUpdatedColumns(dataRecord, shardingColumnsMap)) {
             updatedColumnString.append(String.format("%s = ?,", quote(each.getName())));
         }
         updatedColumnString.setLength(updatedColumnString.length() - 1);
@@ -127,7 +113,7 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public List<Column> extractUpdatedColumns(final Collection<Column> columns, final DataRecord record) {
+    public List<Column> extractUpdatedColumns(final DataRecord record, final Map<String, Set<String>> shardingColumnsMap) {
         return new ArrayList<>(RecordUtil.extractUpdatedColumns(record));
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
index cbf686ac87a..86df193c0e8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
@@ -40,6 +40,6 @@ public final class PipelineSQLBuilderFactory {
      * @return new instance of pipeline SQL builder
      */
     public static PipelineSQLBuilder newInstance(final String databaseType) {
-        return TypedSPIRegistry.getRegisteredService(PipelineSQLBuilder.class, databaseType);
+        return TypedSPIRegistry.getRegisteredService(PipelineSQLBuilder.class, databaseType, null);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index d76a21aaf8e..95fba711111 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -24,22 +24,24 @@ import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
     
     @Override
-    public String buildInsertSQL(final DataRecord dataRecord) {
+    public String buildInsertSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
         return "";
     }
     
     @Override
-    public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
+    public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns, final Map<String, Set<String>> shardingColumnsMap) {
         return "";
     }
     
     @Override
-    public List<Column> extractUpdatedColumns(final Collection<Column> columns, final DataRecord record) {
+    public List<Column> extractUpdatedColumns(final DataRecord record, final Map<String, Set<String>> shardingColumnsMap) {
         return Collections.emptyList();
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
index 8db8f101394..24185e16a94 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
@@ -21,12 +21,8 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterCo
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
-import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 
-import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 /**
  * MySQL importer.
@@ -39,9 +35,4 @@ public final class MySQLImporter extends AbstractImporter {
         queryProps.setProperty("rewriteBatchedStatements", Boolean.TRUE.toString());
         importerConfig.getDataSourceConfig().appendJDBCQueryProperties(queryProps);
     }
-    
-    @Override
-    protected PipelineSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        return new MySQLPipelineSQLBuilder(shardingColumnsMap);
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 22e7e444ba5..f186f8d8876 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
 
-import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
@@ -29,13 +28,8 @@ import java.util.Set;
 /**
  * MySQL pipeline SQL builder.
  */
-@NoArgsConstructor
 public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
-    public MySQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        super(shardingColumnsMap);
-    }
-    
     @Override
     public String getLeftIdentifierQuoteString() {
         return "`";
@@ -47,15 +41,15 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     }
     
     @Override
-    public String buildInsertSQL(final DataRecord dataRecord) {
-        return super.buildInsertSQL(dataRecord) + buildDuplicateUpdateSQL(dataRecord);
+    public String buildInsertSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
+        return super.buildInsertSQL(dataRecord, shardingColumnsMap) + buildDuplicateUpdateSQL(dataRecord, shardingColumnsMap);
     }
     
-    private String buildDuplicateUpdateSQL(final DataRecord dataRecord) {
+    private String buildDuplicateUpdateSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
         StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
-            if (column.isPrimaryKey() || isShardingColumn(getShardingColumnsMap(), dataRecord.getTableName(), column.getName())) {
+            if (column.isPrimaryKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
                 continue;
             }
             result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporterTest.java
deleted file mode 100644
index 3bbddd2a4a9..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporterTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.mysql.importer;
-
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class MySQLImporterTest {
-    
-    @Mock
-    private ImporterConfiguration importerConfig;
-    
-    @Mock
-    private PipelineDataSourceManager dataSourceManager;
-    
-    @Mock
-    private PipelineChannel channel;
-    
-    @Test
-    public void assertCreateSqlBuilder() {
-        when(importerConfig.getDataSourceConfig()).thenReturn(mock(PipelineDataSourceConfiguration.class));
-        MySQLImporter mysqlImporter = new MySQLImporter(importerConfig, dataSourceManager, channel);
-        String insertSQL = mysqlImporter.createSQLBuilder(Collections.emptyMap()).buildInsertSQL(mockDataRecord());
-        assertThat(insertSQL, is("INSERT INTO `t_order`(`id`,`name`) VALUES(?,?) ON DUPLICATE KEY UPDATE `name`=VALUES(`name`)"));
-    }
-    
-    private DataRecord mockDataRecord() {
-        DataRecord result = new DataRecord(new BinlogPosition("binlog-000001", 4), 2);
-        result.setTableName("t_order");
-        result.addColumn(new Column("id", 1, true, true));
-        result.addColumn(new Column("name", "", true, false));
-        return result;
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index a8594a30119..8db2aca7cfb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -33,17 +34,19 @@ import static org.junit.Assert.assertTrue;
 
 public final class MySQLPipelineSQLBuilderTest {
     
-    private final MySQLPipelineSQLBuilder sqlBuilder = new MySQLPipelineSQLBuilder(ImmutableMap.<String, Set<String>>builder().put("t2", Collections.singleton("sc")).build());
+    private final MySQLPipelineSQLBuilder sqlBuilder = new MySQLPipelineSQLBuilder();
+    
+    private final Map<String, Set<String>> shardingColumnsMap = ImmutableMap.<String, Set<String>>builder().put("t2", Collections.singleton("sc")).build();
     
     @Test
     public void assertBuildInsertSQL() {
-        String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
+        String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"), shardingColumnsMap);
         assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE `sc`=VALUES(`sc`),`c1`=VALUES(`c1`),`c2`=VALUES(`c2`),`c3`=VALUES(`c3`)"));
     }
     
     @Test
     public void assertBuildInsertSQLHasShardingColumn() {
-        String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t2"));
+        String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t2"), shardingColumnsMap);
         assertThat(actual, is("INSERT INTO `t2`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE `c1`=VALUES(`c1`),`c2`=VALUES(`c2`),`c3`=VALUES(`c3`)"));
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
index 03a5c855b32..6b86b38ad08 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
@@ -21,11 +21,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterCo
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
-import org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder.OpenGaussPipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-
-import java.util.Map;
-import java.util.Set;
 
 /**
  * Importer of openGauss.
@@ -35,9 +30,4 @@ public final class OpenGaussImporter extends AbstractImporter {
     public OpenGaussImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
         super(importerConfig, dataSourceManager, channel);
     }
-    
-    @Override
-    protected PipelineSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        return new OpenGaussPipelineSQLBuilder(shardingColumnsMap);
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 281cc261b28..4ce83a09121 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -17,28 +17,20 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
 
-import com.google.common.collect.Collections2;
-import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Pipeline SQL builder of openGauss.
  */
-@NoArgsConstructor
 public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
-    public OpenGaussPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        super(shardingColumnsMap);
-    }
-    
     @Override
     public String getLeftIdentifierQuoteString() {
         return "";
@@ -50,17 +42,17 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
     }
     
     @Override
-    public String buildInsertSQL(final DataRecord dataRecord) {
-        return super.buildInsertSQL(dataRecord) + buildConflictSQL();
+    public String buildInsertSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
+        return super.buildInsertSQL(dataRecord, shardingColumnsMap) + buildConflictSQL(shardingColumnsMap);
     }
     
     @Override
-    public List<Column> extractUpdatedColumns(final Collection<Column> columns, final DataRecord record) {
-        return new ArrayList<>(Collections2.filter(columns, column -> !(column.isPrimaryKey() || isShardingColumn(getShardingColumnsMap(), record.getTableName(), column.getName()))));
+    public List<Column> extractUpdatedColumns(final DataRecord record, final Map<String, Set<String>> shardingColumnsMap) {
+        return record.getColumns().stream().filter(each -> !(each.isPrimaryKey() || isShardingColumn(shardingColumnsMap, record.getTableName(), each.getName()))).collect(Collectors.toList());
     }
     
-    private String buildConflictSQL() {
-        // there need return ON DUPLICATE KEY UPDATE NOTHING after support this syntax.
+    private String buildConflictSQL(final Map<String, Set<String>> shardingColumnsMap) {
+        // TODO there need return ON DUPLICATE KEY UPDATE NOTHING after support this syntax.
         return "";
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
index 809424a9b7b..f044cba5690 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
@@ -21,11 +21,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterCo
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
-import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-
-import java.util.Map;
-import java.util.Set;
 
 /**
  * PostgreSQL importer.
@@ -35,9 +30,4 @@ public final class PostgreSQLImporter extends AbstractImporter {
     public PostgreSQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
         super(importerConfig, dataSourceManager, channel);
     }
-    
-    @Override
-    protected PipelineSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        return new PostgreSQLPipelineSQLBuilder(shardingColumnsMap);
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index cb1cdda89fe..0a37927d29a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
 
-import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
@@ -29,13 +28,8 @@ import java.util.Set;
 /**
  * PostgreSQL pipeline SQL builder.
  */
-@NoArgsConstructor
 public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
-    public PostgreSQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        super(shardingColumnsMap);
-    }
-    
     @Override
     public String getLeftIdentifierQuoteString() {
         return "\"";
@@ -47,12 +41,12 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
     }
     
     @Override
-    public String buildInsertSQL(final DataRecord dataRecord) {
-        return super.buildInsertSQL(dataRecord) + buildConflictSQL(dataRecord);
+    public String buildInsertSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
+        return super.buildInsertSQL(dataRecord, shardingColumnsMap) + buildConflictSQL(dataRecord, shardingColumnsMap);
     }
     
     // Refer to https://www.postgresql.org/docs/current/sql-insert.html
-    private String buildConflictSQL(final DataRecord dataRecord) {
+    private String buildConflictSQL(final DataRecord dataRecord, final Map<String, Set<String>> shardingColumnsMap) {
         StringBuilder result = new StringBuilder(" ON CONFLICT (");
         for (Column each : RecordUtil.extractPrimaryColumns(dataRecord)) {
             result.append(each.getName()).append(",");
@@ -61,7 +55,7 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
         result.append(") DO UPDATE SET ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
-            if (column.isPrimaryKey() || isShardingColumn(getShardingColumnsMap(), dataRecord.getTableName(), column.getName())) {
+            if (column.isPrimaryKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
                 continue;
             }
             result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(",");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java
deleted file mode 100644
index edb2cbe4769..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.postgresql.importer;
-
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.Collections;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class PostgreSQLImporterTest {
-    
-    @Test
-    public void assertCreateSQLBuilder() {
-        ImporterConfiguration importerConfig = mock(ImporterConfiguration.class);
-        PipelineSQLBuilder actual = new PostgreSQLImporter(importerConfig, null, null).createSQLBuilder(Collections.emptyMap());
-        assertTrue(actual instanceof PostgreSQLPipelineSQLBuilder);
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index e87046ab9f8..be560f9ca76 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -17,24 +17,32 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.junit.Test;
 import org.postgresql.replication.LogSequenceNumber;
 
+import java.util.Map;
+import java.util.Set;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
 public final class PostgreSQLPipelineSQLBuilderTest {
     
+    private final PostgreSQLPipelineSQLBuilder sqlBuilder = new PostgreSQLPipelineSQLBuilder();
+    
+    private final Map<String, Set<String>> shardingColumnsMap = ImmutableMap.<String, Set<String>>builder().put("t_order", Sets.newHashSet("order_id", "user_id")).build();
+    
     @Test
     public void assertBuildInsertSQL() {
-        String actual = PipelineSQLBuilderFactory.newInstance("PostgreSQL").buildInsertSQL(mockDataRecord());
+        String actual = sqlBuilder.buildInsertSQL(mockDataRecord(), shardingColumnsMap);
         assertThat(actual, is("INSERT INTO \"t_order\"(\"order_id\",\"user_id\",\"status\") VALUES(?,?,?) ON CONFLICT (order_id)"
-                + " DO UPDATE SET \"user_id\"=EXCLUDED.\"user_id\",\"status\"=EXCLUDED.\"status\""));
+                + " DO UPDATE SET \"status\"=EXCLUDED.\"status\""));
     }
     
     private DataRecord mockDataRecord() {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
index 70fd92081e2..fb6f5d92440 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
@@ -19,18 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
 
-import java.util.Map;
-import java.util.Set;
-
 public final class FixturePipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
-    public FixturePipelineSQLBuilder() {
-    }
-    
-    public FixturePipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        super(shardingColumnsMap);
-    }
-    
     @Override
     protected String getLeftIdentifierQuoteString() {
         return "`";
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
index 979fa005188..db7a53dd4b4 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
@@ -18,6 +18,9 @@
 package org.apache.shardingsphere.data.pipeline.core.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
@@ -25,10 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
-import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -57,20 +57,11 @@ public final class AbstractImporterTest {
     
     private static final String TABLE_NAME = "test_table";
     
-    private static final String INSERT_SQL = "INSERT INTO test_table (id,user,status) VALUES(?,?,?)";
-    
-    private static final String DELETE_SQL = "DELETE FROM test_table WHERE id = ? and user = ?";
-    
-    private static final String UPDATE_SQL = "UPDATE test_table SET user = ?,status = ? WHERE id = ? and user = ?";
-    
     @Mock
     private PipelineDataSourceManager dataSourceManager;
     
-    @Mock
-    private PipelineSQLBuilder pipelineSqlBuilder;
-    
-    @Mock
-    private PipelineDataSourceConfiguration dataSourceConfig;
+    private final PipelineDataSourceConfiguration dataSourceConfig = new StandardPipelineDataSourceConfiguration(
+            "jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root", "root", "root");
     
     @Mock
     private PipelineChannel channel;
@@ -89,11 +80,6 @@ public final class AbstractImporterTest {
     @Before
     public void setUp() throws SQLException {
         jdbcImporter = new AbstractImporter(mockImporterConfiguration(), dataSourceManager, channel) {
-            
-            @Override
-            protected PipelineSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-                return pipelineSqlBuilder;
-            }
         };
         when(dataSourceManager.getDataSource(dataSourceConfig)).thenReturn(dataSource);
         when(dataSource.getConnection()).thenReturn(connection);
@@ -102,8 +88,7 @@ public final class AbstractImporterTest {
     @Test
     public void assertWriteInsertDataRecord() throws SQLException {
         DataRecord insertRecord = getDataRecord("INSERT");
-        when(pipelineSqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
-        when(connection.prepareStatement(INSERT_SQL)).thenReturn(preparedStatement);
+        when(connection.prepareStatement(any())).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(insertRecord));
         jdbcImporter.run();
         verify(preparedStatement).setObject(1, 1);
@@ -115,8 +100,7 @@ public final class AbstractImporterTest {
     @Test
     public void assertDeleteDataRecord() throws SQLException {
         DataRecord deleteRecord = getDataRecord("DELETE");
-        when(pipelineSqlBuilder.buildDeleteSQL(deleteRecord, mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
-        when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
+        when(connection.prepareStatement(any())).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(deleteRecord));
         jdbcImporter.run();
         verify(preparedStatement).setObject(1, 1);
@@ -127,10 +111,8 @@ public final class AbstractImporterTest {
     @Test
     public void assertUpdateDataRecord() throws SQLException {
         DataRecord updateRecord = getDataRecord("UPDATE");
-        when(pipelineSqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
-        when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
+        when(connection.prepareStatement(any())).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
-        when(pipelineSqlBuilder.extractUpdatedColumns(any(), any())).thenReturn(RecordUtil.extractUpdatedColumns(updateRecord));
         jdbcImporter.run();
         verify(preparedStatement).setObject(1, 10);
         verify(preparedStatement).setObject(2, "UPDATE");
@@ -142,10 +124,8 @@ public final class AbstractImporterTest {
     @Test
     public void assertUpdatePrimaryKeyDataRecord() throws SQLException {
         DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
-        when(pipelineSqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
-        when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
+        when(connection.prepareStatement(any())).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
-        when(pipelineSqlBuilder.extractUpdatedColumns(any(), any())).thenReturn(RecordUtil.extractUpdatedColumns(updateRecord));
         jdbcImporter.run();
         InOrder inOrder = inOrder(preparedStatement);
         inOrder.verify(preparedStatement).setObject(1, 2);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
index e93f4d27f2e..db519865515 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -26,6 +27,8 @@ import org.junit.Test;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
@@ -34,22 +37,24 @@ public final class PipelineSQLBuilderTest {
     
     private final PipelineSQLBuilder pipelineSQLBuilder = new FixturePipelineSQLBuilder();
     
+    private final Map<String, Set<String>> shardingColumnsMap = ImmutableMap.<String, Set<String>>builder().put("t2", Collections.singleton("sc")).build();
+    
     @Test
     public void assertBuildInsertSQL() {
-        String actual = pipelineSQLBuilder.buildInsertSQL(mockDataRecord("t1"));
-        assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?)"));
+        String actual = pipelineSQLBuilder.buildInsertSQL(mockDataRecord("t2"), shardingColumnsMap);
+        assertThat(actual, is("INSERT INTO `t2`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?)"));
     }
     
     @Test
     public void assertBuildUpdateSQLWithPrimaryKey() {
-        String actual = pipelineSQLBuilder.buildUpdateSQL(mockDataRecord("t2"), RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
+        String actual = pipelineSQLBuilder.buildUpdateSQL(mockDataRecord("t2"), RecordUtil.extractPrimaryColumns(mockDataRecord("t2")), shardingColumnsMap);
         assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ?"));
     }
     
     @Test
     public void assertBuildUpdateSQLWithShardingColumns() {
         DataRecord dataRecord = mockDataRecord("t2");
-        String actual = pipelineSQLBuilder.buildUpdateSQL(dataRecord, mockConditionColumns(dataRecord));
+        String actual = pipelineSQLBuilder.buildUpdateSQL(dataRecord, mockConditionColumns(dataRecord), shardingColumnsMap);
         assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
     }