You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/06/08 09:03:53 UTC
[shardingsphere] branch master updated: Refactor DataRecord type and tableName fields as final (#26168)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 140c48120bc Refactor DataRecord type and tableName fields as final (#26168)
140c48120bc is described below
commit 140c48120bc6aff5627e87188a4b0fdc7ca8121a
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Thu Jun 8 17:03:44 2023 +0800
Refactor DataRecord type and tableName fields as final (#26168)
---
.../data/pipeline/api/ingest/record/DataRecord.java | 12 +++++++-----
.../cdc/util/DataRecordResultConvertUtilsTest.java | 4 +---
.../pipeline/core/ingest/dumper/InventoryDumper.java | 4 +---
.../pipeline/core/importer/DataRecordMergerTest.java | 12 +++---------
.../memory/MultiplexMemoryPipelineChannelTest.java | 3 ++-
.../data/pipeline/core/ingest/record/DataRecordTest.java | 13 +++++--------
.../data/pipeline/core/record/RecordUtilsTest.java | 4 ++--
.../pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java | 7 +++----
.../pipeline/mysql/ingest/MySQLIncrementalDumper.java | 16 +++++++---------
.../mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java | 4 ++--
.../sqlbuilder/OpenGaussPipelineSQLBuilderTest.java | 4 ++--
.../postgresql/ingest/wal/WALEventConverter.java | 15 ++++++---------
.../sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java | 4 ++--
.../core/importer/PipelineDataSourceSinkTest.java | 9 +++------
14 files changed, 46 insertions(+), 65 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index 6b60eef04e7..b0f0fa5622b 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -37,20 +37,22 @@ import java.util.List;
@ToString
public final class DataRecord extends Record {
+ private final String type;
+
+ private final String tableName;
+
private final List<Column> columns;
private final List<Object> uniqueKeyValue = new LinkedList<>();
private final List<Object> oldUniqueKeyValues = new ArrayList<>();
- private String type;
-
- private String tableName;
-
private Long csn;
- public DataRecord(final IngestPosition position, final int columnCount) {
+ public DataRecord(final String type, final String tableName, final IngestPosition position, final int columnCount) {
super(position);
+ this.type = type;
+ this.tableName = tableName;
columns = new ArrayList<>(columnCount);
}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index 1da75e1d7e6..7280fdddac6 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -54,7 +54,7 @@ class DataRecordResultConvertUtilsTest {
@Test
void assertConvertDataRecordToRecord() throws InvalidProtocolBufferException, SQLException {
- DataRecord dataRecord = new DataRecord(new IntegerPrimaryKeyPosition(0, 1), 2);
+ DataRecord dataRecord = new DataRecord("INSERT", "t_order", new IntegerPrimaryKeyPosition(0, 1), 2);
dataRecord.addColumn(new Column("order_id", BigInteger.ONE, false, true));
dataRecord.addColumn(new Column("price", BigDecimal.valueOf(123), false, false));
dataRecord.addColumn(new Column("user_id", Long.MAX_VALUE, false, false));
@@ -73,8 +73,6 @@ class DataRecordResultConvertUtilsTest {
when(mockedClob.getSubString(anyLong(), anyInt())).thenReturn("clob\n");
dataRecord.addColumn(new Column("text_clob", mockedClob, false, false));
dataRecord.addColumn(new Column("update_time", new Timestamp(System.currentTimeMillis()), false, false));
- dataRecord.setTableName("t_order");
- dataRecord.setType("INSERT");
TypeRegistry registry = TypeRegistry.newBuilder().add(EmptyProto.getDescriptor().getMessageTypes()).add(TimestampProto.getDescriptor().getMessageTypes())
.add(WrappersProto.getDescriptor().getMessageTypes()).build();
Record expectedRecord = DataRecordResultConvertUtils.convertDataRecordToRecord("test", null, dataRecord);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 576b437728f..59fa207e192 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -198,9 +198,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
int columnCount = resultSetMetaData.getColumnCount();
- DataRecord result = new DataRecord(newPosition(resultSet), columnCount);
- result.setType(IngestDataChangeType.INSERT);
- result.setTableName(dumperConfig.getLogicTableName());
+ DataRecord result = new DataRecord(IngestDataChangeType.INSERT, dumperConfig.getLogicTableName(), newPosition(resultSet), columnCount);
List<String> insertColumnNames = Optional.ofNullable(dumperConfig.getInsertColumnNames()).orElse(Collections.emptyList());
ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(),
() -> new PipelineInvalidParameterException("Insert colum names count not equals ResultSet column count"));
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
index 93f6cc62a77..75bb3b1657e 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
@@ -165,9 +165,7 @@ class DataRecordMergerTest {
}
private DataRecord mockInsertDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
- DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
- result.setType(IngestDataChangeType.INSERT);
- result.setTableName(tableName);
+ DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 3);
result.addColumn(new Column("id", id, true, true));
result.addColumn(new Column("user_id", userId, true, false));
result.addColumn(new Column("total_price", totalPrice, true, false));
@@ -187,9 +185,7 @@ class DataRecordMergerTest {
}
private DataRecord mockUpdateDataRecord(final String tableName, final Integer oldId, final int id, final int userId, final int totalPrice) {
- DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
- result.setType(IngestDataChangeType.UPDATE);
- result.setTableName(tableName);
+ DataRecord result = new DataRecord(IngestDataChangeType.UPDATE, tableName, new PlaceholderPosition(), 3);
result.addColumn(new Column("id", oldId, id, null != oldId, true));
result.addColumn(new Column("user_id", userId, true, false));
result.addColumn(new Column("total_price", totalPrice, true, false));
@@ -201,9 +197,7 @@ class DataRecordMergerTest {
}
private DataRecord mockDeleteDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
- DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
- result.setType(IngestDataChangeType.DELETE);
- result.setTableName(tableName);
+ DataRecord result = new DataRecord(IngestDataChangeType.DELETE, tableName, new PlaceholderPosition(), 3);
result.addColumn(new Column("id", id, null, true, true));
result.addColumn(new Column("user_id", userId, null, true, false));
result.addColumn(new Column("total_price", totalPrice, null, true, false));
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
index 144ef6d9808..35cefe84c8a 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.junit.jupiter.api.Test;
import java.security.SecureRandom;
@@ -97,7 +98,7 @@ class MultiplexMemoryPipelineChannelTest {
private Record[] mockRecords() {
Record[] result = new Record[100];
for (int i = 1; i <= result.length; i++) {
- result[i - 1] = random.nextBoolean() ? new DataRecord(new IntPosition(i), 0) : new PlaceholderRecord(new IntPosition(i));
+ result[i - 1] = random.nextBoolean() ? new DataRecord(IngestDataChangeType.INSERT, "t1", new IntPosition(i), 0) : new PlaceholderRecord(new IntPosition(i));
}
return result;
}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
index 2d4ca0075d3..96345e69f47 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.record;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -33,12 +34,10 @@ class DataRecordTest {
@Test
void assertKeyEqual() {
- beforeDataRecord = new DataRecord(new PlaceholderPosition(), 2);
- beforeDataRecord.setTableName("t1");
+ beforeDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", new PlaceholderPosition(), 2);
beforeDataRecord.addColumn(new Column("id", 1, true, true));
beforeDataRecord.addColumn(new Column("name", "1", true, false));
- afterDataRecord = new DataRecord(new PlaceholderPosition(), 2);
- afterDataRecord.setTableName("t1");
+ afterDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", new PlaceholderPosition(), 2);
afterDataRecord.addColumn(new Column("id", 1, true, true));
afterDataRecord.addColumn(new Column("name", "2", true, false));
assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
@@ -46,12 +45,10 @@ class DataRecordTest {
@Test
void assertOldKeyEqual() {
- beforeDataRecord = new DataRecord(new PlaceholderPosition(), 2);
- beforeDataRecord.setTableName("t1");
+ beforeDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", new PlaceholderPosition(), 2);
beforeDataRecord.addColumn(new Column("id", 1, true, true));
beforeDataRecord.addColumn(new Column("name", "1", true, false));
- afterDataRecord = new DataRecord(new PlaceholderPosition(), 2);
- afterDataRecord.setTableName("t1");
+ afterDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", new PlaceholderPosition(), 2);
afterDataRecord.addColumn(new Column("id", 1, 2, true, true));
afterDataRecord.addColumn(new Column("name", "2", true, false));
assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getOldKey()));
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
index 0f5c00a80e9..ef9cdd7f2d9 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.record;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -57,8 +58,7 @@ class RecordUtilsTest {
}
private DataRecord mockDataRecord(final String tableName) {
- DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
- result.setTableName(tableName);
+ DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 4);
result.addColumn(new Column("id", "", false, true));
result.addColumn(new Column("sc", "", false, true));
result.addColumn(new Column("c1", "", true, false));
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
index 7481a806933..af3ff93b167 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.junit.jupiter.api.Test;
@@ -112,8 +113,7 @@ class PipelineSQLBuilderTest {
}
private DataRecord mockDataRecord(final String tableName) {
- DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
- result.setTableName(tableName);
+ DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 4);
result.addColumn(new Column("id", "", false, true));
result.addColumn(new Column("sc", "", false, false));
result.addColumn(new Column("c1", "", true, false));
@@ -137,8 +137,7 @@ class PipelineSQLBuilderTest {
}
private DataRecord mockDataRecordWithoutUniqueKey(final String tableName) {
- DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
- result.setTableName(tableName);
+ DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 4);
result.addColumn(new Column("id", "", false, false));
result.addColumn(new Column("name", "", true, false));
return result;
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 9a5ba016e04..93014201fa7 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -161,8 +161,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getAfterRows()) {
- DataRecord dataRecord = createDataRecord(event, each.length);
- dataRecord.setType(IngestDataChangeType.INSERT);
+ DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length);
for (int i = 0; i < each.length; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
@@ -185,8 +184,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
Serializable[] afterValues = event.getAfterRows().get(i);
- DataRecord dataRecord = createDataRecord(event, beforeValues.length);
- dataRecord.setType(IngestDataChangeType.UPDATE);
+ DataRecord dataRecord = createDataRecord(IngestDataChangeType.UPDATE, event, beforeValues.length);
for (int j = 0; j < beforeValues.length; j++) {
Serializable oldValue = beforeValues[j];
Serializable newValue = afterValues[j];
@@ -208,8 +206,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getBeforeRows()) {
- DataRecord dataRecord = createDataRecord(event, each.length);
- dataRecord.setType(IngestDataChangeType.DELETE);
+ DataRecord dataRecord = createDataRecord(IngestDataChangeType.DELETE, event, each.length);
for (int i = 0, length = each.length; i < length; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
@@ -233,9 +230,10 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
return dataTypeHandler.isPresent() ? dataTypeHandler.get().handle(value) : value;
}
- private DataRecord createDataRecord(final AbstractRowsEvent rowsEvent, final int columnCount) {
- DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()), columnCount);
- result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal());
+ private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) {
+ String tableName = dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal();
+ IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
+ DataRecord result = new DataRecord(type, tableName, position, columnCount);
result.setCommitTime(rowsEvent.getTimestamp() * 1000);
return result;
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 9acb3db3303..818fdc41e9a 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.junit.jupiter.api.Test;
import java.util.Optional;
@@ -52,8 +53,7 @@ class MySQLPipelineSQLBuilderTest {
}
private DataRecord mockDataRecord(final String tableName) {
- DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
- result.setTableName(tableName);
+ DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 4);
result.addColumn(new Column("id", "", false, true));
result.addColumn(new Column("sc", "", false, false));
result.addColumn(new Column("c1", "", true, false));
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index a134cf6d28f..4bdfccc27bf 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.junit.jupiter.api.Test;
import java.util.Optional;
@@ -39,8 +40,7 @@ class OpenGaussPipelineSQLBuilderTest {
}
private DataRecord mockDataRecord(final String tableName) {
- DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
- result.setTableName(tableName);
+ DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 4);
result.addColumn(new Column("id", "", false, true));
result.addColumn(new Column("c0", "", false, false));
result.addColumn(new Column("c1", "", true, false));
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index 00cb9e6da3c..1968de02f07 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -96,15 +96,13 @@ public final class WALEventConverter {
}
private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
- DataRecord result = createDataRecord(writeRowEvent, writeRowEvent.getAfterRow().size());
- result.setType(IngestDataChangeType.INSERT);
+ DataRecord result = createDataRecord(IngestDataChangeType.INSERT, writeRowEvent, writeRowEvent.getAfterRow().size());
putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getTableName(), writeRowEvent.getAfterRow());
return result;
}
private DataRecord handleUpdateRowEvent(final UpdateRowEvent updateRowEvent, final PipelineTableMetaData tableMetaData) {
- DataRecord result = createDataRecord(updateRowEvent, updateRowEvent.getAfterRow().size());
- result.setType(IngestDataChangeType.UPDATE);
+ DataRecord result = createDataRecord(IngestDataChangeType.UPDATE, updateRowEvent, updateRowEvent.getAfterRow().size());
String actualTableName = updateRowEvent.getTableName();
putColumnsIntoDataRecord(result, tableMetaData, actualTableName, updateRowEvent.getAfterRow());
return result;
@@ -112,8 +110,7 @@ public final class WALEventConverter {
private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final PipelineTableMetaData tableMetaData) {
// TODO completion columns
- DataRecord result = createDataRecord(event, event.getPrimaryKeys().size());
- result.setType(IngestDataChangeType.DELETE);
+ DataRecord result = createDataRecord(IngestDataChangeType.DELETE, event, event.getPrimaryKeys().size());
// TODO Unique key may be a column within unique index
List<String> primaryKeyColumns = tableMetaData.getPrimaryKeyColumns();
for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
@@ -122,9 +119,9 @@ public final class WALEventConverter {
return result;
}
- private DataRecord createDataRecord(final AbstractRowEvent rowsEvent, final int columnCount) {
- DataRecord result = new DataRecord(new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
- result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal());
+ private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) {
+ String tableName = dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal();
+ DataRecord result = new DataRecord(type, tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
result.setCsn(rowsEvent.getCsn());
return result;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index 1ea59d6693e..21f19f05742 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.junit.jupiter.api.Test;
@@ -42,8 +43,7 @@ class PostgreSQLPipelineSQLBuilderTest {
}
private DataRecord mockDataRecord() {
- DataRecord result = new DataRecord(new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
- result.setTableName("t_order");
+ DataRecord result = new DataRecord(IngestDataChangeType.INSERT, "t_order", new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
result.addColumn(new Column("order_id", 1, true, true));
result.addColumn(new Column("user_id", 2, true, false));
result.addColumn(new Column("status", "ok", true, false));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 07ad96e23ea..67ef9ec0918 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.junit.jupiter.api.BeforeEach;
@@ -145,9 +146,7 @@ class PipelineDataSourceSinkTest {
}
private DataRecord getUpdatePrimaryKeyDataRecord() {
- DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
- result.setTableName(TABLE_NAME);
- result.setType("UPDATE");
+ DataRecord result = new DataRecord(IngestDataChangeType.UPDATE, TABLE_NAME, new PlaceholderPosition(), 3);
result.addColumn(new Column("id", 1, 2, true, true));
result.addColumn(new Column("user", 0, 10, true, false));
result.addColumn(new Column("status", null, "UPDATE", true, false));
@@ -162,9 +161,6 @@ class PipelineDataSourceSinkTest {
}
private DataRecord getDataRecord(final String recordType) {
- DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
- result.setTableName(TABLE_NAME);
- result.setType(recordType);
Integer idOldValue = null;
Integer userOldValue = null;
Integer idValue = null;
@@ -188,6 +184,7 @@ class PipelineDataSourceSinkTest {
userOldValue = 10;
statusOldValue = recordType;
}
+ DataRecord result = new DataRecord(recordType, TABLE_NAME, new PlaceholderPosition(), 3);
result.addColumn(new Column("id", idOldValue, idValue, false, true));
result.addColumn(new Column("user", userOldValue, userValue, true, false));
result.addColumn(new Column("status", statusOldValue, statusValue, true, false));