Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/11/04 12:38:20 UTC

[shardingsphere] branch master updated: sharding column could be updated in pipeline (#21949)

This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bd9487cd95 sharding column could be updated in pipeline (#21949)
8bd9487cd95 is described below

commit 8bd9487cd9553f8bf00e548490b7478b126bb29e
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Nov 4 20:38:06 2022 +0800

    sharding column could be updated in pipeline (#21949)
    
    * Remove shardingColumnsMap in SQL builder
    
    * Unit test
    
    * infra/executor trigger scaling IT
---
 .github/workflows/it-scaling.yml                        |  2 ++
 .../pipeline/spi/sqlbuilder/PipelineSQLBuilder.java     | 12 +++---------
 .../data/pipeline/core/importer/DefaultImporter.java    |  6 +++---
 .../core/sqlbuilder/AbstractPipelineSQLBuilder.java     | 17 ++++-------------
 .../core/sqlbuilder/OraclePipelineSQLBuilder.java       |  8 ++------
 .../algorithm/fixture/FixturePipelineSQLBuilder.java    |  9 +++------
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java       | 15 ++++++++-------
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java   | 13 +++----------
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java         | 15 ++++++---------
 .../sqlbuilder/OpenGaussPipelineSQLBuilderTest.java     |  4 +---
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java        | 11 ++++-------
 .../sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java    | 14 ++------------
 .../pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java | 12 +++---------
 13 files changed, 44 insertions(+), 94 deletions(-)
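
The diff below removes the shardingColumnsMap parameter from the whole PipelineSQLBuilder SPI, so callers now pass only the schema name, the data record and (for updates) the condition columns. As a rough orientation, a caller-side sketch of the simplified SPI follows; the wrapper class PipelineSQLBuilderUsageSketch and its method names are hypothetical, and the DataRecord and condition columns are assumed to be supplied by the ingest side, as DefaultImporter does.

import java.util.Collection;

import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;

// Hypothetical wrapper, only to show the simplified SPI surface after this commit.
public final class PipelineSQLBuilderUsageSketch {

    private final MySQLPipelineSQLBuilder sqlBuilder = new MySQLPipelineSQLBuilder();

    public String buildInsert(final DataRecord dataRecord) {
        // No shardingColumnsMap parameter any more; for MySQL the appended
        // ON DUPLICATE KEY UPDATE list now keeps updated sharding columns and
        // only skips unique key columns and columns that were not updated.
        return sqlBuilder.buildInsertSQL(null, dataRecord);
    }

    public String buildUpdate(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
        // The SET list is taken from extractUpdatedColumns(dataRecord), which is no longer
        // filtered against sharding columns; the caller still supplies the condition columns
        // (primary key and, where configured, sharding columns) for the WHERE clause.
        return sqlBuilder.buildUpdateSQL(null, dataRecord, conditionColumns);
    }
}

Note that DefaultImporter still consults the sharding columns when extracting the WHERE condition columns via RecordUtil.extractConditionColumns; only the SQL building itself no longer takes the map.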

diff --git a/.github/workflows/it-scaling.yml b/.github/workflows/it-scaling.yml
index 586f1db2a56..abd66306610 100644
--- a/.github/workflows/it-scaling.yml
+++ b/.github/workflows/it-scaling.yml
@@ -23,6 +23,7 @@ on:
     paths:
       - '.github/workflows/it-scaling.yml'
       - 'infra/common/src/main/**'
+      - 'infra/executor/src/main/**'
       - 'mode/**/src/main/**'
       - 'proxy/**/src/main/**'
       - 'jdbc/core/src/main/**'
@@ -42,6 +43,7 @@ on:
     paths:
       - '.github/workflows/it-scaling.yml'
       - 'infra/common/src/main/**'
+      - 'infra/executor/src/main/**'
       - 'mode/**/src/main/**'
       - 'proxy/**/src/main/**'
       - 'jdbc/core/src/main/**'
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index c5663ba699b..71bba785eaf 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -19,15 +19,12 @@ package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 /**
  * Pipeline SQL builder.
@@ -73,10 +70,9 @@ public interface PipelineSQLBuilder extends TypedSPI, RequiredSPI {
      *
      * @param schemaName schema name
      * @param dataRecord data record
-     * @param shardingColumnsMap sharding columns map
      * @return insert SQL
      */
-    String buildInsertSQL(String schemaName, DataRecord dataRecord, Map<LogicTableName, Set<String>> shardingColumnsMap);
+    String buildInsertSQL(String schemaName, DataRecord dataRecord);
     
     /**
      * Build update SQL.
@@ -84,19 +80,17 @@ public interface PipelineSQLBuilder extends TypedSPI, RequiredSPI {
      * @param schemaName schema name
      * @param dataRecord data record
      * @param conditionColumns condition columns
-     * @param shardingColumnsMap sharding columns map
      * @return update SQL
      */
-    String buildUpdateSQL(String schemaName, DataRecord dataRecord, Collection<Column> conditionColumns, Map<LogicTableName, Set<String>> shardingColumnsMap);
+    String buildUpdateSQL(String schemaName, DataRecord dataRecord, Collection<Column> conditionColumns);
     
     /**
      * Extract updated columns.
      *
      * @param record data record
-     * @param shardingColumnsMap sharding columns map
      * @return filtered columns
      */
-    List<Column> extractUpdatedColumns(DataRecord record, Map<LogicTableName, Set<String>> shardingColumnsMap);
+    List<Column> extractUpdatedColumns(DataRecord record);
     
     /**
      * Build delete SQL.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 2c89cfce650..2a60564a4e0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -190,7 +190,7 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
     
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String insertSql = pipelineSqlBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord, importerConfig.getShardingColumnsMap());
+        String insertSql = pipelineSqlBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
         try (PreparedStatement ps = connection.prepareStatement(insertSql)) {
             batchInsertStatement = ps;
             ps.setQueryTimeout(30);
@@ -222,8 +222,8 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
             log.error("executeUpdate, could not get shardingColumns, tableName={}, logicTableNames={}", record.getTableName(), importerConfig.getLogicTableNames());
         }
         List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, shardingColumns);
-        List<Column> updatedColumns = pipelineSqlBuilder.extractUpdatedColumns(record, importerConfig.getShardingColumnsMap());
-        String updateSql = pipelineSqlBuilder.buildUpdateSQL(getSchemaName(record.getTableName()), record, conditionColumns, importerConfig.getShardingColumnsMap());
+        List<Column> updatedColumns = pipelineSqlBuilder.extractUpdatedColumns(record);
+        String updateSql = pipelineSqlBuilder.buildUpdateSQL(getSchemaName(record.getTableName()), record, conditionColumns);
         try (PreparedStatement ps = connection.prepareStatement(updateSql)) {
             updateStatement = ps;
             for (int i = 0; i < updatedColumns.size(); i++) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 75bd659b63c..e14051863b8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -21,7 +21,6 @@ import com.google.common.base.Strings;
 import lombok.NonNull;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -29,8 +28,6 @@ import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -83,7 +80,7 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
         String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
             sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(schemaName, dataRecord.getTableName(), dataRecord.getColumns()));
@@ -103,20 +100,14 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
         return String.format("INSERT INTO %s(%s) VALUES(%s)", getQualifiedTableName(schemaName, tableName), columnsLiteral, holder);
     }
     
-    // TODO seems sharding column could be updated for insert statement on conflict by kernel now
-    protected final boolean isShardingColumn(final Map<LogicTableName, Set<String>> shardingColumnsMap, final String tableName, final String columnName) {
-        Set<String> shardingColumns = shardingColumnsMap.get(new LogicTableName(tableName));
-        return null != shardingColumns && shardingColumns.contains(columnName);
-    }
-    
     @Override
-    public String buildUpdateSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+    public String buildUpdateSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
         String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
             sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(schemaName, dataRecord.getTableName(), conditionColumns));
         }
         StringBuilder updatedColumnString = new StringBuilder();
-        for (Column each : extractUpdatedColumns(dataRecord, shardingColumnsMap)) {
+        for (Column each : extractUpdatedColumns(dataRecord)) {
             updatedColumnString.append(String.format("%s = ?,", quote(each.getName())));
         }
         updatedColumnString.setLength(updatedColumnString.length() - 1);
@@ -128,7 +119,7 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public List<Column> extractUpdatedColumns(final DataRecord record, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+    public List<Column> extractUpdatedColumns(final DataRecord record) {
         return new ArrayList<>(RecordUtil.extractUpdatedColumns(record));
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
index 67a4e4716eb..601f650b30d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
@@ -19,10 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
 
 import lombok.NonNull;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-
-import java.util.Map;
-import java.util.Set;
 
 /**
  * Oracle pipeline SQL builder.
@@ -45,8 +41,8 @@ public final class OraclePipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     }
     
     @Override
-    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
-        return super.buildInsertSQL(schemaName, dataRecord, shardingColumnsMap);
+    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
+        return super.buildInsertSQL(schemaName, dataRecord);
         // TODO buildInsertSQL and buildConflictSQL, need 2 round parameters set
         // TODO refactor PipelineSQLBuilder to combine SQL building and parameters set
     }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index 71b61214ffe..9db1da50dce 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -19,15 +19,12 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
     
@@ -42,17 +39,17 @@ public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
     }
     
     @Override
-    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
         return "";
     }
     
     @Override
-    public String buildUpdateSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+    public String buildUpdateSQL(final String schemaName, final DataRecord dataRecord, final Collection<Column> conditionColumns) {
         return "";
     }
     
     @Override
-    public List<Column> extractUpdatedColumns(final DataRecord record, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+    public List<Column> extractUpdatedColumns(final DataRecord record) {
         return Collections.emptyList();
     }
     
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 286de508afe..9a1327c86fa 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -19,12 +19,9 @@ package org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
 
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 /**
  * MySQL pipeline SQL builder.
@@ -32,15 +29,19 @@ import java.util.Set;
 public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
     @Override
-    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
-        return super.buildInsertSQL(schemaName, dataRecord, shardingColumnsMap) + buildDuplicateUpdateSQL(dataRecord, shardingColumnsMap);
+    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
+        return super.buildInsertSQL(schemaName, dataRecord) + buildDuplicateUpdateSQL(dataRecord);
     }
     
-    private String buildDuplicateUpdateSQL(final DataRecord dataRecord, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+    private String buildDuplicateUpdateSQL(final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
-            if (column.isUniqueKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
+            if (!column.isUpdated()) {
+                continue;
+            }
+            // TODO not skip unique key
+            if (column.isUniqueKey()) {
                 continue;
             }
             result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index f2b276fefde..19938c17803 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -17,17 +17,12 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -37,17 +32,15 @@ public final class MySQLPipelineSQLBuilderTest {
     
     private final MySQLPipelineSQLBuilder sqlBuilder = new MySQLPipelineSQLBuilder();
     
-    private final Map<LogicTableName, Set<String>> shardingColumnsMap = ImmutableMap.<LogicTableName, Set<String>>builder().put(new LogicTableName("t2"), Collections.singleton("sc")).build();
-    
     @Test
     public void assertBuildInsertSQL() {
-        String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t1"), shardingColumnsMap);
-        assertThat(actual, is("INSERT INTO t1(id,sc,c1,c2,c3) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE sc=VALUES(sc),c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
+        String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t1"));
+        assertThat(actual, is("INSERT INTO t1(id,sc,c1,c2,c3) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
     }
     
     @Test
     public void assertBuildInsertSQLHasShardingColumn() {
-        String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t2"), shardingColumnsMap);
+        String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t2"));
         assertThat(actual, is("INSERT INTO t2(id,sc,c1,c2,c3) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE c1=VALUES(c1),c2=VALUES(c2),c3=VALUES(c3)"));
     }
     
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index cadb19b69c5..56fe4dd2f5b 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -19,13 +19,10 @@ package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -39,20 +36,20 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
     }
     
     @Override
-    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
-        return super.buildInsertSQL(schemaName, dataRecord, shardingColumnsMap) + buildConflictSQL(dataRecord, shardingColumnsMap);
+    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
+        return super.buildInsertSQL(schemaName, dataRecord) + buildConflictSQL(dataRecord);
     }
     
     @Override
-    public List<Column> extractUpdatedColumns(final DataRecord record, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
-        return record.getColumns().stream().filter(each -> !(each.isUniqueKey() || isShardingColumn(shardingColumnsMap, record.getTableName(), each.getName()))).collect(Collectors.toList());
+    public List<Column> extractUpdatedColumns(final DataRecord record) {
+        return record.getColumns().stream().filter(each -> !(each.isUniqueKey())).collect(Collectors.toList());
     }
     
-    private String buildConflictSQL(final DataRecord dataRecord, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+    private String buildConflictSQL(final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
-            if (column.isUniqueKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
+            if (column.isUniqueKey()) {
                 continue;
             }
             result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(",");
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index 4bac9b4daaf..dc94e901c50 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -22,8 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.junit.Test;
 
-import java.util.Collections;
-
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -33,7 +31,7 @@ public final class OpenGaussPipelineSQLBuilderTest {
     
     @Test
     public void assertBuildInsertSQL() {
-        String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t1"), Collections.emptyMap());
+        String actual = sqlBuilder.buildInsertSQL(null, mockDataRecord("t1"));
         assertThat(actual, is("INSERT INTO t1(id,c0,c1,c2,c3) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE c0=EXCLUDED.c0,c1=EXCLUDED.c1,c2=EXCLUDED.c2,c3=EXCLUDED.c3"));
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 828551fa538..f56d42345fe 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -19,13 +19,10 @@ package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
 
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 /**
  * PostgreSQL pipeline SQL builder.
@@ -38,12 +35,12 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
     }
     
     @Override
-    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
-        return super.buildInsertSQL(schemaName, dataRecord, shardingColumnsMap) + buildConflictSQL(dataRecord, shardingColumnsMap);
+    public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
+        return super.buildInsertSQL(schemaName, dataRecord) + buildConflictSQL(dataRecord);
     }
     
     // Refer to https://www.postgresql.org/docs/current/sql-insert.html
-    private String buildConflictSQL(final DataRecord dataRecord, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+    private String buildConflictSQL(final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder(" ON CONFLICT (");
         for (Column each : RecordUtil.extractPrimaryColumns(dataRecord)) {
             result.append(each.getName()).append(",");
@@ -52,7 +49,7 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
         result.append(") DO UPDATE SET ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
-            if (column.isUniqueKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
+            if (column.isUniqueKey()) {
                 continue;
             }
             result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(",");
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index bf02a8e84fe..18a0d8d2ad5 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -17,20 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.junit.Test;
 import org.postgresql.replication.LogSequenceNumber;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -38,14 +31,11 @@ public final class PostgreSQLPipelineSQLBuilderTest {
     
     private final PostgreSQLPipelineSQLBuilder sqlBuilder = new PostgreSQLPipelineSQLBuilder();
     
-    private final Map<LogicTableName, Set<String>> shardingColumnsMap = ImmutableMap.<LogicTableName, Set<String>>builder()
-            .put(new LogicTableName("t_order"), new HashSet<>(Arrays.asList("order_id", "user_id"))).build();
-    
     @Test
     public void assertBuildInsertSQL() {
-        String actual = sqlBuilder.buildInsertSQL("schema1", mockDataRecord(), shardingColumnsMap);
+        String actual = sqlBuilder.buildInsertSQL("schema1", mockDataRecord());
         assertThat(actual, is("INSERT INTO schema1.t_order(order_id,user_id,status) VALUES(?,?,?) ON CONFLICT (order_id)"
-                + " DO UPDATE SET status=EXCLUDED.status"));
+                + " DO UPDATE SET user_id=EXCLUDED.user_id,status=EXCLUDED.status"));
     }
     
     private DataRecord mockDataRecord() {
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
index ee48d13c808..83b0c2cc4f5 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilderTest.java
@@ -17,19 +17,15 @@
 
 package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.junit.Test;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -38,24 +34,22 @@ public final class PipelineSQLBuilderTest {
     
     private final PipelineSQLBuilder pipelineSQLBuilder = new FixturePipelineSQLBuilder();
     
-    private final Map<LogicTableName, Set<String>> shardingColumnsMap = ImmutableMap.<LogicTableName, Set<String>>builder().put(new LogicTableName("t2"), Collections.singleton("sc")).build();
-    
     @Test
     public void assertBuildInsertSQL() {
-        String actual = pipelineSQLBuilder.buildInsertSQL(null, mockDataRecord("t2"), shardingColumnsMap);
+        String actual = pipelineSQLBuilder.buildInsertSQL(null, mockDataRecord("t2"));
         assertThat(actual, is("INSERT INTO t2(id,sc,c1,c2,c3) VALUES(?,?,?,?,?)"));
     }
     
     @Test
     public void assertBuildUpdateSQLWithPrimaryKey() {
-        String actual = pipelineSQLBuilder.buildUpdateSQL(null, mockDataRecord("t2"), RecordUtil.extractPrimaryColumns(mockDataRecord("t2")), shardingColumnsMap);
+        String actual = pipelineSQLBuilder.buildUpdateSQL(null, mockDataRecord("t2"), RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
         assertThat(actual, is("UPDATE t2 SET c1 = ?,c2 = ?,c3 = ? WHERE id = ?"));
     }
     
     @Test
     public void assertBuildUpdateSQLWithShardingColumns() {
         DataRecord dataRecord = mockDataRecord("t2");
-        String actual = pipelineSQLBuilder.buildUpdateSQL(null, dataRecord, mockConditionColumns(dataRecord), shardingColumnsMap);
+        String actual = pipelineSQLBuilder.buildUpdateSQL(null, dataRecord, mockConditionColumns(dataRecord));
         assertThat(actual, is("UPDATE t2 SET c1 = ?,c2 = ?,c3 = ? WHERE id = ? and sc = ?"));
     }