You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/04/15 01:59:48 UTC

[shardingsphere] branch master updated: Support PostgreSQL insert on conflict do update in scaling job (#16837)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c44052c170 Support PostgreSQL insert on conflict do update in scaling job (#16837)
2c44052c170 is described below

commit 2c44052c1701e7e87980e643111c6e895d6d27fa
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Fri Apr 15 09:59:32 2022 +0800

    Support PostgreSQL insert on conflict do update in scaling job (#16837)
---
 .../DataMatchSingleTableDataCalculator.java        |  2 ++
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     |  5 +++
 .../scenario/rulealtered/RuleAlteredJobWorker.java |  2 +-
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  | 11 ++-----
 .../wal/decode/OpenGaussLogSequenceNumber.java     |  2 ++
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    |  4 ---
 .../wal/decode/PostgreSQLLogSequenceNumber.java    |  2 ++
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   | 16 +++++++---
 .../importer/PostgreSQLImporterTest.java           | 36 +++++-----------------
 .../PostgreSQLPipelineSQLBuilderTest.java          |  8 +++--
 .../spi/sqlbuilder/PipelineSQLBuilder.java         |  1 +
 11 files changed, 39 insertions(+), 50 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
index 14540e4c957..dffeb2bbd46 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
@@ -148,6 +148,7 @@ public final class DataMatchSingleTableDataCalculator extends AbstractStreamingS
                 Collection<Object> thisNext = thisIterator.next();
                 Collection<Object> thatNext = thatIterator.next();
                 if (thisNext.size() != thatNext.size()) {
+                    log.info("record column size not match, size1={}, size2={}, record1={}, record2={}", thisNext.size(), thatNext.size(), thisNext, thatNext);
                     return false;
                 }
                 Iterator<Object> thisNextIterator = thisNext.iterator();
@@ -159,6 +160,7 @@ public final class DataMatchSingleTableDataCalculator extends AbstractStreamingS
                         return ((SQLXML) thisResult).getString().equals(((SQLXML) thatResult).getString());
                     }
                     if (!new EqualsBuilder().append(thisResult, thatResult).isEquals()) {
+                        log.info("record column value not match, value1={}, value2={}, record1={}, record2={}", thisResult, thatResult, thisNext, thatNext);
                         return false;
                     }
                 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 4614e196aa9..45f2a6f8188 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -103,6 +103,11 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
         return String.format("INSERT INTO %s(%s) VALUES(%s)", quote(tableName), columnsLiteral, holder);
     }
     
+    // TODO seems sharding column could be updated for insert statement on conflict by kernel now
+    protected final boolean isShardingColumn(final Map<String, Set<String>> shardingColumnsMap, final String tableName, final String columnName) {
+        return shardingColumnsMap.containsKey(tableName) && shardingColumnsMap.get(tableName).contains(columnName);
+    }
+    
     @Override
     public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
         String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index e1c5a93fbd2..07dc0beb12e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -171,7 +171,7 @@ public final class RuleAlteredJobWorker {
     public void start(final StartScalingEvent event) {
         log.info("Start scaling job by {}", event);
         if (!isUncompletedJobOfSameSchemaInJobList(event.getSchemaName())) {
-            log.warn("There is an outstanding job with the same schema name");
+            log.warn("There is uncompleted job with the same schema name, please handle it first, current job will be ignored");
             return;
         }
         Optional<JobConfiguration> jobConfigOptional = createJobConfig(event);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index cf8409ce475..4bd09360922 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
 
+import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
@@ -27,11 +28,9 @@ import java.util.Set;
 /**
  * MySQL pipeline SQL builder.
  */
+@NoArgsConstructor
 public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
-    public MySQLPipelineSQLBuilder() {
-    }
-    
     public MySQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
         super(shardingColumnsMap);
     }
@@ -64,12 +63,6 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
         return result.toString();
     }
     
-    private boolean isShardingColumn(final Map<String, Set<String>> shardingColumnsMap,
-                                     final String tableName, final String columnName) {
-        return shardingColumnsMap.containsKey(tableName)
-                && shardingColumnsMap.get(tableName).contains(columnName);
-    }
-    
     /**
      * Build CRC32 SQL.
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
index ac7ac1de6aa..581efaaf4f0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode;
 
 import lombok.RequiredArgsConstructor;
+import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
 import org.opengauss.replication.LogSequenceNumber;
 
@@ -25,6 +26,7 @@ import org.opengauss.replication.LogSequenceNumber;
  * Log sequence number of openGauss.
  */
 @RequiredArgsConstructor
+@ToString
 public final class OpenGaussLogSequenceNumber implements BaseLogSequenceNumber {
     
     private final LogSequenceNumber logSequenceNumber;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 0e3567dbbbd..281cc261b28 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -59,10 +59,6 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
         return new ArrayList<>(Collections2.filter(columns, column -> !(column.isPrimaryKey() || isShardingColumn(getShardingColumnsMap(), record.getTableName(), column.getName()))));
     }
     
