You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@ratis.apache.org by ru...@apache.org on 2021/03/04 09:12:16 UTC

[ratis] branch master updated: RATIS-1328. Avoid parse proto for each packet (#434)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b3d9200  RATIS-1328. Avoid parse proto for each packet (#434)
b3d9200 is described below

commit b3d9200ea11721a2711fd06c43c9aa8fe9e0fdb3
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Thu Mar 4 17:12:08 2021 +0800

    RATIS-1328. Avoid parse proto for each packet (#434)
---
 .../java/org/apache/ratis/protocol/DataStreamPacketHeader.java    | 5 +++++
 .../java/org/apache/ratis/protocol/DataStreamRequestHeader.java   | 8 +++++++-
 .../main/java/org/apache/ratis/netty/NettyDataStreamUtils.java    | 6 ++++++
 3 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index 3bd7512..842cf28 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -25,6 +25,7 @@ import org.apache.ratis.util.SizeInBytes;
 /** The header format is streamId, streamOffset, dataLength. */
 public class DataStreamPacketHeader extends DataStreamPacketImpl {
   private static final SizeInBytes SIZE_OF_HEADER_LEN = SizeInBytes.valueOf(4);
+  private static final SizeInBytes SIZE_OF_HEADER_BODY_LEN = SizeInBytes.valueOf(8);
 
   private final long dataLength;
 
@@ -41,4 +42,8 @@ public class DataStreamPacketHeader extends DataStreamPacketImpl {
   public static int getSizeOfHeaderLen() {
     return SIZE_OF_HEADER_LEN.getSizeInt();
   }
+
+  public static int getSizeOfHeaderBodyLen() {
+    return SIZE_OF_HEADER_BODY_LEN.getSizeInt();
+  }
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index 8abbe3b..c8f8cfc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -37,7 +37,13 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
   private static final Logger LOG = LoggerFactory.getLogger(DataStreamRequestHeader.class);
 
   public static DataStreamRequestHeader read(ByteBuf buf) {
-    if (getSizeOfHeaderLen() > buf.readableBytes()) {
+    if (getSizeOfHeaderBodyLen() > buf.readableBytes()) {
+      return null;
+    }
+
+    long headerBodyBufLen = buf.readLong();
+    if (headerBodyBufLen > buf.readableBytes()) {
+      buf.resetReaderIndex();
       return null;
     }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 1f4f7c5..d463b07 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -83,9 +83,15 @@ public interface NettyDataStreamUtils {
   static void encodeDataStreamRequestHeader(DataStreamRequest request, Consumer<Object> out,
       ByteBufAllocator allocator) {
     final ByteBuffer headerBuf = getDataStreamRequestHeaderProtoByteBuffer(request);
+
+    final ByteBuf headerBodyLenBuf = allocator.directBuffer(DataStreamPacketHeader.getSizeOfHeaderBodyLen());
+    headerBodyLenBuf.writeLong(headerBuf.remaining() + request.getDataLength());
+    out.accept(headerBodyLenBuf);
+
     final ByteBuf headerLenBuf = allocator.directBuffer(DataStreamPacketHeader.getSizeOfHeaderLen());
     headerLenBuf.writeInt(headerBuf.remaining());
     out.accept(headerLenBuf);
+
     out.accept(Unpooled.wrappedBuffer(headerBuf));
   }