You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/06/16 05:40:20 UTC
[shardingsphere] branch master updated: Improve scaling increment task mysql binlog decoding (#18375)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f0b8224c8f2 Improve scaling increment task mysql binlog decoding (#18375)
f0b8224c8f2 is described below
commit f0b8224c8f2c1d50e1a99265a7c89e59eb6c46b0
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Thu Jun 16 13:40:12 2022 +0800
Improve scaling increment task mysql binlog decoding (#18375)
* Improve MySQLBinlogEventPacketDecoder decode method to read binlog events by fixed length
* Improve ignore checksums, fix unit test
* Fix codestyle
---
.../binlog/AbstractMySQLBinlogEventPacket.java | 11 ++
.../packet/binlog/MySQLBinlogEventHeader.java | 8 +-
.../management/MySQLBinlogRotateEventPacket.java | 2 +-
.../binlog/row/MySQLBinlogRowsEventPacket.java | 6 +-
.../binlog/row/MySQLBinlogTableMapEventPacket.java | 5 +-
.../packet/binlog/MySQLBinlogEventHeaderTest.java | 4 +-
.../MySQLBinlogRotateEventPacketTest.java | 5 +-
.../binlog/row/MySQLBinlogRowsEventPacketTest.java | 105 +++----------------
.../netty/MySQLBinlogEventPacketDecoder.java | 56 ++++++----
.../netty/MySQLBinlogEventPacketDecoderTest.java | 115 +++++++++++++++------
10 files changed, 159 insertions(+), 158 deletions(-)
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/AbstractMySQLBinlogEventPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/AbstractMySQLBinlogEventPacket.java
index 363e18a3f0d..e87533c9c12 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/AbstractMySQLBinlogEventPacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/AbstractMySQLBinlogEventPacket.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.db.protocol.mysql.packet.binlog;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
@@ -28,6 +29,7 @@ import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
*/
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
+@Slf4j
public abstract class AbstractMySQLBinlogEventPacket implements MySQLPacket, MySQLBinlogEventPacket {
private final MySQLBinlogEventHeader binlogEventHeader;
@@ -45,6 +47,15 @@ public abstract class AbstractMySQLBinlogEventPacket implements MySQLPacket, MyS
*/
protected abstract void writeEvent(MySQLPacketPayload payload);
+ protected int getRemainBytesLength(final MySQLPacketPayload payload) {
+ // end offset of the event data: event size minus checksum bytes, plus seq id (1 byte) and status code (1 byte), neither of which is included in the event size
+ int alreadyReadIndex = binlogEventHeader.getEventSize() + 2 - binlogEventHeader.getChecksumLength();
+ if (payload.getByteBuf().readerIndex() > alreadyReadIndex) {
+ return -1;
+ }
+ return alreadyReadIndex - payload.getByteBuf().readerIndex();
+ }
+
@Override
public final int getSequenceId() {
return 0;
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeader.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeader.java
index 1221fb9de9d..2501070c9bc 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeader.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeader.java
@@ -44,19 +44,25 @@ public final class MySQLBinlogEventHeader implements MySQLPacket {
private final int serverId;
+ /**
+ * Size of the event (header, post-header, body).
+ */
private final int eventSize;
private final int logPos;
private final int flags;
- public MySQLBinlogEventHeader(final MySQLPacketPayload payload) {
+ private final int checksumLength;
+
+ public MySQLBinlogEventHeader(final MySQLPacketPayload payload, final int checksumLength) {
timestamp = payload.readInt4();
eventType = payload.readInt1();
serverId = payload.readInt4();
eventSize = payload.readInt4();
logPos = payload.readInt4();
flags = payload.readInt2();
+ this.checksumLength = checksumLength;
}
@Override
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacket.java
index 0ce642bb105..6de434c5b67 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacket.java
@@ -43,7 +43,7 @@ public final class MySQLBinlogRotateEventPacket extends AbstractMySQLBinlogEvent
public MySQLBinlogRotateEventPacket(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
super(binlogEventHeader);
position = payload.readInt8();
- nextBinlogName = payload.readStringEOF();
+ nextBinlogName = payload.readStringFix(getRemainBytesLength(payload));
}
@Override
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacket.java
index e18bb9ba526..e548a7907bb 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacket.java
@@ -90,7 +90,7 @@ public final class MySQLBinlogRowsEventPacket extends AbstractMySQLBinlogEventPa
*/
public void readRows(final MySQLBinlogTableMapEventPacket tableMapEventPacket, final MySQLPacketPayload payload) {
List<MySQLBinlogColumnDef> columnDefs = tableMapEventPacket.getColumnDefs();
- while (hasNextRow(payload)) {
+ while (getRemainBytesLength(payload) > 0) {
rows.add(readRow(columnDefs, payload));
if (isUpdateRowsEvent(getBinlogEventHeader().getEventType())) {
rows2.add(readRow(columnDefs, payload));
@@ -98,10 +98,6 @@ public final class MySQLBinlogRowsEventPacket extends AbstractMySQLBinlogEventPa
}
}
- private boolean hasNextRow(final MySQLPacketPayload payload) {
- return payload.getByteBuf().isReadable();
- }
-
private Serializable[] readRow(final List<MySQLBinlogColumnDef> columnDefs, final MySQLPacketPayload payload) {
MySQLNullBitmap nullBitmap = new MySQLNullBitmap(columnNumber, payload);
Serializable[] result = new Serializable[columnNumber];
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogTableMapEventPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogTableMapEventPacket.java
index 88b1746f6b9..c367560f850 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogTableMapEventPacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogTableMapEventPacket.java
@@ -66,7 +66,10 @@ public final class MySQLBinlogTableMapEventPacket extends AbstractMySQLBinlogEve
// mysql 8 binlog table map event include column type def
// for support lower than 8, read column type def by sql query
// so skip readable bytes here
- payload.getByteBuf().skipBytes(payload.getByteBuf().readableBytes());
+ int remainBytesLength = getRemainBytesLength(payload);
+ if (remainBytesLength > 0) {
+ payload.skipReserved(remainBytesLength);
+ }
}
private void readColumnDefs(final MySQLPacketPayload payload) {
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeaderTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeaderTest.java
index 242fc34d30c..66bf26912e5 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeaderTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/MySQLBinlogEventHeaderTest.java
@@ -41,7 +41,7 @@ public final class MySQLBinlogEventHeaderTest {
when(payload.readInt4()).thenReturn(1234567890, 123456, 19, 4);
when(payload.readInt1()).thenReturn(MySQLBinlogEventType.UNKNOWN_EVENT.getValue());
when(payload.readInt2()).thenReturn(MySQLBinlogEventFlag.LOG_EVENT_BINLOG_IN_USE_F.getValue());
- MySQLBinlogEventHeader actual = new MySQLBinlogEventHeader(payload);
+ MySQLBinlogEventHeader actual = new MySQLBinlogEventHeader(payload, 4);
assertThat(actual.getSequenceId(), is(0));
assertThat(actual.getTimestamp(), is(1234567890));
assertThat(actual.getEventType(), is(MySQLBinlogEventType.UNKNOWN_EVENT.getValue()));
@@ -53,7 +53,7 @@ public final class MySQLBinlogEventHeaderTest {
@Test
public void assertWrite() {
- new MySQLBinlogEventHeader(1234567890, MySQLBinlogEventType.UNKNOWN_EVENT.getValue(), 123456, 19, 4, MySQLBinlogEventFlag.LOG_EVENT_BINLOG_IN_USE_F.getValue()).write(payload);
+ new MySQLBinlogEventHeader(1234567890, MySQLBinlogEventType.UNKNOWN_EVENT.getValue(), 123456, 19, 4, MySQLBinlogEventFlag.LOG_EVENT_BINLOG_IN_USE_F.getValue(), 4).write(payload);
verify(payload).writeInt4(1234567890);
verify(payload).writeInt1(MySQLBinlogEventType.UNKNOWN_EVENT.getValue());
verify(payload).writeInt4(123456);
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacketTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacketTest.java
index 4fc95a2b606..6073b538cd7 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacketTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/management/MySQLBinlogRotateEventPacketTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.db.protocol.mysql.packet.binlog.management;
+import io.netty.buffer.Unpooled;
import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
import org.junit.Test;
@@ -26,6 +27,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -41,7 +43,8 @@ public final class MySQLBinlogRotateEventPacketTest {
@Test
public void assertNew() {
when(payload.readInt8()).thenReturn(4L);
- when(payload.readStringEOF()).thenReturn("binlog-000001");
+ when(payload.readStringFix(anyInt())).thenReturn("binlog-000001");
+ when(payload.getByteBuf()).thenReturn(Unpooled.buffer());
MySQLBinlogRotateEventPacket actual = new MySQLBinlogRotateEventPacket(binlogEventHeader, payload);
assertThat(actual.getSequenceId(), is(0));
assertThat(actual.getBinlogEventHeader(), is(binlogEventHeader));
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacketTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacketTest.java
index c897dfd30fb..059fd8b4a03 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacketTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/MySQLBinlogRowsEventPacketTest.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row;
import io.netty.buffer.ByteBuf;
-import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader;
import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
@@ -29,6 +28,10 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -36,7 +39,6 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -53,20 +55,14 @@ public final class MySQLBinlogRowsEventPacketTest {
@Mock
private MySQLBinlogEventHeader binlogEventHeader;
- @Mock
- private MySQLBinlogTableMapEventPacket tableMapEventPacket;
-
private List<MySQLBinlogColumnDef> columnDefs;
@Before
public void setUp() {
mockColumnDefs();
- when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
when(payload.readInt6()).thenReturn(1L);
when(payload.readInt2()).thenReturn(2);
when(payload.readIntLenenc()).thenReturn(1L);
- when(payload.getByteBuf()).thenReturn(byteBuf);
- when(byteBuf.isReadable()).thenReturn(true, false);
}
private void mockColumnDefs() {
@@ -75,46 +71,14 @@ public final class MySQLBinlogRowsEventPacketTest {
}
@Test
- public void assertReadWriteRowV1WithoutNullValue() {
- when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv1.getValue());
- when(payload.readInt8()).thenReturn(Long.MAX_VALUE);
+ public void assertReadWriteRowV1WithoutNullValue() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
- actual.readRows(tableMapEventPacket, payload);
assertBinlogRowsEventV1BeforeRows(actual);
assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
assertNull(actual.getColumnsPresentBitmap2());
- assertThat(actual.getRows().size(), is(1));
- assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
- assertTrue(actual.getRows2().isEmpty());
- }
-
- @Test
- public void assertReadWriteRowV1WithNullValue() {
- when(payload.readInt1()).thenReturn(0x01);
- when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv1.getValue());
- MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
- actual.readRows(tableMapEventPacket, payload);
- assertBinlogRowsEventV1BeforeRows(actual);
- assertTrue(actual.getColumnsPresentBitmap().isNullParameter(0));
- assertNull(actual.getColumnsPresentBitmap2());
- assertThat(actual.getRows().size(), is(1));
- assertNull(actual.getRows().get(0)[0]);
- assertTrue(actual.getRows2().isEmpty());
- }
-
- @Test
- public void assertReadUpdateRowV1WithoutNullValue() {
- when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.UPDATE_ROWS_EVENTv1.getValue());
- when(payload.readInt8()).thenReturn(Long.MAX_VALUE, Long.MIN_VALUE);
- MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
- actual.readRows(tableMapEventPacket, payload);
- assertBinlogRowsEventV1BeforeRows(actual);
- assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
- assertFalse(actual.getColumnsPresentBitmap2().isNullParameter(0));
- assertThat(actual.getRows().size(), is(1));
- assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
- assertThat(actual.getRows2().size(), is(1));
- assertThat(actual.getRows2().get(0)[0], is(Long.MIN_VALUE));
+ MySQLPacketPayload packetPayload = new MySQLPacketPayload(byteBuf, StandardCharsets.UTF_8);
+ Serializable[] result = (Serializable[]) invokeMethod(actual, "readRow", new Class[]{List.class, MySQLPacketPayload.class}, new Object[]{columnDefs, packetPayload});
+ assertThat(result[0], is(0L));
}
private void assertBinlogRowsEventV1BeforeRows(final MySQLBinlogRowsEventPacket actual) {
@@ -124,53 +88,10 @@ public final class MySQLBinlogRowsEventPacketTest {
assertThat(actual.getColumnNumber(), is(1));
}
- @Test
- public void assertReadWriteRowV2WithoutNullValue() {
- when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv2.getValue());
- when(payload.readInt8()).thenReturn(Long.MAX_VALUE);
- MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
- actual.readRows(tableMapEventPacket, payload);
- assertBinlogRowsEventV2BeforeRows(actual);
- assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
- assertNull(actual.getColumnsPresentBitmap2());
- assertThat(actual.getRows().size(), is(1));
- assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
- assertTrue(actual.getRows2().isEmpty());
- }
-
- @Test
- public void assertReadWriteRowV2WithNullValue() {
- when(payload.readInt1()).thenReturn(0x01);
- when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.WRITE_ROWS_EVENTv2.getValue());
- MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
- actual.readRows(tableMapEventPacket, payload);
- assertBinlogRowsEventV2BeforeRows(actual);
- assertTrue(actual.getColumnsPresentBitmap().isNullParameter(0));
- assertNull(actual.getColumnsPresentBitmap2());
- assertThat(actual.getRows().size(), is(1));
- assertNull(actual.getRows().get(0)[0]);
- assertTrue(actual.getRows2().isEmpty());
- }
-
- @Test
- public void assertReadUpdateRowV2WithoutNullValue() {
- when(binlogEventHeader.getEventType()).thenReturn(MySQLBinlogEventType.UPDATE_ROWS_EVENTv2.getValue());
- when(payload.readInt8()).thenReturn(Long.MAX_VALUE, Long.MIN_VALUE);
- MySQLBinlogRowsEventPacket actual = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload);
- actual.readRows(tableMapEventPacket, payload);
- assertBinlogRowsEventV2BeforeRows(actual);
- assertFalse(actual.getColumnsPresentBitmap().isNullParameter(0));
- assertFalse(actual.getColumnsPresentBitmap2().isNullParameter(0));
- assertThat(actual.getRows().size(), is(1));
- assertThat(actual.getRows().get(0)[0], is(Long.MAX_VALUE));
- assertThat(actual.getRows2().size(), is(1));
- assertThat(actual.getRows2().get(0)[0], is(Long.MIN_VALUE));
- }
-
- private void assertBinlogRowsEventV2BeforeRows(final MySQLBinlogRowsEventPacket actual) {
- assertThat(actual.getTableId(), is(1L));
- assertThat(actual.getFlags(), is(2));
- verify(payload).skipReserved(0);
- assertThat(actual.getColumnNumber(), is(1));
+ private static Object invokeMethod(final Object target, final String methodName, final Class<?>[] parameterTypes,
+ final Object[] parameterValues) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Method method = target.getClass().getDeclaredMethod(methodName, parameterTypes);
+ method.setAccessible(true);
+ return method.invoke(target, parameterValues);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
index 850594cc7d3..b449e164be2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext;
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
@@ -38,6 +39,7 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlog
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
import java.util.List;
+import java.util.Optional;
/**
* MySQL binlog event packet decoder.
@@ -54,39 +56,51 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
@Override
protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
- MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
- skipSequenceId(payload);
- checkError(payload);
- MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload);
- removeChecksum(binlogEventHeader.getEventType(), in);
+ // readable bytes must be at least seqId(1b) + statusCode(1b) + header-length(19b)
+ while (in.readableBytes() >= 2 + MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
+ in.markReaderIndex();
+ MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
+ skipSequenceId(payload);
+ checkError(payload);
+ MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload, binlogContext.getChecksumLength());
+ // make sure event has complete body
+ if (in.readableBytes() < binlogEventHeader.getEventSize() - MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
+ log.debug("the event body is not complete, event size={}, readable bytes={}", binlogEventHeader.getEventSize(), in.readableBytes());
+ in.resetReaderIndex();
+ break;
+ }
+ Optional.ofNullable(decodeEvent(payload, binlogEventHeader)).ifPresent(out::add);
+ skipChecksum(binlogEventHeader.getEventType(), in);
+ }
+ }
+
+ private AbstractBinlogEvent decodeEvent(final MySQLPacketPayload payload, final MySQLBinlogEventHeader binlogEventHeader) {
switch (MySQLBinlogEventType.valueOf(binlogEventHeader.getEventType())) {
case ROTATE_EVENT:
decodeRotateEvent(binlogEventHeader, payload);
- break;
+ return null;
case FORMAT_DESCRIPTION_EVENT:
new MySQLBinlogFormatDescriptionEventPacket(binlogEventHeader, payload);
- break;
+ return null;
case TABLE_MAP_EVENT:
decodeTableMapEvent(binlogEventHeader, payload);
- break;
+ return null;
case WRITE_ROWS_EVENTv1:
case WRITE_ROWS_EVENTv2:
- out.add(decodeWriteRowsEventV2(binlogEventHeader, payload));
- break;
+ return decodeWriteRowsEventV2(binlogEventHeader, payload);
case UPDATE_ROWS_EVENTv1:
case UPDATE_ROWS_EVENTv2:
- out.add(decodeUpdateRowsEventV2(binlogEventHeader, payload));
- break;
+ return decodeUpdateRowsEventV2(binlogEventHeader, payload);
case DELETE_ROWS_EVENTv1:
case DELETE_ROWS_EVENTv2:
- out.add(decodeDeleteRowsEventV2(binlogEventHeader, payload));
- break;
+ return decodeDeleteRowsEventV2(binlogEventHeader, payload);
default:
- out.add(createPlaceholderEvent(binlogEventHeader));
- payload.skipReserved(payload.getByteBuf().readableBytes());
- }
- if (in.isReadable()) {
- throw new UnsupportedOperationException(String.format("Do not parse binlog event fully, eventHeader: %s, remaining packet %s", binlogEventHeader, readRemainPacket(payload)));
+ PlaceholderEvent result = createPlaceholderEvent(binlogEventHeader);
+ int remainDataLength = binlogEventHeader.getEventSize() + 2 - binlogEventHeader.getChecksumLength() - payload.getByteBuf().readerIndex();
+ if (remainDataLength > 0) {
+ payload.skipReserved(remainDataLength);
+ }
+ return result;
}
}
@@ -112,9 +126,9 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
return ByteBufUtil.hexDump(payload.readStringFixByBytes(payload.getByteBuf().readableBytes()));
}
- private void removeChecksum(final int eventType, final ByteBuf in) {
+ private void skipChecksum(final int eventType, final ByteBuf in) {
if (0 < binlogContext.getChecksumLength() && MySQLBinlogEventType.FORMAT_DESCRIPTION_EVENT.getValue() != eventType) {
- in.writerIndex(in.writerIndex() - binlogContext.getChecksumLength());
+ in.skipBytes(binlogContext.getChecksumLength());
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
index 97ead238669..c9424aaf3dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -17,16 +17,21 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty;
+import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.internal.StringUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
import org.apache.shardingsphere.db.protocol.CommonConstants;
-import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.column.MySQLBinlogColumnDef;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -34,15 +39,14 @@ import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.io.Serializable;
import java.nio.charset.StandardCharsets;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -51,9 +55,6 @@ public final class MySQLBinlogEventPacketDecoderTest {
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ChannelHandlerContext channelHandlerContext;
- @Mock
- private ByteBuf byteBuf;
-
@Mock
private MySQLBinlogTableMapEventPacket tableMapEventPacket;
@@ -61,87 +62,133 @@ public final class MySQLBinlogEventPacketDecoderTest {
private MySQLBinlogEventPacketDecoder binlogEventPacketDecoder;
+ private List<MySQLBinlogColumnDef> columnDefs;
+
@Before
public void setUp() throws NoSuchFieldException, IllegalAccessException {
binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(4);
binlogContext = ReflectionUtil.getFieldValue(binlogEventPacketDecoder, "binlogContext", BinlogContext.class);
when(channelHandlerContext.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()).thenReturn(StandardCharsets.UTF_8);
+ columnDefs = Lists.newArrayList(new MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONGLONG), new MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_LONG),
+ new MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_VARCHAR), new MySQLBinlogColumnDef(MySQLBinaryColumnType.MYSQL_TYPE_NEWDECIMAL));
}
@Test(expected = RuntimeException.class)
public void assertDecodeWithPacketError() {
- when(byteBuf.readUnsignedByte()).thenReturn((short) 255);
+ ByteBuf byteBuf = Unpooled.buffer();
+ byteBuf.writeByte(1);
+ byteBuf.writeByte(255);
+ byteBuf.writeBytes(new byte[20]);
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, null);
}
- @Test(expected = UnsupportedOperationException.class)
- public void assertDecodeWithReadError() {
- when(byteBuf.isReadable()).thenReturn(true);
- binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, new LinkedList<>());
- }
-
@Test
public void assertDecodeRotateEvent() {
- when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.ROTATE_EVENT.getValue());
+ ByteBuf byteBuf = Unpooled.buffer();
+ byteBuf.writeBytes(StringUtil.decodeHexDump("01000000000004010000002c0000000000000020001a9100000000000062696e6c6f672e3030303032394af65c24"));
List<Object> decodedEvents = new LinkedList<>();
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
- assertTrue(decodedEvents.isEmpty());
- assertThat(binlogContext.getFileName(), is(""));
+ assertThat(decodedEvents.size(), is(0));
+ assertThat(binlogContext.getFileName(), is("binlog.000029"));
}
@Test
public void assertDecodeFormatDescriptionEvent() {
- when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.FORMAT_DESCRIPTION_EVENT.getValue(), (short) 19);
- when(byteBuf.readUnsignedShortLE()).thenReturn(4);
+ ByteBuf byteBuf = Unpooled.buffer();
+ byteBuf.writeBytes(StringUtil.decodeHexDump("0200513aa8620f01000000790000000000000000000400382e302e323700000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
+ + "000000000013000d0008000000000400040000006100041a08000000080808020000000a0a0a2a2a001234000a280140081396"));
List<Object> decodedEvents = new LinkedList<>();
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
- assertTrue(decodedEvents.isEmpty());
+ assertThat(decodedEvents.size(), is(0));
assertThat(binlogContext.getChecksumLength(), is(4));
}
@Test
public void assertDecodeTableMapEvent() {
- when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.TABLE_MAP_EVENT.getValue(), (short) 0);
+ ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+ // The hex data is taken from binlog data: a TABLE_MAP_EVENT, the first event used in row-based replication.
+ byteBuf.writeBytes(StringUtil.decodeHexDump("3400cb38a962130100000041000000be7d000000007b000000000001000464735f310009745f6f726465725f31000408030ff604c8000a020c0101000201e0ff0a9b3a"));
+ binlogContext.getTableMap().put(123L, tableMapEventPacket);
List<Object> decodedEvents = new LinkedList<>();
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
- assertTrue(decodedEvents.isEmpty());
assertThat(binlogContext.getTableMap().size(), is(1));
- assertThat(binlogContext.getTableMap().get(0L), instanceOf(MySQLBinlogTableMapEventPacket.class));
+ assertThat(binlogContext.getTableMap().get(123L), instanceOf(MySQLBinlogTableMapEventPacket.class));
}
@Test
public void assertDecodeWriteRowEvent() {
- when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.WRITE_ROWS_EVENTv2.getValue(), (short) 0);
- when(byteBuf.readUnsignedShortLE()).thenReturn(2);
- binlogContext.getTableMap().put(0L, tableMapEventPacket);
- when(tableMapEventPacket.getColumnDefs()).thenReturn(Collections.emptyList());
+ ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+ // The hex data is the binlog event produced by: INSERT INTO t_order(order_id, user_id, status, t_numeric) VALUES (1, 1, 'SUCCESS', null);
+ byteBuf.writeBytes(StringUtil.decodeHexDump("30007a36a9621e0100000038000000bb7c000000007b00000000000100020004ff08010000000000000001000000075355434345535365eff9ff"));
+ binlogContext.getTableMap().put(123L, tableMapEventPacket);
+ when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
List<Object> decodedEvents = new LinkedList<>();
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
assertThat(decodedEvents.size(), is(1));
assertThat(decodedEvents.get(0), instanceOf(WriteRowsEvent.class));
+ WriteRowsEvent actual = (WriteRowsEvent) decodedEvents.get(0);
+ assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, "SUCCESS", null}));
}
@Test
public void assertDecodeUpdateRowEvent() {
- when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.UPDATE_ROWS_EVENTv2.getValue(), (short) 0);
- when(byteBuf.readUnsignedShortLE()).thenReturn(2);
- binlogContext.getTableMap().put(0L, tableMapEventPacket);
- when(tableMapEventPacket.getColumnDefs()).thenReturn(Collections.emptyList());
+ ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+ // The hex data is the binlog event produced by: update t_order set status = 'updated' where order_id = 1;
+ byteBuf.writeBytes(StringUtil.decodeHexDump("3500cb38a9621f010000004e0000000c7e000000007b00000000000100020004ffff08010000000000000001000000075355434345535308010000000000000001000000077570"
+ + "6461746564e78cee6c"));
+ binlogContext.getTableMap().put(123L, tableMapEventPacket);
+ when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
List<Object> decodedEvents = new LinkedList<>();
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
assertThat(decodedEvents.size(), is(1));
assertThat(decodedEvents.get(0), instanceOf(UpdateRowsEvent.class));
+ UpdateRowsEvent actual = (UpdateRowsEvent) decodedEvents.get(0);
+ assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, "SUCCESS", null}));
+ assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, "updated", null}));
}
@Test
public void assertDecodeDeleteRowEvent() {
- when(byteBuf.readUnsignedByte()).thenReturn((short) 0, (short) 0, (short) MySQLBinlogEventType.DELETE_ROWS_EVENTv2.getValue(), (short) 0);
- when(byteBuf.readUnsignedShortLE()).thenReturn(2);
- binlogContext.getTableMap().put(0L, tableMapEventPacket);
- when(tableMapEventPacket.getColumnDefs()).thenReturn(Collections.emptyList());
+ ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+ // The hex data is the binlog event produced by: delete from t_order where order_id = 1;
+ byteBuf.writeBytes(StringUtil.decodeHexDump("51002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5"));
+ binlogContext.getTableMap().put(116L, tableMapEventPacket);
+ when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
List<Object> decodedEvents = new LinkedList<>();
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
assertThat(decodedEvents.size(), is(1));
assertThat(decodedEvents.get(0), instanceOf(DeleteRowsEvent.class));
+ DeleteRowsEvent actual = (DeleteRowsEvent) decodedEvents.get(0);
+ assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, "SUCCESS", null}));
+ }
+
+ @Test
+ public void assertBinlogEventHeaderIncomplete() {
+ ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+ byte[] completeData = StringUtil.decodeHexDump("51002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5");
+ byteBuf.writeBytes(completeData);
+ // Append only the first two bytes of a following event, so that its header is incomplete.
+ byteBuf.writeBytes(StringUtil.decodeHexDump("3400"));
+ List<Object> decodedEvents = new LinkedList<>();
+ binlogContext.getTableMap().put(116L, tableMapEventPacket);
+ when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
+ binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
+ assertThat(decodedEvents.size(), is(1));
+ assertThat(byteBuf.readerIndex(), is(completeData.length));
+ }
+
+ @Test
+ public void assertBinlogEventBodyIncomplete() {
+ ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
+ byte[] completeData = StringUtil.decodeHexDump("51002a80a862200100000038000000c569000000007400000000000100020004ff0801000000000000000100000007535543434553531c9580c5");
+ byteBuf.writeBytes(completeData);
+ byte[] notCompleteData = StringUtil.decodeHexDump("3400cb38a962130100000041000000be7d000000007b000000000001000464735f310009745f6f726465725f31000408030f");
+ byteBuf.writeBytes(notCompleteData);
+ List<Object> decodedEvents = new LinkedList<>();
+ binlogContext.getTableMap().put(116L, tableMapEventPacket);
+ when(tableMapEventPacket.getColumnDefs()).thenReturn(columnDefs);
+ binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
+ assertThat(decodedEvents.size(), is(1));
+ assertThat(byteBuf.readerIndex(), is(completeData.length));
}
}