You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/04/15 01:59:48 UTC
[shardingsphere] branch master updated: Support PostgreSQL insert on conflict do update in scaling job (#16837)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2c44052c170 Support PostgreSQL insert on conflict do update in scaling job (#16837)
2c44052c170 is described below
commit 2c44052c1701e7e87980e643111c6e895d6d27fa
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Fri Apr 15 09:59:32 2022 +0800
Support PostgreSQL insert on conflict do update in scaling job (#16837)
---
.../DataMatchSingleTableDataCalculator.java | 2 ++
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 5 +++
.../scenario/rulealtered/RuleAlteredJobWorker.java | 2 +-
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 11 ++-----
.../wal/decode/OpenGaussLogSequenceNumber.java | 2 ++
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 4 ---
.../wal/decode/PostgreSQLLogSequenceNumber.java | 2 ++
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 16 +++++++---
.../importer/PostgreSQLImporterTest.java | 36 +++++-----------------
.../PostgreSQLPipelineSQLBuilderTest.java | 8 +++--
.../spi/sqlbuilder/PipelineSQLBuilder.java | 1 +
11 files changed, 39 insertions(+), 50 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
index 14540e4c957..dffeb2bbd46 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
@@ -148,6 +148,7 @@ public final class DataMatchSingleTableDataCalculator extends AbstractStreamingS
Collection<Object> thisNext = thisIterator.next();
Collection<Object> thatNext = thatIterator.next();
if (thisNext.size() != thatNext.size()) {
+ log.info("record column size not match, size1={}, size2={}, record1={}, record2={}", thisNext.size(), thatNext.size(), thisNext, thatNext);
return false;
}
Iterator<Object> thisNextIterator = thisNext.iterator();
@@ -159,6 +160,7 @@ public final class DataMatchSingleTableDataCalculator extends AbstractStreamingS
return ((SQLXML) thisResult).getString().equals(((SQLXML) thatResult).getString());
}
if (!new EqualsBuilder().append(thisResult, thatResult).isEquals()) {
+ log.info("record column value not match, value1={}, value2={}, record1={}, record2={}", thisResult, thatResult, thisNext, thatNext);
return false;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 4614e196aa9..45f2a6f8188 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -103,6 +103,11 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
return String.format("INSERT INTO %s(%s) VALUES(%s)", quote(tableName), columnsLiteral, holder);
}
+ // TODO the kernel now seems to allow updating sharding columns via INSERT ... ON CONFLICT; re-check whether callers still need to exclude them
+ protected final boolean isShardingColumn(final Map<String, Set<String>> shardingColumnsMap, final String tableName, final String columnName) {
+ return shardingColumnsMap.containsKey(tableName) && shardingColumnsMap.get(tableName).contains(columnName);
+ }
+
@Override
public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index e1c5a93fbd2..07dc0beb12e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -171,7 +171,7 @@ public final class RuleAlteredJobWorker {
public void start(final StartScalingEvent event) {
log.info("Start scaling job by {}", event);
if (!isUncompletedJobOfSameSchemaInJobList(event.getSchemaName())) {
- log.warn("There is an outstanding job with the same schema name");
+ log.warn("There is uncompleted job with the same schema name, please handle it first, current job will be ignored");
return;
}
Optional<JobConfiguration> jobConfigOptional = createJobConfig(event);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index cf8409ce475..4bd09360922 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
+import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
@@ -27,11 +28,9 @@ import java.util.Set;
/**
* MySQL pipeline SQL builder.
*/
+@NoArgsConstructor
public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
- public MySQLPipelineSQLBuilder() {
- }
-
public MySQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
super(shardingColumnsMap);
}
@@ -64,12 +63,6 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
return result.toString();
}
- private boolean isShardingColumn(final Map<String, Set<String>> shardingColumnsMap,
- final String tableName, final String columnName) {
- return shardingColumnsMap.containsKey(tableName)
- && shardingColumnsMap.get(tableName).contains(columnName);
- }
-
/**
* Build CRC32 SQL.
*
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
index ac7ac1de6aa..581efaaf4f0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode;
import lombok.RequiredArgsConstructor;
+import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
import org.opengauss.replication.LogSequenceNumber;
@@ -25,6 +26,7 @@ import org.opengauss.replication.LogSequenceNumber;
* Log sequence number of openGauss.
*/
@RequiredArgsConstructor
+@ToString
public final class OpenGaussLogSequenceNumber implements BaseLogSequenceNumber {
private final LogSequenceNumber logSequenceNumber;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 0e3567dbbbd..281cc261b28 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -59,10 +59,6 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
return new ArrayList<>(Collections2.filter(columns, column -> !(column.isPrimaryKey() || isShardingColumn(getShardingColumnsMap(), record.getTableName(), column.getName()))));
}
- private boolean isShardingColumn(final Map<String, Set<String>> shardingColumnsMap, final String tableName, final String columnName) {
- return shardingColumnsMap.containsKey(tableName) && shardingColumnsMap.get(tableName).contains(columnName);
- }
-
private String buildConflictSQL() {
// there need return ON DUPLICATE KEY UPDATE NOTHING after support this syntax.
return "";
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
index 2ba4e67c781..edd54e81eba 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
@@ -18,12 +18,14 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
import lombok.AllArgsConstructor;
+import lombok.ToString;
import org.postgresql.replication.LogSequenceNumber;
/**
* PostgreSQL sequence.
*/
@AllArgsConstructor
+@ToString
public final class PostgreSQLLogSequenceNumber implements BaseLogSequenceNumber {
private final LogSequenceNumber logSequenceNumber;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index c4d0851fbeb..cb1cdda89fe 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
+import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
@@ -28,11 +29,9 @@ import java.util.Set;
/**
* PostgreSQL pipeline SQL builder.
*/
+@NoArgsConstructor
public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
- public PostgreSQLPipelineSQLBuilder() {
- }
-
public PostgreSQLPipelineSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
super(shardingColumnsMap);
}
@@ -52,13 +51,22 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
return super.buildInsertSQL(dataRecord) + buildConflictSQL(dataRecord);
}
+ // Refer to https://www.postgresql.org/docs/current/sql-insert.html (ON CONFLICT ... DO UPDATE SET col=EXCLUDED.col). NOTE(review): conflict-target names are appended unquoted while the SET list is quoted, and if every column is a primary key or sharding column nothing follows SET and the final setLength() only trims the space after it - confirm neither case occurs in practice.
private String buildConflictSQL(final DataRecord dataRecord) {
StringBuilder result = new StringBuilder(" ON CONFLICT (");
for (Column each : RecordUtil.extractPrimaryColumns(dataRecord)) {
result.append(each.getName()).append(",");
}
result.setLength(result.length() - 1);
- result.append(") DO NOTHING");
+ result.append(") DO UPDATE SET ");
+ for (int i = 0; i < dataRecord.getColumnCount(); i++) {
+ Column column = dataRecord.getColumn(i);
+ if (column.isPrimaryKey() || isShardingColumn(getShardingColumnsMap(), dataRecord.getTableName(), column.getName())) {
+ continue;
+ }
+ result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(",");
+ }
+ result.setLength(result.length() - 1);
return result.toString();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java
index 441a04e4da8..edb2cbe4769 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporterTest.java
@@ -18,46 +18,24 @@
package org.apache.shardingsphere.data.pipeline.postgresql.importer;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
-import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
+import org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import org.postgresql.replication.LogSequenceNumber;
import java.util.Collections;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
public final class PostgreSQLImporterTest {
- @Mock
- private ImporterConfiguration importerConfig;
-
- @Mock
- private PipelineDataSourceManager dataSourceManager;
-
- @Mock
- private PipelineChannel channel;
-
@Test
public void assertCreateSQLBuilder() {
- String insertSQL = new PostgreSQLImporter(importerConfig, dataSourceManager, channel).createSQLBuilder(Collections.emptyMap()).buildInsertSQL(mockDataRecord());
- assertThat(insertSQL, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
- }
-
- private DataRecord mockDataRecord() {
- DataRecord result = new DataRecord(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
- result.setTableName("t_order");
- result.addColumn(new Column("id", 1, true, true));
- result.addColumn(new Column("name", "", true, false));
- return result;
+ ImporterConfiguration importerConfig = mock(ImporterConfiguration.class);
+ PipelineSQLBuilder actual = new PostgreSQLImporter(importerConfig, null, null).createSQLBuilder(Collections.emptyMap());
+ assertTrue(actual instanceof PostgreSQLPipelineSQLBuilder);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index 5c01ac5cc78..c7f2d5cb902 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -33,14 +33,16 @@ public final class PostgreSQLPipelineSQLBuilderTest {
@Test
public void assertBuildInsertSQL() {
String actual = PipelineSQLBuilderFactory.getSQLBuilder("PostgreSQL").buildInsertSQL(mockDataRecord());
- assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
+ assertThat(actual, is("INSERT INTO \"t_order\"(\"order_id\",\"user_id\",\"status\") VALUES(?,?,?) ON CONFLICT (order_id)"
+ + " DO UPDATE SET \"user_id\"=EXCLUDED.\"user_id\",\"status\"=EXCLUDED.\"status\""));
}
private DataRecord mockDataRecord() {
DataRecord result = new DataRecord(new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
result.setTableName("t_order");
- result.addColumn(new Column("id", 1, true, true));
- result.addColumn(new Column("name", "", true, false));
+ result.addColumn(new Column("order_id", 1, true, true));
+ result.addColumn(new Column("user_id", 2, true, false));
+ result.addColumn(new Column("status", "ok", true, false));
return result;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 3b37506ff0b..ae9627c0d42 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -28,6 +28,7 @@ import java.util.List;
* Pipeline SQL builder.
* It's singleton when it's used as SPI, else not.
*/
+// TODO refactor to stateless for SPI usage, it's confusing now
public interface PipelineSQLBuilder extends StatelessTypedSPI {
/**