You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/12/06 08:47:18 UTC

[shardingsphere] branch master updated: Adding xid and csn parse for openGauss's logical replication (#22685)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a75e635bf0b Adding xid and csn parse for openGauss's logical replication (#22685)
a75e635bf0b is described below

commit a75e635bf0b723db42494814103833b31a3e4454
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Dec 6 16:47:09 2022 +0800

    Adding xid and csn parse for openGauss's logical replication (#22685)
    
    * Adding xid and csn parse for openGauss's logical replication
    
    * Improve performance
    
    * Add chinese characters at pipeline e2e test
---
 .../pipeline/api/ingest/record/DataRecord.java     |  2 +
 .../OpenGaussIncrementalDumperCreator.java         |  2 +-
 .../opengauss/ingest/OpenGaussWALDumper.java       | 46 +++++++++++++++++++--
 .../ingest/wal/decode/MppdbDecodingPlugin.java     | 48 +++++++++++++++-------
 .../ingest/wal/decode/MppdbDecodingPluginTest.java | 35 +++++++++++++++-
 .../postgresql/ingest/wal/WALEventConverter.java   |  5 ++-
 .../ingest/wal/event/AbstractRowEvent.java         |  2 +
 .../{AbstractRowEvent.java => BeginTXEvent.java}   | 14 +++----
 .../{AbstractRowEvent.java => CommitTXEvent.java}  | 14 +++----
 .../ingest/wal/WALEventConverterTest.java          | 25 +++++++++++
 .../pipeline/cases/task/MySQLIncrementTask.java    |  5 +--
 .../cases/task/PostgreSQLIncrementTask.java        |  2 +-
 12 files changed, 158 insertions(+), 42 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index 126b8ee17e9..c9cb71f7f6a 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -47,6 +47,8 @@ public final class DataRecord extends Record {
     
     private String tableName;
     
+    private Long csn;
+    
     public DataRecord(final IngestPosition<?> position, final int columnCount) {
         super(position);
         columns = new ArrayList<>(columnCount);
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
index 0381f69b1a3..dec16de00cc 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
@@ -34,7 +34,7 @@ public final class OpenGaussIncrementalDumperCreator implements IncrementalDumpe
     @Override
     public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
                                                      final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader);
+        return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader, false);
     }
     
     @Override
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index cd0c36f55af..4830429699d 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -33,7 +33,10 @@ import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenG
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.opengauss.jdbc.PgConnection;
@@ -41,6 +44,8 @@ import org.opengauss.replication.PGReplicationStream;
 
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * WAL dumper of openGauss.
@@ -57,8 +62,12 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
     
     private final OpenGaussLogicalReplication logicalReplication;
     
+    private final boolean decodeWithTX;
+    
+    private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
+    
     public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPosition<WALPosition> position,
-                              final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+                              final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader, final boolean decodeWithTX) {
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
@@ -66,6 +75,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
         this.channel = channel;
         walEventConverter = new WALEventConverter(dumperConfig, metaDataLoader);
         logicalReplication = new OpenGaussLogicalReplication();
+        this.decodeWithTX = decodeWithTX;
     }
     
     @Override
@@ -73,7 +83,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
         PGReplicationStream stream = null;
         try (PgConnection connection = getReplicationConnectionUnwrap()) {
             stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()));
-            DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()));
+            DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX);
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
                 if (null == message) {
@@ -81,7 +91,11 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
                     continue;
                 }
                 AbstractWALEvent event = decodingPlugin.decode(message, new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
-                channel.pushRecord(walEventConverter.convert(event));
+                if (decodeWithTX) {
+                    processEventWithTX(event);
+                } else {
+                    processEventIgnoreTX(event);
+                }
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
@@ -99,6 +113,32 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen
         return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).unwrap(PgConnection.class);
     }
     
+    private void processEventWithTX(final AbstractWALEvent event) {
+        if (event instanceof AbstractRowEvent) {
+            rowEvents.add((AbstractRowEvent) event);
+            return;
+        }
+        if (event instanceof BeginTXEvent) {
+            rowEvents.clear();
+            return;
+        }
+        if (event instanceof CommitTXEvent) {
+            Long csn = ((CommitTXEvent) event).getCsn();
+            for (AbstractRowEvent each : rowEvents) {
+                each.setCsn(csn);
+                channel.pushRecord(walEventConverter.convert(each));
+            }
+        }
+        channel.pushRecord(walEventConverter.convert(event));
+    }
+    
+    private void processEventIgnoreTX(final AbstractWALEvent event) {
+        if (event instanceof BeginTXEvent) {
+            return;
+        }
+        channel.pushRecord(walEventConverter.convert(event));
+    }
+    
     @Override
     protected void doStop() {
     }
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index 68d81a2817f..05755bf3657 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -30,6 +30,8 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.Deco
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -39,6 +41,7 @@ import org.opengauss.util.PGobject;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -59,30 +62,45 @@ public final class MppdbDecodingPlugin implements DecodingPlugin {
     
     private final BaseTimestampUtils timestampUtils;
     
+    private final boolean decodeWithTX;
+    
+    public MppdbDecodingPlugin(final BaseTimestampUtils timestampUtils) {
+        this.timestampUtils = timestampUtils;
+        decodeWithTX = false;
+    }
+    
     @Override
     public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) {
         AbstractWALEvent result;
-        char eventType = readOneChar(data);
-        result = '{' == eventType ? readTableEvent(readMppData(data)) : new PlaceholderEvent();
+        byte[] bytes = new byte[data.remaining()];
+        data.get(bytes);
+        String dataText = new String(bytes, StandardCharsets.UTF_8);
+        if (decodeWithTX) {
+            result = decodeDataWithTX(dataText);
+        } else {
+            result = decodeDataIgnoreTX(dataText);
+        }
         result.setLogSequenceNumber(logSequenceNumber);
         return result;
     }
     
