You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/06/08 09:03:53 UTC

[shardingsphere] branch master updated: Refactor DataRecord type and tableName fields as final (#26168)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 140c48120bc Refactor DataRecord type and tableName fields as final (#26168)
140c48120bc is described below

commit 140c48120bc6aff5627e87188a4b0fdc7ca8121a
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Thu Jun 8 17:03:44 2023 +0800

    Refactor DataRecord type and tableName fields as final (#26168)
---
 .../data/pipeline/api/ingest/record/DataRecord.java      | 12 +++++++-----
 .../cdc/util/DataRecordResultConvertUtilsTest.java       |  4 +---
 .../pipeline/core/ingest/dumper/InventoryDumper.java     |  4 +---
 .../pipeline/core/importer/DataRecordMergerTest.java     | 12 +++---------
 .../memory/MultiplexMemoryPipelineChannelTest.java       |  3 ++-
 .../data/pipeline/core/ingest/record/DataRecordTest.java | 13 +++++--------
 .../data/pipeline/core/record/RecordUtilsTest.java       |  4 ++--
 .../pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java |  7 +++----
 .../pipeline/mysql/ingest/MySQLIncrementalDumper.java    | 16 +++++++---------
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java    |  4 ++--
 .../sqlbuilder/OpenGaussPipelineSQLBuilderTest.java      |  4 ++--
 .../postgresql/ingest/wal/WALEventConverter.java         | 15 ++++++---------
 .../sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java     |  4 ++--
 .../core/importer/PipelineDataSourceSinkTest.java        |  9 +++------
 14 files changed, 46 insertions(+), 65 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index 6b60eef04e7..b0f0fa5622b 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -37,20 +37,22 @@ import java.util.List;
 @ToString
 public final class DataRecord extends Record {
     
+    private final String type;
+    
+    private final String tableName;
+    
     private final List<Column> columns;
     
     private final List<Object> uniqueKeyValue = new LinkedList<>();
     
     private final List<Object> oldUniqueKeyValues = new ArrayList<>();
     
-    private String type;
-    
-    private String tableName;
-    
     private Long csn;
     
-    public DataRecord(final IngestPosition position, final int columnCount) {
+    public DataRecord(final String type, final String tableName, final IngestPosition position, final int columnCount) {
         super(position);
+        this.type = type;
+        this.tableName = tableName;
         columns = new ArrayList<>(columnCount);
     }
     
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index 1da75e1d7e6..7280fdddac6 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -54,7 +54,7 @@ class DataRecordResultConvertUtilsTest {
     
     @Test
     void assertConvertDataRecordToRecord() throws InvalidProtocolBufferException, SQLException {
-        DataRecord dataRecord = new DataRecord(new IntegerPrimaryKeyPosition(0, 1), 2);
+        DataRecord dataRecord = new DataRecord("INSERT", "t_order", new IntegerPrimaryKeyPosition(0, 1), 2);
         dataRecord.addColumn(new Column("order_id", BigInteger.ONE, false, true));
         dataRecord.addColumn(new Column("price", BigDecimal.valueOf(123), false, false));
         dataRecord.addColumn(new Column("user_id", Long.MAX_VALUE, false, false));
@@ -73,8 +73,6 @@ class DataRecordResultConvertUtilsTest {
         when(mockedClob.getSubString(anyLong(), anyInt())).thenReturn("clob\n");
         dataRecord.addColumn(new Column("text_clob", mockedClob, false, false));
         dataRecord.addColumn(new Column("update_time", new Timestamp(System.currentTimeMillis()), false, false));
-        dataRecord.setTableName("t_order");
-        dataRecord.setType("INSERT");
         TypeRegistry registry = TypeRegistry.newBuilder().add(EmptyProto.getDescriptor().getMessageTypes()).add(TimestampProto.getDescriptor().getMessageTypes())
                 .add(WrappersProto.getDescriptor().getMessageTypes()).build();
         Record expectedRecord = DataRecordResultConvertUtils.convertDataRecordToRecord("test", null, dataRecord);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 576b437728f..59fa207e192 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -198,9 +198,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     
     private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
         int columnCount = resultSetMetaData.getColumnCount();
-        DataRecord result = new DataRecord(newPosition(resultSet), columnCount);
-        result.setType(IngestDataChangeType.INSERT);
-        result.setTableName(dumperConfig.getLogicTableName());
+        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, dumperConfig.getLogicTableName(), newPosition(resultSet), columnCount);
         List<String> insertColumnNames = Optional.ofNullable(dumperConfig.getInsertColumnNames()).orElse(Collections.emptyList());
         ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(),
                 () -> new PipelineInvalidParameterException("Insert colum names count not equals ResultSet column count"));
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
index 93f6cc62a77..75bb3b1657e 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
@@ -165,9 +165,7 @@ class DataRecordMergerTest {
     }
     
     private DataRecord mockInsertDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
-        DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
-        result.setType(IngestDataChangeType.INSERT);
-        result.setTableName(tableName);
+        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 3);
         result.addColumn(new Column("id", id, true, true));
         result.addColumn(new Column("user_id", userId, true, false));
         result.addColumn(new Column("total_price", totalPrice, true, false));
@@ -187,9 +185,7 @@ class DataRecordMergerTest {
     }
     
     private DataRecord mockUpdateDataRecord(final String tableName, final Integer oldId, final int id, final int userId, final int totalPrice) {
-        DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
-        result.setType(IngestDataChangeType.UPDATE);
-        result.setTableName(tableName);
+        DataRecord result = new DataRecord(IngestDataChangeType.UPDATE, tableName, new PlaceholderPosition(), 3);
         result.addColumn(new Column("id", oldId, id, null != oldId, true));
         result.addColumn(new Column("user_id", userId, true, false));
         result.addColumn(new Column("total_price", totalPrice, true, false));
@@ -201,9 +197,7 @@ class DataRecordMergerTest {
     }
     
     private DataRecord mockDeleteDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
-        DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
-        result.setType(IngestDataChangeType.DELETE);
-        result.setTableName(tableName);
+        DataRecord result = new DataRecord(IngestDataChangeType.DELETE, tableName, new PlaceholderPosition(), 3);
         result.addColumn(new Column("id", id, null, true, true));
         result.addColumn(new Column("user_id", userId, null, true, false));
         result.addColumn(new Column("total_price", totalPrice, null, true, false));
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
index 144ef6d9808..35cefe84c8a 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannelTest.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.junit.jupiter.api.Test;
 
 import java.security.SecureRandom;
@@ -97,7 +98,7 @@ class MultiplexMemoryPipelineChannelTest {
     private Record[] mockRecords() {
         Record[] result = new Record[100];
         for (int i = 1; i <= result.length; i++) {
-            result[i - 1] = random.nextBoolean() ? new DataRecord(new IntPosition(i), 0) : new PlaceholderRecord(new IntPosition(i));
+            result[i - 1] = random.nextBoolean() ? new DataRecord(IngestDataChangeType.INSERT, "t1", new IntPosition(i), 0) : new PlaceholderRecord(new IntPosition(i));
         }
         return result;
     }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
index 2d4ca0075d3..96345e69f47 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecordTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.record;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.junit.jupiter.api.Test;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -33,12 +34,10 @@ class DataRecordTest {
     
     @Test
     void assertKeyEqual() {
-        beforeDataRecord = new DataRecord(new PlaceholderPosition(), 2);
-        beforeDataRecord.setTableName("t1");
+        beforeDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", new PlaceholderPosition(), 2);
         beforeDataRecord.addColumn(new Column("id", 1, true, true));
         beforeDataRecord.addColumn(new Column("name", "1", true, false));
-        afterDataRecord = new DataRecord(new PlaceholderPosition(), 2);
-        afterDataRecord.setTableName("t1");
+        afterDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", new PlaceholderPosition(), 2);
         afterDataRecord.addColumn(new Column("id", 1, true, true));
         afterDataRecord.addColumn(new Column("name", "2", true, false));
         assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
@@ -46,12 +45,10 @@ class DataRecordTest {
     
     @Test
     void assertOldKeyEqual() {
-        beforeDataRecord = new DataRecord(new PlaceholderPosition(), 2);
-        beforeDataRecord.setTableName("t1");
+        beforeDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", new PlaceholderPosition(), 2);
         beforeDataRecord.addColumn(new Column("id", 1, true, true));
         beforeDataRecord.addColumn(new Column("name", "1", true, false));
-        afterDataRecord = new DataRecord(new PlaceholderPosition(), 2);
-        afterDataRecord.setTableName("t1");
+        afterDataRecord = new DataRecord(IngestDataChangeType.UPDATE, "t1", new PlaceholderPosition(), 2);
         afterDataRecord.addColumn(new Column("id", 1, 2, true, true));
         afterDataRecord.addColumn(new Column("name", "2", true, false));
         assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getOldKey()));
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
index 0f5c00a80e9..ef9cdd7f2d9 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.record;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -57,8 +58,7 @@ class RecordUtilsTest {
     }
     
     private DataRecord mockDataRecord(final String tableName) {
-        DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
-        result.setTableName(tableName);
+        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 4);
         result.addColumn(new Column("id", "", false, true));
         result.addColumn(new Column("sc", "", false, true));
         result.addColumn(new Column("c1", "", true, false));
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
index 7481a806933..af3ff93b167 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.junit.jupiter.api.Test;
@@ -112,8 +113,7 @@ class PipelineSQLBuilderTest {
     }
     
     private DataRecord mockDataRecord(final String tableName) {
-        DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
-        result.setTableName(tableName);
+        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 4);
         result.addColumn(new Column("id", "", false, true));
         result.addColumn(new Column("sc", "", false, false));
         result.addColumn(new Column("c1", "", true, false));
@@ -137,8 +137,7 @@ class PipelineSQLBuilderTest {
     }
     
     private DataRecord mockDataRecordWithoutUniqueKey(final String tableName) {
-        DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
-        result.setTableName(tableName);
+        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 4);
         result.addColumn(new Column("id", "", false, false));
         result.addColumn(new Column("name", "", true, false));
         return result;
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 9a5ba016e04..93014201fa7 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -161,8 +161,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
         Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getAfterRows()) {
-            DataRecord dataRecord = createDataRecord(event, each.length);
-            dataRecord.setType(IngestDataChangeType.INSERT);
+            DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length);
             for (int i = 0; i < each.length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
                 if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
@@ -185,8 +184,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
         for (int i = 0; i < event.getBeforeRows().size(); i++) {
             Serializable[] beforeValues = event.getBeforeRows().get(i);
             Serializable[] afterValues = event.getAfterRows().get(i);
-            DataRecord dataRecord = createDataRecord(event, beforeValues.length);
-            dataRecord.setType(IngestDataChangeType.UPDATE);
+            DataRecord dataRecord = createDataRecord(IngestDataChangeType.UPDATE, event, beforeValues.length);
             for (int j = 0; j < beforeValues.length; j++) {
                 Serializable oldValue = beforeValues[j];
                 Serializable newValue = afterValues[j];
@@ -208,8 +206,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
         Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getBeforeRows()) {
-            DataRecord dataRecord = createDataRecord(event, each.length);
-            dataRecord.setType(IngestDataChangeType.DELETE);
+            DataRecord dataRecord = createDataRecord(IngestDataChangeType.DELETE, event, each.length);
             for (int i = 0, length = each.length; i < length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
                 if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
@@ -233,9 +230,10 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
         return dataTypeHandler.isPresent() ? dataTypeHandler.get().handle(value) : value;
     }
     
-    private DataRecord createDataRecord(final AbstractRowsEvent rowsEvent, final int columnCount) {
-        DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()), columnCount);
-        result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal());
+    private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) {
+        String tableName = dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal();
+        IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
+        DataRecord result = new DataRecord(type, tableName, position, columnCount);
         result.setCommitTime(rowsEvent.getTimestamp() * 1000);
         return result;
     }
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 9acb3db3303..818fdc41e9a 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.junit.jupiter.api.Test;
 
 import java.util.Optional;
@@ -52,8 +53,7 @@ class MySQLPipelineSQLBuilderTest {
     }
     
     private DataRecord mockDataRecord(final String tableName) {
-        DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
-        result.setTableName(tableName);
+        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 4);
         result.addColumn(new Column("id", "", false, true));
         result.addColumn(new Column("sc", "", false, false));
         result.addColumn(new Column("c1", "", true, false));
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index a134cf6d28f..4bdfccc27bf 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.junit.jupiter.api.Test;
 
 import java.util.Optional;
@@ -39,8 +40,7 @@ class OpenGaussPipelineSQLBuilderTest {
     }
     
     private DataRecord mockDataRecord(final String tableName) {
-        DataRecord result = new DataRecord(new PlaceholderPosition(), 4);
-        result.setTableName(tableName);
+        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, tableName, new PlaceholderPosition(), 4);
         result.addColumn(new Column("id", "", false, true));
         result.addColumn(new Column("c0", "", false, false));
         result.addColumn(new Column("c1", "", true, false));
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index 00cb9e6da3c..1968de02f07 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -96,15 +96,13 @@ public final class WALEventConverter {
     }
     
     private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
-        DataRecord result = createDataRecord(writeRowEvent, writeRowEvent.getAfterRow().size());
-        result.setType(IngestDataChangeType.INSERT);
+        DataRecord result = createDataRecord(IngestDataChangeType.INSERT, writeRowEvent, writeRowEvent.getAfterRow().size());
         putColumnsIntoDataRecord(result, tableMetaData, writeRowEvent.getTableName(), writeRowEvent.getAfterRow());
         return result;
     }
     
     private DataRecord handleUpdateRowEvent(final UpdateRowEvent updateRowEvent, final PipelineTableMetaData tableMetaData) {
-        DataRecord result = createDataRecord(updateRowEvent, updateRowEvent.getAfterRow().size());
-        result.setType(IngestDataChangeType.UPDATE);
+        DataRecord result = createDataRecord(IngestDataChangeType.UPDATE, updateRowEvent, updateRowEvent.getAfterRow().size());
         String actualTableName = updateRowEvent.getTableName();
         putColumnsIntoDataRecord(result, tableMetaData, actualTableName, updateRowEvent.getAfterRow());
         return result;
@@ -112,8 +110,7 @@ public final class WALEventConverter {
     
     private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final PipelineTableMetaData tableMetaData) {
         // TODO completion columns
-        DataRecord result = createDataRecord(event, event.getPrimaryKeys().size());
-        result.setType(IngestDataChangeType.DELETE);
+        DataRecord result = createDataRecord(IngestDataChangeType.DELETE, event, event.getPrimaryKeys().size());
         // TODO Unique key may be a column within unique index
         List<String> primaryKeyColumns = tableMetaData.getPrimaryKeyColumns();
         for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
@@ -122,9 +119,9 @@ public final class WALEventConverter {
         return result;
     }
     
-    private DataRecord createDataRecord(final AbstractRowEvent rowsEvent, final int columnCount) {
-        DataRecord result = new DataRecord(new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
-        result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal());
+    private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) {
+        String tableName = dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal();
+        DataRecord result = new DataRecord(type, tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
         result.setCsn(rowsEvent.getCsn());
         return result;
     }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index 1ea59d6693e..21f19f05742 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.junit.jupiter.api.Test;
@@ -42,8 +43,7 @@ class PostgreSQLPipelineSQLBuilderTest {
     }
     
     private DataRecord mockDataRecord() {
-        DataRecord result = new DataRecord(new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
-        result.setTableName("t_order");
+        DataRecord result = new DataRecord(IngestDataChangeType.INSERT, "t_order", new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))), 2);
         result.addColumn(new Column("order_id", 1, true, true));
         result.addColumn(new Column("user_id", 2, true, false));
         result.addColumn(new Column("status", "ok", true, false));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 07ad96e23ea..67ef9ec0918 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
+import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
 import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
 import org.junit.jupiter.api.BeforeEach;
@@ -145,9 +146,7 @@ class PipelineDataSourceSinkTest {
     }
     
     private DataRecord getUpdatePrimaryKeyDataRecord() {
-        DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
-        result.setTableName(TABLE_NAME);
-        result.setType("UPDATE");
+        DataRecord result = new DataRecord(IngestDataChangeType.UPDATE, TABLE_NAME, new PlaceholderPosition(), 3);
         result.addColumn(new Column("id", 1, 2, true, true));
         result.addColumn(new Column("user", 0, 10, true, false));
         result.addColumn(new Column("status", null, "UPDATE", true, false));
@@ -162,9 +161,6 @@ class PipelineDataSourceSinkTest {
     }
     
     private DataRecord getDataRecord(final String recordType) {
-        DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
-        result.setTableName(TABLE_NAME);
-        result.setType(recordType);
         Integer idOldValue = null;
         Integer userOldValue = null;
         Integer idValue = null;
@@ -188,6 +184,7 @@ class PipelineDataSourceSinkTest {
             userOldValue = 10;
             statusOldValue = recordType;
         }
+        DataRecord result = new DataRecord(recordType, TABLE_NAME, new PlaceholderPosition(), 3);
         result.addColumn(new Column("id", idOldValue, idValue, false, true));
         result.addColumn(new Column("user", userOldValue, userValue, true, false));
         result.addColumn(new Column("status", statusOldValue, statusValue, true, false));