You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/06/16 05:40:20 UTC

[shardingsphere] branch master updated: Improve scaling increment task mysql binlog decoding (#18375)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f0b8224c8f2 Improve scaling increment task mysql binlog decoding (#18375)
f0b8224c8f2 is described below

commit f0b8224c8f2c1d50e1a99265a7c89e59eb6c46b0
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Thu Jun 16 13:40:12 2022 +0800

    Improve scaling increment task mysql binlog decoding (#18375)
    
    * Improve MySQLBinlogEventPacketDecoder decode method, use fix length read binlog
    
    * Improve ignore checksums, fix unit test
    
    * Fix codestyle
---
 .../binlog/AbstractMySQLBinlogEventPacket.java     |  11 ++
 .../packet/binlog/MySQLBinlogEventHeader.java      |   8 +-
 .../management/MySQLBinlogRotateEventPacket.java   |   2 +-
 .../binlog/row/MySQLBinlogRowsEventPacket.java     |   6 +-
 .../binlog/row/MySQLBinlogTableMapEventPacket.java |   5 +-
 .../packet/binlog/MySQLBinlogEventHeaderTest.java  |   4 +-
 .../MySQLBinlogRotateEventPacketTest.java          |   5 +-
 .../binlog/row/MySQLBinlogRowsEventPacketTest.java | 105 +++----------------
 .../netty/MySQLBinlogEventPacketDecoder.java       |  56 ++++++----
 .../netty/MySQLBinlogEventPacketDecoderTest.java   | 115 +++++++++++++++------
 10 files changed, 159 insertions(+), 158 deletions(-)

diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/AbstractMySQLBinlogEventPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/AbstractMySQLBinlogEventPacket.java
index 363e18a3f0d..e87533c9c12 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/AbstractMySQLBinlogEventPacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/AbstractMySQLBinlogEventPacket.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.db.protocol.mysql.packet.binlog;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
@@ -28,6 +29,7 @@ import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
  */
 @RequiredArgsConstructor(access = AccessLevel.PROTECTED)
 @Getter
+@Slf4j
 public abstract class AbstractMySQLBinlogEventPacket implements MySQLPacket, MySQLBinlogEventPacket {
     
     private final MySQLBinlogEventHeader binlogEventHeader;
@@ -45,6 +47,15 @@ public abstract class AbstractMySQLBinlogEventPacket implements MySQLPacket, MyS
      */
     protected abstract void writeEvent(MySQLPacketPayload payload);
     
+    protected int getRemainBytesLength(final MySQLPacketPayload payload) {
+        // minus checksum bytes, add seq id 1 byte, statusCode 1 byte(not include at event size)
+        int alreadyReadIndex = binlogEventHeader.getEventSize() + 2 - binlogEventHeader.getChecksumLength();
+        if (payload.getByteBuf().readerIndex() > alreadyReadIndex) {
+            return -1;
+        }
+        return alreadyReadIndex - payload.getByteBuf().readerIndex();
+    }
+    
     @Override
     public final int getSequenceId() {
         return 0;
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeader.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeader.java
index 1221fb9de9d..2501070c9bc 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeader.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeader.java
@@ -44,19 +44,25 @@ public final class MySQLBinlogEventHeader implements MySQLPacket {
     
     private final int serverId;
     
+    /**
+     * Size of the event (header, post-header, body).
+     */
     private final int eventSize;
     
     private final int logPos;
     
     private final int flags;
     
-    public MySQLBinlogEventHeader(final MySQLPacketPayload payload) {
+    private final int checksumLength;
+    
+    public MySQLBinlogEventHeader(final MySQLPacketPayload payload, final int checksumLength) {
         timestamp = payload.readInt4();
         eventType = payload.readInt1();
         serverId = payload.readInt4();
         eventSize = payload.readInt4();
         logPos = payload.readInt4();
         flags = payload.readInt2();
+        this.checksumLength = checksumLength;
     }
     
     @Override
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacket.java
index 0ce642bb105..6de434c5b67 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacket.java
@@ -43,7 +43,7 @@ public final class MySQLBinlogRotateEventPacket extends AbstractMySQLBinlogEvent
     public MySQLBinlogRotateEventPacket(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
         super(binlogEventHeader);
         position = payload.readInt8();
-        nextBinlogName = payload.readStringEOF();
+        nextBinlogName = payload.readStringFix(getRemainBytesLength(payload));
     }
     
     @Override
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacket.java
index e18bb9ba526..e548a7907bb 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacket.java
@@ -90,7 +90,7 @@ public final class MySQLBinlogRowsEventPacket extends AbstractMySQLBinlogEventPa
      */
     public void readRows(final MySQLBinlogTableMapEventPacket tableMapEventPacket, final MySQLPacketPayload payload) {
         List<MySQLBinlogColumnDef> columnDefs = tableMapEventPacket.getColumnDefs();
-        while (hasNextRow(payload)) {
+        while (getRemainBytesLength(payload) > 0) {
             rows.add(readRow(columnDefs, payload));
             if (isUpdateRowsEvent(getBinlogEventHeader().getEventType())) {
                 rows2.add(readRow(columnDefs, payload));
@@ -98,10 +98,6 @@ public final class MySQLBinlogRowsEventPacket extends AbstractMySQLBinlogEventPa
         }
     }
     
-    private boolean hasNextRow(final MySQLPacketPayload payload) {
-        return payload.getByteBuf().isReadable();
-    }
-    
     private Serializable[] readRow(final List<MySQLBinlogColumnDef> columnDefs, final MySQLPacketPayload payload) {
         MySQLNullBitmap nullBitmap = new MySQLNullBitmap(columnNumber, payload);
         Serializable[] result = new Serializable[columnNumber];
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogTableMapEventPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogTableMapEventPacket.java
index 88b1746f6b9..c367560f850 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogTableMapEventPacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogTableMapEventPacket.java
@@ -66,7 +66,10 @@ public final class MySQLBinlogTableMapEventPacket extends AbstractMySQLBinlogEve
         // mysql 8 binlog table map event include column type def
         // for support lower than 8, read column type def by sql query
         // so skip readable bytes here
-        payload.getByteBuf().skipBytes(payload.getByteBuf().readableBytes());
+        int remainBytesLength = getRemainBytesLength(payload);
+        if (remainBytesLength > 0) {
+            payload.skipReserved(remainBytesLength);
+        }
     }
     
     private void readColumnDefs(final MySQLPacketPayload payload) {
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeaderTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeaderTest.java
index 242fc34d30c..66bf26912e5 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeaderTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeaderTest.java
@@ -41,7 +41,7 @@ public final class MySQLBinlogEventHeaderTest {
         when(payload.readInt4()).thenReturn(1234567890, 123456, 19, 4);
         when(payload.readInt1()).thenReturn(MySQLBinlogEventType.UNKNOWN_EVENT.getValue());
         when(payload.readInt2()).thenReturn(MySQLBinlogEventFlag.LOG_EVENT_BINLOG_IN_USE_F.getValue());
-        MySQLBinlogEventHeader actual = new MySQLBinlogEventHeader(payload);
+        MySQLBinlogEventHeader actual = new MySQLBinlogEventHeader(payload, 4);
         assertThat(actual.getSequenceId(), is(0));
         assertThat(actual.getTimestamp(), is(1234567890));
         assertThat(actual.getEventType(), is(MySQLBinlogEventType.UNKNOWN_EVENT.getValue()));
@@ -53,7 +53,7 @@ public final class MySQLBinlogEventHeaderTest {
     
     @Test
     public void assertWrite() {
-        new MySQLBinlogEventHeader(1234567890, MySQLBinlogEventType.UNKNOWN_EVENT.getValue(), 123456, 19, 4, MySQLBinlogEventFlag.LOG_EVENT_BINLOG_IN_USE_F.getValue()).write(payload);
+        new MySQLBinlogEventHeader(1234567890, MySQLBinlogEventType.UNKNOWN_EVENT.getValue(), 123456, 19, 4, MySQLBinlogEventFlag.LOG_EVENT_BINLOG_IN_USE_F.getValue(), 4).write(payload);
         verify(payload).writeInt4(1234567890);
         verify(payload).writeInt1(MySQLBinlogEventType.UNKNOWN_EVENT.getValue());
         verify(payload).writeInt4(123456);
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacketTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacketTest.java
index 4fc95a2b606..6073b538cd7 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacketTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacketTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.db.protocol.mysql.packet.binlog.management;
 
+import io.netty.buffer.Unpooled;
 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 import org.junit.Test;
@@ -26,6 +27,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -41,7 +43,8 @@ public final class MySQLBinlogRotateEventPacketTest {
     @Test
     public void assertNew() {
         when(payload.readInt8()).thenReturn(4L);
-        when(payload.readStringEOF()).thenReturn("binlog-000001");
+        when(payload.readStringFix(anyInt())).thenReturn("binlog-000001");
+        when(payload.getByteBuf()).thenReturn(Unpooled.buffer());
         MySQLBinlogRotateEventPacket actual = new MySQLBinlogRotateEventPacket(binlogEventHeader, payload);
         assertThat(actual.getSequenceId(), is(0));
         assertThat(actual.getBinlogEventHeader(), is(binlogEventHeader));
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacketTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacketTest.java
index c897dfd30fb..059fd8b4a03 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacketTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacketTest.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
@@ -29,6 +28,10 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -36,7 +39,6 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -53,20 +55,14 @@ public final class MySQLBinlogRowsEventPacketTest {
     @Mock
     private MySQLBinlogEventHeader binlogEventHeader;
     
-    @Mock
-    private MySQLBinlogTableMapEventPacket tableMapEventPacket;
-    
     private List<MySQLBinlogColumnDef> columnDefs;
     
     @Before
     public void setUp() {
         mockColumnDefs();
-        when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
         when(payload.readInt6()).thenReturn(1L);
         when(payload.readInt2()).thenReturn(2);
         when(payload.readIntLenenc()).thenReturn(1L);
-        when(payload.getByteBuf()).thenReturn(byteBuf);
-        when(byteBuf.isReadable()).thenReturn(true, false);
     }
     
     private void mockColumnDefs() {
@@ -75,46 +71,14 @@ public final class MySQLBinlogRowsEventPacketTest {
     }
     
     @Test
-    public void assertReadWriteRowV1WithoutNullValue() {
-        when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv1.getValue());
-        when(payload.readInt8()).thenReturn(Long.MAX_VALUE);
+    public void assertReadWriteRowV1WithoutNullValue() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
         MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
-        actual.readRows(tableMapEventPacket, payload);
         assertBinlogRowsEventV1BeforeRows(actual);
         assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
         assertNull(actual.getColumnsPresentBitmap2());
-        assertThat(actual.getRows().size(), is(1));
-        assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
-        assertTrue(actual.getRows2().isEmpty());
-    }
-    
-    @Test
-    public void assertReadWriteRowV1WithNullValue() {
-        when(payload.readInt1()).thenReturn(0x01);
-        when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv1.getValue());
-        MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
-        actual.readRows(tableMapEventPacket, payload);
-        assertBinlogRowsEventV1BeforeRows(actual);
-        assertTrue(actual.getColumnsPresentBitmap().isNullParameter(0));
-        assertNull(actual.getColumnsPresentBitmap2());
-        assertThat(actual.getRows().size(), is(1));
-        assertNull(actual.getRows().get(0)[0]);
-        assertTrue(actual.getRows2().isEmpty());
-    }
-    
-    @Test
-    public void assertReadUpdateRowV1WithoutNullValue() {
-        when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.UPDATE_ROWS_EVENTv1.getValue());
-        when(payload.readInt8()).thenReturn(Long.MAX_VALUE, Long.MIN_VALUE);
-        MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
-        actual.readRows(tableMapEventPacket, payload);
-        assertBinlogRowsEventV1BeforeRows(actual);
-        assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
-        assertFalse(actual.getColumnsPresentBitmap2().isNullParameter(0));
-        assertThat(actual.getRows().size(), is(1));
-        assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
-        assertThat(actual.getRows2().size(), is(1));
-        assertThat(actual.getRows2().get(0)[0], is(Long.MIN_VALUE));
+        MySQLPacketPayload packetPayload = new MySQLPacketPayload(byteBuf, StandardCharsets.UTF_8);
+        Serializable[] result = (Serializable[]) invokeMethod(actual, "readRow", new Class[]{List.class, MySQLPacketPayload.class}, new Object[]{columnDefs, packetPayload});
+        assertThat(result[0], is(0L));
     }
     
     private void assertBinlogRowsEventV1BeforeRows(final MySQLBinlogRowsEventPacket actual) {
@@ -124,53 +88,10 @@ public final class MySQLBinlogRowsEventPacketTest {
         assertThat(actual.getColumnNumber(), is(1));
     }
     
-    @Test
-    public void assertReadWriteRowV2WithoutNullValue() {
-        when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv2.getValue());
-        when(payload.readInt8()).thenReturn(Long.MAX_VALUE);
-        MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
-        actual.readRows(tableMapEventPacket, payload);
-        assertBinlogRowsEventV2BeforeRows(actual);
-        assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
-        assertNull(actual.getColumnsPresentBitmap2());
-        assertThat(actual.getRows().size(), is(1));
-        assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
-        assertTrue(actual.getRows2().isEmpty());
-    }
-    
-    @Test
-    public void assertReadWriteRowV2WithNullValue() {
-        when(payload.readInt1()).thenReturn(0x01);
-        when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv2.getValue());
-        MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
-        actual.readRows(tableMapEventPacket, payload);
-        assertBinlogRowsEventV2BeforeRows(actual);
-        assertTrue(actual.getColumnsPresentBitmap().isNullParameter(0));
-        assertNull(actual.getColumnsPresentBitmap2());
-        assertThat(actual.getRows().size(), is(1));
-        assertNull(actual.getRows().get(0)[0]);
-        assertTrue(actual.getRows2().isEmpty());
-    }
-    
-    @Test
-    public void assertReadUpdateRowV2WithoutNullValue() {
-        when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.UPDATE_ROWS_EVENTv2.getValue());
-        when(payload.readInt8()).thenReturn(Long.MAX_VALUE, Long.MIN_VALUE);
-        MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
-        actual.readRows(tableMapEventPacket, payload);
-        assertBinlogRowsEventV2BeforeRows(actual);
-        assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
-        assertFalse(actual.getColumnsPresentBitmap2().isNullParameter(0));
-        assertThat(actual.getRows().size(), is(1));
-        assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
-        assertThat(actual.getRows2().size(), is(1));
-        assertThat(actual.getRows2().get(0)[0], is(Long.MIN_VALUE));
-    }
-    
-    private void assertBinlogRowsEventV2BeforeRows(final MySQLBinlogRowsEventPacket actual) {
-        assertThat(actual.getTableId(), is(1L));
-        assertThat(actual.getFlags(), is(2));
-        verify(payload).skipReserved(0);
-        assertThat(actual.getColumnNumber(), is(1));
+    private static Object invokeMethod(final Object target, final String methodName, final Class<?>[] parameterTypes,
+                                       final Object[] parameterValues) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        Method method = target.getClass().getDeclaredMethod(methodName, parameterTypes);
+        method.setAccessible(true);
+        return method.invoke(target, parameterValues);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 850594cc7d3..b449e164be2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext;
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
@@ -38,6 +39,7 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlog
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
 
 import java.util.List;
+import java.util.Optional;
 
 /**
  * MySQL binlog event packet decoder.
@@ -54,39 +56,51 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
     
     @Override
     protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
-        MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
-        skipSequenceId(payload);
-        checkError(payload);
-        MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload);
-        removeChecksum(binlogEventHeader.getEventType(), in);
+        // readable bytes must greater + seqId(1b) + statusCode(1b) + header-length(19b) +
+        while (in.readableBytes() >= 2 + MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
+            in.markReaderIndex();
+            MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
+            skipSequenceId(payload);
+            checkError(payload);
+            MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload, binlogContext.getChecksumLength());
+            // make sure event has complete body
+            if (in.readableBytes() < binlogEventHeader.getEventSize() - MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
+                log.debug("the event body is not complete, event size={}, readable bytes={}", binlogEventHeader.getEventSize(), in.readableBytes());
+                in.resetReaderIndex();
+                break;
+            }
+            Optional.ofNullable(decodeEvent(payload, binlogEventHeader)).ifPresent(out::add);
+            skipChecksum(binlogEventHeader.getEventType(), in);
+        }
+    }
+    
+    private AbstractBinlogEvent decodeEvent(final MySQLPacketPayload payload, final MySQLBinlogEventHeader binlogEventHeader) {
         switch (MySQLBinlogEventType.valueOf(binlogEventHeader.getEventType())) {
             case ROTATE_EVENT:
                 decodeRotateEvent(binlogEventHeader, payload);
-                break;
+                return null;
             case FORMAT_DESCRIPTION_EVENT:
                 new MySQLBinlogFormatDescriptionEventPacket(binlogEventHeader, payload);
-                break;
+                return null;
             case TABLE_MAP_EVENT:
                 decodeTableMapEvent(binlogEventHeader, payload);
-                break;
+                return null;
             case WRITE_ROWS_EVENTv1:
             case WRITE_ROWS_EVENTv2:
-                out.add(decodeWriteRowsEventV2(binlogEventHeader, payload));
-                break;
+                return decodeWriteRowsEventV2(binlogEventHeader, payload);
             case UPDATE_ROWS_EVENTv1:
             case UPDATE_ROWS_EVENTv2:
-                out.add(decodeUpdateRowsEventV2(binlogEventHeader, payload));
-                break;
+                return decodeUpdateRowsEventV2(binlogEventHeader, payload);
             case DELETE_ROWS_EVENTv1:
             case DELETE_ROWS_EVENTv2:
-                out.add(decodeDeleteRowsEventV2(binlogEventHeader, payload));
-                break;
+                return decodeDeleteRowsEventV2(binlogEventHeader, payload);
             default:
-                out.add(createPlaceholderEvent(binlogEventHeader));
-                payload.skipReserved(payload.getByteBuf().readableBytes());
-        }
-        if (in.isReadable()) {
-            throw new UnsupportedOperationException(String.format("Do not parse binlog event fully, eventHeader: %s, remaining packet %s", binlogEventHeader, readRemainPacket(payload)));
+                PlaceholderEvent result = createPlaceholderEvent(binlogEventHeader);
+                int remainDataLength = binlogEventHeader.getEventSize() + 2 - binlogEventHeader.getChecksumLength() - payload.getByteBuf().readerIndex();
+                if (remainDataLength > 0) {
+                    payload.skipReserved(remainDataLength);
+                }
+                return result;
         }
     }
     
@@ -112,9 +126,9 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
         return ByteBufUtil.hexDump(payload.readStringFixByBytes(payload.getByteBuf().readableBytes()));
     }
     
-    private void removeChecksum(final int eventType, final ByteBuf in) {
+    private void skipChecksum(final int eventType, final ByteBuf in) {
         if (0 < binlogContext.getChecksumLength() && MySQLBinlogEventType.FORMAT_DESCRIPTION_EVENT.getValue() != eventType) {
-            in.writerIndex(in.writerIndex() - binlogContext.getChecksumLength());
+            in.skipBytes(binlogContext.getChecksumLength());
         }
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 97ead238669..c9424aaf3dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -17,16 +17,21 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty;
 
+import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.internal.StringUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
 import org.apache.shardingsphere.db.protocol.CommonConstants;
-import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
 import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -34,15 +39,14 @@ import org.mockito.Answers;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -51,9 +55,6 @@ public final class MySQLBinlogEventPacketDecoderTest {
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ChannelHandlerContext channelHandlerContext;
     
-    @Mock
-    private ByteBuf byteBuf;
-    
     @Mock
     private MySQLBinlogTableMapEventPacket tableMapEventPacket;
     
@@ -61,87 +62,133 @@ public final class MySQLBinlogEventPacketDecoderTest {
     
     private MySQLBinlogEventPacketDecoder binlogEventPacketDecoder;
     
+    private List<MySQLBinlogColumnDef> columnDefs;
+    
     @Before
     public void setUp() throws NoSuchFieldException, IllegalAccessException {
         binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(4);
         binlogContext = ReflectionUtil.getFieldValue(binlogEventPacketDecoder, "binlogContext", BinlogContext.class);
         when(channelHandlerContext.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()).thenReturn(StandardCharsets.UTF_8);
+        columnDefs = Lists.newArrayList(new MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONGLONG), new MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONG),
+                new MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_VARCHAR), new MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_NEWDECIMAL));
     }
     
     @Test(expected = RuntimeException.class)
     public void assertDecodeWithPacketError() {
-        when(byteBuf.readUnsignedByte()).thenReturn((short) 255);
+        ByteBuf byteBuf = Unpooled.buffer();
+        byteBuf.writeByte(1);
+        byteBuf.writeByte(255);
+        byteBuf.writeBytes(new byte[20]);
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, null);
     }
     
-    @Test(expected = UnsupportedOperationException.class)
-    public void assertDecodeWithReadError() {
-        when(byteBuf.isReadable()).thenReturn(true);
-        binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, new LinkedList<>());
-    }
-    
     @Test
     public void assertDecodeRotateEvent() {
-        when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.ROTATE_EVENT.getValue());
+        ByteBuf byteBuf = Unpooled.buffer();
+        byteBuf.writeBytes(StringUtil.decodeHexDump("01000000000004010000002c0000000000000020001a9100000000000062696e6c6f672e3030303032394af65c24"));
         List<Object> decodedEvents = new LinkedList<>();
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
-        assertTrue(decodedEvents.isEmpty());
-        assertThat(binlogContext.getFileName(), is(""));
+        assertThat(decodedEvents.size(), is(0));
+        assertThat(binlogContext.getFileName(), is("binlog.000029"));
     }
     
     @Test
     public void assertDecodeFormatDescriptionEvent() {
-        when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.FORMAT_DESCRIPTION_EVENT.getValue(), (short) 19);
-        when(byteBuf.readUnsignedShortLE()).thenReturn(4);
+        ByteBuf byteBuf = Unpooled.buffer();
+        byteBuf.writeBytes(StringUtil.decodeHexDump("0200513aa8620f01000000790000000000000000000400382e302e323700000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
+                + "000000000013000d0008000000000400040000006100041a08000000080808020000000a0a0a2a2a001234000a280140081396"));
         List<Object> decodedEvents = new LinkedList<>();
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
-        assertTrue(decodedEvents.isEmpty());
+        assertThat(decodedEvents.size(), is(0));
         assertThat(binlogContext.getChecksumLength(), is(4));
     }
     
     @Test
     public void assertDecodeTableMapEvent() {
-        when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.TABLE_MAP_EVENT.getValue(), (short) 0);
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+        // the hex data is from binlog data, The first event used in Row Based Replication
+        byteBuf.writeBytes(StringUtil.decodeHexDump("3400cb38a962130100000041000000be7d000000007b000000000001000464735f310009745f6f726465725f31000408030ff604c8000a020c0101000201e0ff0a9b3a"));
+        binlogContext.getTableMap().put(123L, tableMapEventPacket);
         List<Object> decodedEvents = new LinkedList<>();
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
-        assertTrue(decodedEvents.isEmpty());
         assertThat(binlogContext.getTableMap().size(), is(1));
-        assertThat(binlogContext.getTableMap().get(0L), instanceOf(MySQLBinlogTableMapEventPacket.class));
+        assertThat(binlogContext.getTableMap().get(123L), instanceOf(MySQLBinlogTableMapEventPacket.class));
     }
     
     @Test
     public void assertDecodeWriteRowEvent() {
-        when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.WRITE_ROWS_EVENTv2.getValue(), (short) 0);
-        when(byteBuf.readUnsignedShortLE()).thenReturn(2);
-        binlogContext.getTableMap().put(0L, tableMapEventPacket);
-        when(tableMapEventPacket.getColumnDefs()).thenReturn(Collections.emptyList());
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+        // the hex data is from INSERT INTO t_order(order_id, user_id, status, t_numeric) VALUES (1, 1, 'SUCCESS',null);
+        byteBuf.writeBytes(StringUtil.decodeHexDump("30007a36a9621e0100000038000000bb7c000000007b00000000000100020004ff08010000000000000001000000075355434345535365eff9ff"));
+        binlogContext.getTableMap().put(123L, tableMapEventPacket);
+        when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
         List<Object> decodedEvents = new LinkedList<>();
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
         assertThat(decodedEvents.size(), is(1));
         assertThat(decodedEvents.get(0), instanceOf(WriteRowsEvent.class));
+        WriteRowsEvent actual = (WriteRowsEvent) decodedEvents.get(0);
+        assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, "SUCCESS", null}));
     }
     
     @Test
     public void assertDecodeUpdateRowEvent() {
-        when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.UPDATE_ROWS_EVENTv2.getValue(), (short) 0);
-        when(byteBuf.readUnsignedShortLE()).thenReturn(2);
-        binlogContext.getTableMap().put(0L, tableMapEventPacket);
-        when(tableMapEventPacket.getColumnDefs()).thenReturn(Collections.emptyList());
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+        // the hex data is from update t_order set status = 'updated' where order_id = 1;
+        byteBuf.writeBytes(StringUtil.decodeHexDump("3500cb38a9621f010000004e0000000c7e000000007b00000000000100020004ffff08010000000000000001000000075355434345535308010000000000000001000000077570"
+                + "6461746564e78cee6c"));
+        binlogContext.getTableMap().put(123L, tableMapEventPacket);
+        when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
         List<Object> decodedEvents = new LinkedList<>();
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
         assertThat(decodedEvents.size(), is(1));
         assertThat(decodedEvents.get(0), instanceOf(UpdateRowsEvent.class));
+        UpdateRowsEvent actual = (UpdateRowsEvent) decodedEvents.get(0);
+        assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, "SUCCESS", null}));
+        assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, "updated", null}));
     }
     
     @Test
     public void assertDecodeDeleteRowEvent() {
-        when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.DELETE_ROWS_EVENTv2.getValue(), (short) 0);
-        when(byteBuf.readUnsignedShortLE()).thenReturn(2);
-        binlogContext.getTableMap().put(0L, tableMapEventPacket);
-        when(tableMapEventPacket.getColumnDefs()).thenReturn(Collections.emptyList());
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+        // delete from t_order where order_id = 1;
+        byteBuf.writeBytes(StringUtil.decodeHexDump("51002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5"));
+        binlogContext.getTableMap().put(116L, tableMapEventPacket);
+        when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
         List<Object> decodedEvents = new LinkedList<>();
         binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
         assertThat(decodedEvents.size(), is(1));
         assertThat(decodedEvents.get(0), instanceOf(DeleteRowsEvent.class));
+        DeleteRowsEvent actual = (DeleteRowsEvent) decodedEvents.get(0);
+        assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, "SUCCESS", null}));
+    }
+    
+    @Test
+    public void assertBinlogEventHeaderIncomplete() {
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+        byte[] completeData = StringUtil.decodeHexDump("51002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5");
+        byteBuf.writeBytes(completeData);
+        // write incomplete event data
+        byteBuf.writeBytes(StringUtil.decodeHexDump("3400"));
+        List<Object> decodedEvents = new LinkedList<>();
+        binlogContext.getTableMap().put(116L, tableMapEventPacket);
+        when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
+        binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
+        assertThat(decodedEvents.size(), is(1));
+        assertThat(byteBuf.readerIndex(), is(completeData.length));
+    }
+    
+    @Test
+    public void assertBinlogEventBodyIncomplete() {
+        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+        byte[] completeData = StringUtil.decodeHexDump("51002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5");
+        byteBuf.writeBytes(completeData);
+        byte[] notCompleteData = StringUtil.decodeHexDump("3400cb38a962130100000041000000be7d000000007b000000000001000464735f310009745f6f726465725f31000408030f");
+        byteBuf.writeBytes(notCompleteData);
+        List<Object> decodedEvents = new LinkedList<>();
+        binlogContext.getTableMap().put(116L, tableMapEventPacket);
+        when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
+        binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
+        assertThat(decodedEvents.size(), is(1));
+        assertThat(byteBuf.readerIndex(), is(completeData.length));
     }
 }