You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2021/11/30 09:45:04 UTC

[ratis] branch master updated: RATIS-1449. Move netty related code out from ratis-common. (#543)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 10a8311  RATIS-1449. Move netty related code out from ratis-common. (#543)
10a8311 is described below

commit 10a8311b9ef425fa841a3b520cec5420b92797c5
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Nov 30 17:44:59 2021 +0800

    RATIS-1449. Move netty related code out from ratis-common. (#543)
---
 .../ratis/protocol/DataStreamReplyHeader.java      | 39 ----------
 .../ratis/protocol/DataStreamRequestHeader.java    | 50 -------------
 .../apache/ratis/netty/NettyDataStreamUtils.java   | 83 +++++++++++++++++++++-
 3 files changed, 81 insertions(+), 91 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
index 9e3d4d4..f0dee6a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -18,49 +18,10 @@
 
 package org.apache.ratis.protocol;
 
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** The header format is {@link DataStreamPacketHeader}, bytesWritten and flags. */
 public class DataStreamReplyHeader extends DataStreamPacketHeader implements DataStreamReply {
-  private static final Logger LOG = LoggerFactory.getLogger(DataStreamReplyHeader.class);
-
-  public static DataStreamReplyHeader read(ByteBuf buf) {
-    if (getSizeOfHeaderLen() > buf.readableBytes()) {
-      return null;
-    }
-
-    int headerBufLen = buf.readInt();
-    if (headerBufLen > buf.readableBytes()) {
-      buf.resetReaderIndex();
-      return null;
-    }
-
-    try {
-      ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
-      DataStreamReplyHeaderProto header = DataStreamReplyHeaderProto.parseFrom(headerBuf.nioBuffer());
-
-      final DataStreamPacketHeaderProto h = header.getPacketHeader();
-      if (header.getPacketHeader().getDataLength() + headerBufLen <= buf.readableBytes()) {
-        buf.readerIndex(buf.readerIndex() + headerBufLen);
-        return new DataStreamReplyHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
-            h.getStreamOffset(), h.getDataLength(), header.getBytesWritten(), header.getSuccess());
-      } else {
-        buf.resetReaderIndex();
-        return null;
-      }
-    } catch (InvalidProtocolBufferException e) {
-      LOG.error("Fail to decode reply header:", e);
-      buf.resetReaderIndex();
-      return null;
-    }
-  }
-
   private final long bytesWritten;
   private final boolean success;
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index c8f8cfc..f2fbc15 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -19,64 +19,14 @@
 package org.apache.ratis.protocol;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.io.WriteOption;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The header format is the same {@link DataStreamPacketHeader}
  * since there are no additional fields.
  */
 public class DataStreamRequestHeader extends DataStreamPacketHeader implements DataStreamRequest {
-  private static final Logger LOG = LoggerFactory.getLogger(DataStreamRequestHeader.class);
-
-  public static DataStreamRequestHeader read(ByteBuf buf) {
-    if (getSizeOfHeaderBodyLen() > buf.readableBytes()) {
-      return null;
-    }
-
-    long headerBodyBufLen = buf.readLong();
-    if (headerBodyBufLen > buf.readableBytes()) {
-      buf.resetReaderIndex();
-      return null;
-    }
-
-    int headerBufLen = buf.readInt();
-    if (headerBufLen > buf.readableBytes()) {
-      buf.resetReaderIndex();
-      return null;
-    }
-
-    try {
-      ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
-      DataStreamRequestHeaderProto header = DataStreamRequestHeaderProto.parseFrom(headerBuf.nioBuffer());
-
-      final DataStreamPacketHeaderProto h = header.getPacketHeader();
-      if (h.getDataLength() + headerBufLen <= buf.readableBytes()) {
-        buf.readerIndex(buf.readerIndex() + headerBufLen);
-        WriteOption[] options = new WriteOption[h.getOptionsCount()];
-        for (int i = 0; i < options.length; i++) {
-          options[i] = StandardWriteOption.values()[h.getOptions(i).ordinal()];
-        }
-
-        return new DataStreamRequestHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
-            h.getStreamOffset(), h.getDataLength(), options);
-      } else {
-        buf.resetReaderIndex();
-        return null;
-      }
-    } catch (InvalidProtocolBufferException e) {
-      LOG.error("Fail to decode request header:", e);
-      buf.resetReaderIndex();
-      return null;
-    }
-  }
 
   private final WriteOption[] options;
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index d463b07..5c93f2a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -27,14 +27,18 @@ import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
 import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamPacketHeader;
 import org.apache.ratis.protocol.DataStreamReplyHeader;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.ratis.thirdparty.io.netty.channel.DefaultFileRegion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.Optional;
@@ -42,6 +46,8 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 
 public interface NettyDataStreamUtils {
+  Logger LOG = LoggerFactory.getLogger(NettyDataStreamUtils.class);
+
   static ByteBuffer getDataStreamRequestHeaderProtoByteBuffer(DataStreamRequest request) {
     DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
         .newBuilder()
@@ -119,12 +125,54 @@ public interface NettyDataStreamUtils {
   }
 
   static DataStreamRequestByteBuf decodeDataStreamRequestByteBuf(ByteBuf buf) {
-    return Optional.ofNullable(DataStreamRequestHeader.read(buf))
+    return Optional.ofNullable(decodeDataStreamRequestHeader(buf))
         .map(header -> checkHeader(header, buf))
         .map(header -> new DataStreamRequestByteBuf(header, decodeData(buf, header, ByteBuf::retain)))
         .orElse(null);
   }
 
+  static DataStreamRequestHeader decodeDataStreamRequestHeader(ByteBuf buf) {
+    if (DataStreamPacketHeader.getSizeOfHeaderBodyLen() > buf.readableBytes()) {
+      return null;
+    }
+
+    long headerBodyBufLen = buf.readLong();
+    if (headerBodyBufLen > buf.readableBytes()) {
+      buf.resetReaderIndex();
+      return null;
+    }
+
+    int headerBufLen = buf.readInt();
+    if (headerBufLen > buf.readableBytes()) {
+      buf.resetReaderIndex();
+      return null;
+    }
+
+    try {
+      ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
+      DataStreamRequestHeaderProto header = DataStreamRequestHeaderProto.parseFrom(headerBuf.nioBuffer());
+
+      final DataStreamPacketHeaderProto h = header.getPacketHeader();
+      if (h.getDataLength() + headerBufLen <= buf.readableBytes()) {
+        buf.readerIndex(buf.readerIndex() + headerBufLen);
+        WriteOption[] options = new WriteOption[h.getOptionsCount()];
+        for (int i = 0; i < options.length; i++) {
+          options[i] = StandardWriteOption.values()[h.getOptions(i).ordinal()];
+        }
+
+        return new DataStreamRequestHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
+            h.getStreamOffset(), h.getDataLength(), options);
+      } else {
+        buf.resetReaderIndex();
+        return null;
+      }
+    } catch (InvalidProtocolBufferException e) {
+      LOG.error("Fail to decode request header:", e);
+      buf.resetReaderIndex();
+      return null;
+    }
+  }
+
   static ByteBuffer copy(ByteBuf buf) {
     final byte[] bytes = new byte[buf.readableBytes()];
     buf.readBytes(bytes);
@@ -132,7 +180,7 @@ public interface NettyDataStreamUtils {
   }
 
   static DataStreamReplyByteBuffer decodeDataStreamReplyByteBuffer(ByteBuf buf) {
-    return Optional.ofNullable(DataStreamReplyHeader.read(buf))
+    return Optional.ofNullable(decodeDataStreamReplyHeader(buf))
         .map(header -> checkHeader(header, buf))
         .map(header -> DataStreamReplyByteBuffer.newBuilder()
             .setDataStreamReplyHeader(header)
@@ -141,6 +189,37 @@ public interface NettyDataStreamUtils {
         .orElse(null);
   }
 
+  static DataStreamReplyHeader decodeDataStreamReplyHeader(ByteBuf buf) {
+    if (DataStreamPacketHeader.getSizeOfHeaderLen() > buf.readableBytes()) {
+      return null;
+    }
+
+    int headerBufLen = buf.readInt();
+    if (headerBufLen > buf.readableBytes()) {
+      buf.resetReaderIndex();
+      return null;
+    }
+
+    try {
+      ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
+      DataStreamReplyHeaderProto header = DataStreamReplyHeaderProto.parseFrom(headerBuf.nioBuffer());
+
+      final DataStreamPacketHeaderProto h = header.getPacketHeader();
+      if (header.getPacketHeader().getDataLength() + headerBufLen <= buf.readableBytes()) {
+        buf.readerIndex(buf.readerIndex() + headerBufLen);
+        return new DataStreamReplyHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
+            h.getStreamOffset(), h.getDataLength(), header.getBytesWritten(), header.getSuccess());
+      } else {
+        buf.resetReaderIndex();
+        return null;
+      }
+    } catch (InvalidProtocolBufferException e) {
+      LOG.error("Fail to decode reply header:", e);
+      buf.resetReaderIndex();
+      return null;
+    }
+  }
+
   static <HEADER extends DataStreamPacketHeader> HEADER checkHeader(HEADER header, ByteBuf buf) {
     if (header == null) {
       return null;