-    private boolean isShardingColumn(final Map<String, Set<String>> shardingColumnsMap, final String tableName, final String columnName) {
-        return shardingColumnsMap.containsKey(tableName) && shardingColumnsMap.get(tableName).contains(columnName);
-    }
-    
     private String buildConflictSQL() {
         // there need return ON DUPLICATE KEY UPDATE NOTHING after support this syntax.
         return "";
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNu [...]
index 2ba4e67c781..edd54e81eba 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
@@ -18,12 +18,14 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
 
 import lombok.AllArgsConstructor;
+import lombok.ToString;
 import org.postgresql.replication.LogSequenceNumber;
 
 /**
  * PostgreSQL sequence.
  */
 @AllArgsConstructor
+@ToString
 public final class PostgreSQLLogSequenceNumber implements BaseLogSequenceNumber {
     
     private final LogSequenceNumber logSequenceNumber;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index c4d0851fbeb..cb1cdda89fe 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
 
+import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
@@ -28,11 +29,9 @@ import java.util.Set;
 /**
  * PostgreSQL pipeline SQL builder.
  */
+@NoArgsConstructor
 public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     
-    public PostgreSQLPipelineSQLBuilder() {
-    }
-    
     public PostgreSQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
         super(shardingColumnsMap);
     }
@@ -52,13 +51,22 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
         return super.buildInsertSQL(dataRecord) + buildConflictSQL(dataRecord);
     }
     
+    // Refer to https://www.postgresql.org/docs/current/sql-insert.html
     private String buildConflictSQL(final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder(" ON CONFLICT (");
         for (Column each : RecordUtil.extractPrimaryColumns(dataRecord)) {
             result.append(each.getName()).append(",");
         }
         result.setLength(result.length() - 1);
-        result.append(") DO NOTHING");
+        result.append(") DO UPDATE SET ");
+        for (int i = 0; i < dataRecord.getColumnCount(); i++) {
+            Column column = dataRecord.getColumn(i);
+            if (column.isPrimaryKey() || isShardingColumn(getShardingColumnsMap(), dataRecord.getTableName(), column.getName())) {
+                continue;
+            }
+            result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(",");
+        }
+        result.setLength(result.length() - 1);
         return result.toString();
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java
index 441a04e4da8..edb2cbe4769 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java
@@ -18,46 +18,24 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
+import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.postgresql.replication.LogSequenceNumber;
 
 import java.util.Collections;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 @RunWith(MockitoJUnitRunner.class)
 public final class PostgreSQLImporterTest {
     
-    @Mock
-    private ImporterConfiguration importerConfig;
-    
-    @Mock
-    private PipelineDataSourceManager dataSourceManager;
-    
-    @Mock
-    private PipelineChannel channel;
-    
     @Test
     public void assertCreateSQLBuilder() {
-        String insertSQL = new PostgreSQLImporter(importerConfig, dataSourceManager, channel).createSQLBuilder(Collections.emptyMap()).buildInsertSQL(mockDataRecord());
-        assertThat(insertSQL, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
-    }
-    
-    private DataRecord mockDataRecord() {
-        DataRecord result = new DataRecord(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
-        result.setTableName("t_order");
-        result.addColumn(new Column("id", 1, true, true));
-        result.addColumn(new Column("name", "", true, false));
-        return result;
+        ImporterConfiguration importerConfig = mock(ImporterConfiguration.class);
+        PipelineSQLBuilder actual = new PostgreSQLImporter(importerConfig, null, null).createSQLBuilder(Collections.emptyMap());
+        assertTrue(actual instanceof PostgreSQLPipelineSQLBuilder);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index 5c01ac5cc78..c7f2d5cb902 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -33,14 +33,16 @@ public final class PostgreSQLPipelineSQLBuilderTest {
     @Test
     public void assertBuildInsertSQL() {
         String actual = PipelineSQLBuilderFactory.getSQLBuilder("PostgreSQL").buildInsertSQL(mockDataRecord());
-        assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
+        assertThat(actual, is("INSERT INTO \"t_order\"(\"order_id\",\"user_id\",\"status\") VALUES(?,?,?) ON CONFLICT (order_id)"
+                + " DO UPDATE SET \"user_id\"=EXCLUDED.\"user_id\",\"status\"=EXCLUDED.\"status\""));
     }
     
     private DataRecord mockDataRecord() {
         DataRecord result = new DataRecord(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
         result.setTableName("t_order");
-        result.addColumn(new Column("id", 1, true, true));
-        result.addColumn(new Column("name", "", true, false));
+        result.addColumn(new Column("order_id", 1, true, true));
+        result.addColumn(new Column("user_id", 2, true, false));
+        result.addColumn(new Column("status", "ok", true, false));
         return result;
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 3b37506ff0b..ae9627c0d42 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -28,6 +28,7 @@ import java.util.List;
  * Pipeline SQL builder.
  * It's singleton when it's used as SPI, else not.
  */
+// TODO refactor to stateless for SPI usage, it's confusing now
 public interface PipelineSQLBuilder extends StatelessTypedSPI {
     
     /**