-    private char readOneChar(final ByteBuffer data) {
-        return (char) data.get();
+    private AbstractWALEvent decodeDataWithTX(final String dataText) {
+        AbstractWALEvent result = new PlaceholderEvent();
+        if (dataText.startsWith("BEGIN")) {
+            int beginIndex = dataText.indexOf("BEGIN") + "BEGIN".length() + 1;
+            result = new BeginTXEvent(Long.parseLong(dataText.substring(beginIndex)));
+        } else if (dataText.startsWith("COMMIT")) {
+            int commitBeginIndex = dataText.indexOf("COMMIT") + "COMMIT".length() + 1;
+            int csnBeginIndex = dataText.indexOf("CSN") + "CSN".length() + 1;
+            result = new CommitTXEvent(Long.parseLong(dataText.substring(commitBeginIndex, dataText.indexOf(" ", commitBeginIndex))), Long.parseLong(dataText.substring(csnBeginIndex)));
+        } else if (dataText.startsWith("{")) {
+            result = readTableEvent(dataText);
+        }
+        return result;
     }
     
-    private String readMppData(final ByteBuffer data) {
-        StringBuilder mppData = new StringBuilder();
-        mppData.append('{');
-        int depth = 1;
-        while (0 != depth && data.hasRemaining()) {
-            char next = (char) data.get();
-            mppData.append(next);
-            int optDepth = '{' == next ? 1 : ('}' == next ? -1 : 0);
-            depth += optDepth;
-        }
-        return mppData.toString();
+    private AbstractWALEvent decodeDataIgnoreTX(final String dataText) {
+        return dataText.startsWith("{") ? readTableEvent(dataText) : new PlaceholderEvent();
     }
     
     private AbstractRowEvent readTableEvent(final String mppData) {
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
index c84035fbd02..d26c789655f 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
@@ -17,10 +17,14 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingException;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -34,11 +38,15 @@ import java.nio.ByteBuffer;
 import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -224,6 +232,31 @@ public final class MppdbDecodingPluginTest {
         TimestampUtils timestampUtils = mock(TimestampUtils.class);
         when(timestampUtils.toTime(null, "1 2 3")).thenThrow(new SQLException(""));
         ByteBuffer data = ByteBuffer.wrap(toJSON(tableData).getBytes());
-        new MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils)).decode(data, logSequenceNumber);
+        new MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils), true).decode(data, logSequenceNumber);
+    }
+    
+    @Test
+    public void assertDecodeWithXid() throws JsonProcessingException {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"raw"});
+        tableData.setColumnsVal(new String[]{"'7D'"});
+        List<String> dataList = Arrays.asList("BEGIN 1", OBJECT_MAPPER.writeValueAsString(tableData), OBJECT_MAPPER.writeValueAsString(tableData),
+                "COMMIT 1 (at 2022-10-27 04:19:39.476261+00) CSN 3468");
+        MppdbDecodingPlugin mppdbDecodingPlugin = new MppdbDecodingPlugin(null, true);
+        List<AbstractWALEvent> expectedEvent = new LinkedList<>();
+        for (String each : dataList) {
+            expectedEvent.add(mppdbDecodingPlugin.decode(ByteBuffer.wrap(each.getBytes()), logSequenceNumber));
+        }
+        assertThat(expectedEvent.size(), is(4));
+        AbstractWALEvent actualFirstEvent = expectedEvent.get(0);
+        assertTrue(actualFirstEvent instanceof BeginTXEvent);
+        assertThat(((BeginTXEvent) actualFirstEvent).getXid(), is(1L));
+        AbstractWALEvent actualLastEvent = expectedEvent.get(expectedEvent.size() - 1);
+        assertTrue(actualLastEvent instanceof CommitTXEvent);
+        assertThat(((CommitTXEvent) actualLastEvent).getCsn(), is(3468L));
+        assertThat(((CommitTXEvent) actualLastEvent).getXid(), is(1L));
     }
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index 694de8ecf75..bbd59daa59a 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -28,6 +28,8 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableM
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -69,7 +71,7 @@ public final class WALEventConverter {
         if (event instanceof DeleteRowEvent) {
             return handleDeleteRowsEvent((DeleteRowEvent) event);
         }
-        if (event instanceof PlaceholderEvent) {
+        if (event instanceof PlaceholderEvent || event instanceof BeginTXEvent || event instanceof CommitTXEvent) {
             return createPlaceholderRecord(event);
         }
         throw new UnsupportedSQLOperationException("");
@@ -124,6 +126,7 @@ public final class WALEventConverter {
     private DataRecord createDataRecord(final AbstractRowEvent rowsEvent, final int columnCount) {
         DataRecord result = new DataRecord(new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
         result.setTableName(dumperConfig.getLogicTableName(rowsEvent.getTableName()).getLowercase());
+        result.setCsn(rowsEvent.getCsn());
         return result;
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
index a88bd48de11..6cba665d907 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
@@ -32,4 +32,6 @@ public abstract class AbstractRowEvent extends AbstractWALEvent {
     private String databaseName;
     
     private String tableName;
+    
+    private Long csn;
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
similarity index 79%
copy from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
copy to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
index a88bd48de11..b608e04e5c5 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java
@@ -18,18 +18,14 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event;
 
 import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.RequiredArgsConstructor;
 
 /**
- * Abstract row event.
+ * Begin TX event.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
-@ToString(callSuper = true)
-public abstract class AbstractRowEvent extends AbstractWALEvent {
+public final class BeginTXEvent extends AbstractWALEvent {
     
-    private String databaseName;
-    
-    private String tableName;
+    private final long xid;
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/CommitTXEvent.java
similarity index 79%
copy from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
copy to kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/CommitTXEvent.java
index a88bd48de11..db40ca05306 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/CommitTXEvent.java
@@ -18,18 +18,16 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event;
 
 import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
+import lombok.RequiredArgsConstructor;
 
 /**
- * Abstract row event.
+ * Commit TX event.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
-@ToString(callSuper = true)
-public abstract class AbstractRowEvent extends AbstractWALEvent {
+public final class CommitTXEvent extends AbstractWALEvent {
     
-    private String databaseName;
+    private final long xid;
     
-    private String tableName;
+    private final Long csn;
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index f4bdcda550d..a516a76b312 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -30,7 +30,10 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.CommitTXEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
@@ -39,6 +42,7 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.postgresql.replication.LogSequenceNumber;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -50,6 +54,7 @@ import java.util.Collections;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 public final class WALEventConverterTest {
     
@@ -57,6 +62,8 @@ public final class WALEventConverterTest {
     
     private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
     
+    private final LogSequenceNumber logSequenceNumber = LogSequenceNumber.valueOf("0/14EFDB8");
+    
     @Before
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
@@ -89,6 +96,24 @@ public final class WALEventConverterTest {
         }
     }
     
+    @Test
+    public void assertConvertBeginTXEvent() {
+        BeginTXEvent beginTXEvent = new BeginTXEvent(100);
+        beginTXEvent.setLogSequenceNumber(new PostgreSQLLogSequenceNumber(logSequenceNumber));
+        Record record = walEventConverter.convert(beginTXEvent);
+        assertTrue(record instanceof PlaceholderRecord);
+        assertThat(((WALPosition) record.getPosition()).getLogSequenceNumber().asLong(), is(21953976L));
+    }
+    
+    @Test
+    public void assertConvertCommitTXEvent() {
+        CommitTXEvent commitTXEvent = new CommitTXEvent(1, 3468L);
+        commitTXEvent.setLogSequenceNumber(new PostgreSQLLogSequenceNumber(logSequenceNumber));
+        Record record = walEventConverter.convert(commitTXEvent);
+        assertTrue(record instanceof PlaceholderRecord);
+        assertThat(((WALPosition) record.getPosition()).getLogSequenceNumber().asLong(), is(21953976L));
+    }
+    
     @Test
     public void assertConvertWriteRowEvent() {
         Record record = walEventConverter.convert(mockWriteRowEvent());
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java
index 8c8ae23290c..28cf8be0d21 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/MySQLIncrementTask.java
@@ -19,10 +19,9 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.springframework.jdbc.core.JdbcTemplate;
 
 import java.time.Instant;
@@ -61,7 +60,7 @@ public final class MySQLIncrementTask extends BaseIncrementTask {
     private Object insertOrder() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         Object[] orderInsertDate = new Object[]{primaryKeyGenerateAlgorithm.generateKey(), random.nextInt(0, 6),
-                random.nextInt(1, 99), RandomStringUtils.randomAlphabetic(10)};
+                random.nextInt(1, 99), "中文测试"};
         jdbcTemplate.update(String.format("INSERT INTO %s (order_id,user_id,t_unsigned_int,status) VALUES (?, ?, ?, ?)", orderTableName), orderInsertDate);
         return orderInsertDate[0];
     }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index 4b274e1d9a9..b18ba3a5583 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -70,7 +70,7 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
     
     private Object insertOrder() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        String status = 0 == random.nextInt() % 2 ? null : "NOT-NULL";
+        String status = 0 == random.nextInt() % 2 ? null : "中文测试";
         Object[] orderInsertDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), random.nextInt(0, 6), status};
         String insertSQL = String.format("INSERT INTO %s (order_id,user_id,status) VALUES (?, ?, ?)", getTableNameWithSchema(orderTableName));
         log.info("insert order sql:{}", insertSQL);