You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2021/11/30 09:45:04 UTC
[ratis] branch master updated: RATIS-1449. Move netty related code out from ratis-common. (#543)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 10a8311 RATIS-1449. Move netty related code out from ratis-common. (#543)
10a8311 is described below
commit 10a8311b9ef425fa841a3b520cec5420b92797c5
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Nov 30 17:44:59 2021 +0800
RATIS-1449. Move netty related code out from ratis-common. (#543)
---
.../ratis/protocol/DataStreamReplyHeader.java | 39 ----------
.../ratis/protocol/DataStreamRequestHeader.java | 50 -------------
.../apache/ratis/netty/NettyDataStreamUtils.java | 83 +++++++++++++++++++++-
3 files changed, 81 insertions(+), 91 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
index 9e3d4d4..f0dee6a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -18,49 +18,10 @@
package org.apache.ratis.protocol;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** The header format is {@link DataStreamPacketHeader}, bytesWritten and flags. */
public class DataStreamReplyHeader extends DataStreamPacketHeader implements DataStreamReply {
- private static final Logger LOG = LoggerFactory.getLogger(DataStreamReplyHeader.class);
-
- public static DataStreamReplyHeader read(ByteBuf buf) {
- if (getSizeOfHeaderLen() > buf.readableBytes()) {
- return null;
- }
-
- int headerBufLen = buf.readInt();
- if (headerBufLen > buf.readableBytes()) {
- buf.resetReaderIndex();
- return null;
- }
-
- try {
- ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
- DataStreamReplyHeaderProto header = DataStreamReplyHeaderProto.parseFrom(headerBuf.nioBuffer());
-
- final DataStreamPacketHeaderProto h = header.getPacketHeader();
- if (header.getPacketHeader().getDataLength() + headerBufLen <= buf.readableBytes()) {
- buf.readerIndex(buf.readerIndex() + headerBufLen);
- return new DataStreamReplyHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
- h.getStreamOffset(), h.getDataLength(), header.getBytesWritten(), header.getSuccess());
- } else {
- buf.resetReaderIndex();
- return null;
- }
- } catch (InvalidProtocolBufferException e) {
- LOG.error("Fail to decode reply header:", e);
- buf.resetReaderIndex();
- return null;
- }
- }
-
private final long bytesWritten;
private final boolean success;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index c8f8cfc..f2fbc15 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -19,64 +19,14 @@
package org.apache.ratis.protocol;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* The header format is the same {@link DataStreamPacketHeader}
* since there are no additional fields.
*/
public class DataStreamRequestHeader extends DataStreamPacketHeader implements DataStreamRequest {
- private static final Logger LOG = LoggerFactory.getLogger(DataStreamRequestHeader.class);
-
- public static DataStreamRequestHeader read(ByteBuf buf) {
- if (getSizeOfHeaderBodyLen() > buf.readableBytes()) {
- return null;
- }
-
- long headerBodyBufLen = buf.readLong();
- if (headerBodyBufLen > buf.readableBytes()) {
- buf.resetReaderIndex();
- return null;
- }
-
- int headerBufLen = buf.readInt();
- if (headerBufLen > buf.readableBytes()) {
- buf.resetReaderIndex();
- return null;
- }
-
- try {
- ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
- DataStreamRequestHeaderProto header = DataStreamRequestHeaderProto.parseFrom(headerBuf.nioBuffer());
-
- final DataStreamPacketHeaderProto h = header.getPacketHeader();
- if (h.getDataLength() + headerBufLen <= buf.readableBytes()) {
- buf.readerIndex(buf.readerIndex() + headerBufLen);
- WriteOption[] options = new WriteOption[h.getOptionsCount()];
- for (int i = 0; i < options.length; i++) {
- options[i] = StandardWriteOption.values()[h.getOptions(i).ordinal()];
- }
-
- return new DataStreamRequestHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
- h.getStreamOffset(), h.getDataLength(), options);
- } else {
- buf.resetReaderIndex();
- return null;
- }
- } catch (InvalidProtocolBufferException e) {
- LOG.error("Fail to decode request header:", e);
- buf.resetReaderIndex();
- return null;
- }
- }
private final WriteOption[] options;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index d463b07..5c93f2a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -27,14 +27,18 @@ import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamPacketHeader;
import org.apache.ratis.protocol.DataStreamReplyHeader;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.thirdparty.io.netty.channel.DefaultFileRegion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Optional;
@@ -42,6 +46,8 @@ import java.util.function.Consumer;
import java.util.function.Function;
public interface NettyDataStreamUtils {
+ Logger LOG = LoggerFactory.getLogger(NettyDataStreamUtils.class);
+
static ByteBuffer getDataStreamRequestHeaderProtoByteBuffer(DataStreamRequest request) {
DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
.newBuilder()
@@ -119,12 +125,54 @@ public interface NettyDataStreamUtils {
}
static DataStreamRequestByteBuf decodeDataStreamRequestByteBuf(ByteBuf buf) {
- return Optional.ofNullable(DataStreamRequestHeader.read(buf))
+ return Optional.ofNullable(decodeDataStreamRequestHeader(buf))
.map(header -> checkHeader(header, buf))
.map(header -> new DataStreamRequestByteBuf(header, decodeData(buf, header, ByteBuf::retain)))
.orElse(null);
}
+ static DataStreamRequestHeader decodeDataStreamRequestHeader(ByteBuf buf) {
+ if (DataStreamPacketHeader.getSizeOfHeaderBodyLen() > buf.readableBytes()) {
+ return null;
+ }
+
+ long headerBodyBufLen = buf.readLong();
+ if (headerBodyBufLen > buf.readableBytes()) {
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ int headerBufLen = buf.readInt();
+ if (headerBufLen > buf.readableBytes()) {
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ try {
+ ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
+ DataStreamRequestHeaderProto header = DataStreamRequestHeaderProto.parseFrom(headerBuf.nioBuffer());
+
+ final DataStreamPacketHeaderProto h = header.getPacketHeader();
+ if (h.getDataLength() + headerBufLen <= buf.readableBytes()) {
+ buf.readerIndex(buf.readerIndex() + headerBufLen);
+ WriteOption[] options = new WriteOption[h.getOptionsCount()];
+ for (int i = 0; i < options.length; i++) {
+ options[i] = StandardWriteOption.values()[h.getOptions(i).ordinal()];
+ }
+
+ return new DataStreamRequestHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
+ h.getStreamOffset(), h.getDataLength(), options);
+ } else {
+ buf.resetReaderIndex();
+ return null;
+ }
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Fail to decode request header:", e);
+ buf.resetReaderIndex();
+ return null;
+ }
+ }
+
static ByteBuffer copy(ByteBuf buf) {
final byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
@@ -132,7 +180,7 @@ public interface NettyDataStreamUtils {
}
static DataStreamReplyByteBuffer decodeDataStreamReplyByteBuffer(ByteBuf buf) {
- return Optional.ofNullable(DataStreamReplyHeader.read(buf))
+ return Optional.ofNullable(decodeDataStreamReplyHeader(buf))
.map(header -> checkHeader(header, buf))
.map(header -> DataStreamReplyByteBuffer.newBuilder()
.setDataStreamReplyHeader(header)
@@ -141,6 +189,37 @@ public interface NettyDataStreamUtils {
.orElse(null);
}
+ static DataStreamReplyHeader decodeDataStreamReplyHeader(ByteBuf buf) {
+ if (DataStreamPacketHeader.getSizeOfHeaderLen() > buf.readableBytes()) {
+ return null;
+ }
+
+ int headerBufLen = buf.readInt();
+ if (headerBufLen > buf.readableBytes()) {
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ try {
+ ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
+ DataStreamReplyHeaderProto header = DataStreamReplyHeaderProto.parseFrom(headerBuf.nioBuffer());
+
+ final DataStreamPacketHeaderProto h = header.getPacketHeader();
+ if (header.getPacketHeader().getDataLength() + headerBufLen <= buf.readableBytes()) {
+ buf.readerIndex(buf.readerIndex() + headerBufLen);
+ return new DataStreamReplyHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
+ h.getStreamOffset(), h.getDataLength(), header.getBytesWritten(), header.getSuccess());
+ } else {
+ buf.resetReaderIndex();
+ return null;
+ }
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Fail to decode reply header:", e);
+ buf.resetReaderIndex();
+ return null;
+ }
+ }
+
static <HEADER extends DataStreamPacketHeader> HEADER checkHeader(HEADER header, ByteBuf buf) {
if (header == null) {
return null;