Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/06/15 13:26:21 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #18375: Fix scaling increment task consumed incomplete data

sandynz commented on code in PR #18375:
URL: https://github.com/apache/shardingsphere/pull/18375#discussion_r897885545


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -54,39 +55,48 @@ public MySQLBinlogEventPacketDecoder(final int checksumLength) {
     
     @Override
     protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
-        MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
-        skipSequenceId(payload);
-        checkError(payload);
-        MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload);
-        removeChecksum(binlogEventHeader.getEventType(), in);
+        // readable bytes must grete + seqId(1b) + statusCode(1b) + header-length(19b) + 
+        while (in.readableBytes() >= 2 + MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
+            in.markReaderIndex();
+            MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
+            skipSequenceId(payload);
+            checkError(payload);
+            MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload, binlogContext.getChecksumLength());
+            // make sure event has complete body
+            if (in.readableBytes() < binlogEventHeader.getEventSize() - MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
+                log.debug("the event body is not complete, event size={}, readable bytes={}", binlogEventHeader.getEventSize(), in.readableBytes());
+                in.resetReaderIndex();
+                break;
+            }
+            out.add(decodeEvent(payload, binlogEventHeader));
+            skipChecksum(binlogEventHeader.getEventType(), in);
+        }
+    }
+    
+    private Object decodeEvent(final MySQLPacketPayload payload, final MySQLBinlogEventHeader binlogEventHeader) {
         switch (MySQLBinlogEventType.valueOf(binlogEventHeader.getEventType())) {
             case ROTATE_EVENT:
-                decodeRotateEvent(binlogEventHeader, payload);
-                break;
+                return decodeRotateEvent(binlogEventHeader, payload);
             case FORMAT_DESCRIPTION_EVENT:
-                new MySQLBinlogFormatDescriptionEventPacket(binlogEventHeader, payload);
-                break;
+                return new MySQLBinlogFormatDescriptionEventPacket(binlogEventHeader, payload);
             case TABLE_MAP_EVENT:
-                decodeTableMapEvent(binlogEventHeader, payload);
-                break;
+                return decodeTableMapEvent(binlogEventHeader, payload);

Review Comment:
   Some event types did not write a packet to `out` before. We need to verify that there is no side effect now that more packets are written to `out`.
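
   For context, the loop in the hunk above follows a mark/reset framing pattern: read the header, check that the whole body has arrived, and rewind if it has not. The sketch below illustrates that pattern with `java.nio.ByteBuffer` instead of Netty's `ByteBuf`. The class name, the 5-byte header layout, and the `decode` helper are hypothetical and are not taken from ShardingSphere or the MySQL protocol.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class FrameLoopSketch {

    // Hypothetical header: 1-byte type + 4-byte total frame size (header included).
    static final int HEADER_LENGTH = 5;

    // Decodes every complete frame in `in` and leaves a trailing partial frame unread.
    static List<byte[]> decode(final ByteBuffer in) {
        List<byte[]> out = new ArrayList<>();
        while (in.remaining() >= HEADER_LENGTH) {
            // Remember where this frame starts so we can rewind if it is incomplete.
            in.mark();
            in.get(); // frame type, unused in this sketch
            int bodyLength = in.getInt() - HEADER_LENGTH;
            if (bodyLength < 0) {
                throw new IllegalArgumentException("frame size smaller than header");
            }
            if (in.remaining() < bodyLength) {
                // Body not fully received yet: rewind and wait for more bytes.
                in.reset();
                break;
            }
            byte[] body = new byte[bodyLength];
            in.get(body);
            out.add(body);
        }
        return out;
    }

    public static void main(final String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        // One complete frame with a 3-byte body.
        buffer.put((byte) 1).putInt(HEADER_LENGTH + 3).put(new byte[] {10, 20, 30});
        // One partial frame: the header announces 4 body bytes, only 1 has arrived.
        buffer.put((byte) 2).putInt(HEADER_LENGTH + 4).put((byte) 40);
        buffer.flip();
        List<byte[]> frames = decode(buffer);
        System.out.println(frames.size() + " frame(s) decoded, " + buffer.remaining() + " byte(s) left");
    }
}
```

   Because the loop can now emit several frames per call, anything downstream of `out` sees more objects than before, which is why the side-effect check matters.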



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -54,39 +56,51 @@ public MySQLBinlogEventPacketDecoder(final int checksumLength) {
     
     @Override
     protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
-        MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
-        skipSequenceId(payload);
-        checkError(payload);
-        MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload);
-        removeChecksum(binlogEventHeader.getEventType(), in);
+        // readable bytes must grete + seqId(1b) + statusCode(1b) + header-length(19b) +
+        while (in.readableBytes() >= 2 + MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
+            in.markReaderIndex();
+            MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
+            skipSequenceId(payload);
+            checkError(payload);
+            MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload, binlogContext.getChecksumLength());
+            // make sure event has complete body
+            if (in.readableBytes() < binlogEventHeader.getEventSize() - MySQLBinlogEventHeader.MYSQL_BINLOG_EVENT_HEADER_LENGTH) {
+                log.debug("the event body is not complete, event size={}, readable bytes={}", binlogEventHeader.getEventSize(), in.readableBytes());
+                in.resetReaderIndex();
+                break;
+            }
+            Object event = decodeEvent(payload, binlogEventHeader);
+            if (event instanceof AbstractBinlogEvent) {
+                out.add(event);
+            }

Review Comment:
   It seems `PlaceholderEvent` is an instance of `AbstractBinlogEvent`, so it would still pass this check and be added to `out`. Could this be improved?
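
   A minimal sketch of the concern, assuming the placeholder type does extend the base event type. The classes below are hypothetical stand-ins, not the real ShardingSphere classes, and the explicit exclusion is only one possible option.

```java
import java.util.ArrayList;
import java.util.List;

public class EventFilterSketch {

    // Hypothetical stand-ins for the event hierarchy discussed in the comment.
    abstract static class BaseEvent { }

    static final class RowsEvent extends BaseEvent { }

    static final class PlaceholderEvent extends BaseEvent { }

    // Mirrors the check in the hunk: anything that is a BaseEvent passes, placeholders included.
    static List<Object> filterByBaseType(final List<Object> decoded) {
        List<Object> out = new ArrayList<>();
        for (Object each : decoded) {
            if (each instanceof BaseEvent) {
                out.add(each);
            }
        }
        return out;
    }

    // One possible refinement: exclude the placeholder type explicitly.
    static List<Object> filterExcludingPlaceholder(final List<Object> decoded) {
        List<Object> out = new ArrayList<>();
        for (Object each : decoded) {
            if (each instanceof BaseEvent && !(each instanceof PlaceholderEvent)) {
                out.add(each);
            }
        }
        return out;
    }

    public static void main(final String[] args) {
        List<Object> decoded = new ArrayList<>();
        decoded.add(new RowsEvent());
        decoded.add(new PlaceholderEvent());
        decoded.add("not an event");
        System.out.println("base-type filter kept " + filterByBaseType(decoded).size());
        System.out.println("stricter filter kept " + filterExcludingPlaceholder(decoded).size());
    }
}
```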



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java:
##########
@@ -54,39 +56,51 @@ public MySQLBinlogEventPacketDecoder(final int checksumLength) {
     
     @Override
     protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
-        MySQLPacketPayload payload = new MySQLPacketPayload(in, ctx.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get());
-        skipSequenceId(payload);
-        checkError(payload);
-        MySQLBinlogEventHeader binlogEventHeader = new MySQLBinlogEventHeader(payload);
-        removeChecksum(binlogEventHeader.getEventType(), in);
+        // readable bytes must grete + seqId(1b) + statusCode(1b) + header-length(19b) +

Review Comment:
   Does `grete` mean "greater than"?


