You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/15 01:28:16 UTC

[incubator-ratis] branch master updated: RATIS-1155. Add a builder for DataStreamReplyByteBuffer. (#276)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new a43f36a  RATIS-1155. Add a builder for DataStreamReplyByteBuffer. (#276)
a43f36a is described below

commit a43f36a1590aee59b612d6775131ebb1fc7262aa
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Nov 15 09:28:09 2020 +0800

    RATIS-1155. Add a builder for DataStreamReplyByteBuffer. (#276)
    
    * RATIS-1155. Add a builder for DataStreamReplyByteBuffer.
    
    * Fix checkstyle.
---
 .../ratis/client/impl/OrderedStreamAsync.java      |  6 +-
 .../impl/DataStreamPacketByteBuffer.java           |  4 +-
 .../datastream/impl/DataStreamPacketImpl.java      | 27 ++++---
 .../datastream/impl/DataStreamReplyByteBuffer.java | 85 +++++++++++++++++++---
 .../impl/DataStreamRequestByteBuffer.java          |  4 +-
 .../apache/ratis/protocol/DataStreamPacket.java    |  4 +-
 .../ratis/protocol/DataStreamPacketHeader.java     |  2 +-
 .../apache/ratis/netty/NettyDataStreamUtils.java   |  5 +-
 .../ratis/netty/server/DataStreamManagement.java   | 58 +++++++++------
 .../netty/server/DataStreamRequestByteBuf.java     |  6 +-
 .../ratis/netty/server/NettyServerStreamRpc.java   |  4 +-
 11 files changed, 142 insertions(+), 63 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 12b2bb7..1318d4b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -43,8 +43,8 @@ public class OrderedStreamAsync {
     private final long seqNum;
     private final CompletableFuture<DataStreamReply> replyFuture = new CompletableFuture<>();
 
-    DataStreamWindowRequest(long streamId, long offset, ByteBuffer data, long seqNum, Type type){
-      super(streamId, offset, data, type);
+    DataStreamWindowRequest(Type type, long streamId, long offset, ByteBuffer data, long seqNum) {
+      super(type, streamId, offset, data);
       this.seqNum = seqNum;
     }
 
@@ -96,7 +96,7 @@ public class OrderedStreamAsync {
           "Interrupted when sending streamId=" + streamId + ", offset= " + offset + ", length=" + length, e));
     }
     final LongFunction<DataStreamWindowRequest> constructor
-        = seqNum -> new DataStreamWindowRequest(streamId, offset, data.slice(), seqNum, type);
+        = seqNum -> new DataStreamWindowRequest(type, streamId, offset, data.slice(), seqNum);
     return slidingWindow.submitNewRequest(constructor, this::sendRequestToNetwork).
            getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index 0caa77e..aa8cddb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -28,8 +28,8 @@ public abstract class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
 
   private final ByteBuffer buffer;
 
-  public DataStreamPacketByteBuffer(long streamId, long streamOffset, ByteBuffer buffer, Type type) {
-    super(streamId, streamOffset, type);
+  protected DataStreamPacketByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
+    super(type, streamId, streamOffset);
     this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY_BYTE_BUFFER;
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
index 0b51c21..1f528e3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
@@ -27,14 +27,19 @@ import org.apache.ratis.util.JavaUtils;
  * This class is immutable.
  */
 public abstract class DataStreamPacketImpl implements DataStreamPacket {
+  private final Type type;
   private final long streamId;
   private final long streamOffset;
-  private final Type type;
 
-  public DataStreamPacketImpl(long streamId, long streamOffset, Type type) {
+  public DataStreamPacketImpl(Type type, long streamId, long streamOffset) {
+    this.type = type;
     this.streamId = streamId;
     this.streamOffset = streamOffset;
-    this.type = type;
+  }
+
+  @Override
+  public Type getType() {
+    return type;
   }
 
   @Override
@@ -48,17 +53,11 @@ public abstract class DataStreamPacketImpl implements DataStreamPacket {
   }
 
   @Override
-  public Type getType() {
-    return type;
-  }
-
-  @Override
   public String toString() {
-    return JavaUtils.getClassSimpleName(getClass()) + "{"
-        + "streamId=" + getStreamId()
-        + ", streamOffset=" + getStreamOffset()
-        + ", dataLength=" + getDataLength()
-        + ", type=" + getType()
-        + '}';
+    return JavaUtils.getClassSimpleName(getClass()) + ":"
+        + getType()
+        + ",id=" + getStreamId()
+        + ",offset=" + getStreamOffset()
+        + ",length=" + getDataLength();
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index 3e0736e..a681716 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.datastream.impl;
 
+import org.apache.ratis.protocol.DataStreamPacket;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamReplyHeader;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
@@ -28,21 +29,83 @@ import java.nio.ByteBuffer;
  *
  * This class is immutable.
  */
-public class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer implements DataStreamReply {
-  private final long bytesWritten;
+public final class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer implements DataStreamReply {
+  public static final class Builder {
+    private Type type;
+    private long streamId;
+    private long streamOffset;
+    private ByteBuffer buffer;
+
+    private boolean success;
+    private long bytesWritten;
+
+    private Builder() {}
+
+    public Builder setType(Type type) {
+      this.type = type;
+      return this;
+    }
+
+    public Builder setStreamId(long streamId) {
+      this.streamId = streamId;
+      return this;
+    }
+
+    public Builder setStreamOffset(long streamOffset) {
+      this.streamOffset = streamOffset;
+      return this;
+    }
+
+    public Builder setBuffer(ByteBuffer buffer) {
+      this.buffer = buffer;
+      return this;
+    }
+
+    public Builder setSuccess(boolean success) {
+      this.success = success;
+      return this;
+    }
+
+    public Builder setBytesWritten(long bytesWritten) {
+      this.bytesWritten = bytesWritten;
+      return this;
+    }
+
+    public Builder setDataStreamReplyHeader(DataStreamReplyHeader header) {
+      return setDataStreamPacket(header)
+          .setSuccess(header.isSuccess())
+          .setBytesWritten(header.getBytesWritten());
+    }
+
+    public Builder setDataStreamPacket(DataStreamPacket packet) {
+      return setType(packet.getType())
+          .setStreamId(packet.getStreamId())
+          .setStreamOffset(packet.getStreamOffset());
+    }
+
+    public DataStreamReplyByteBuffer build() {
+      return new DataStreamReplyByteBuffer(type, streamId, streamOffset, buffer, success, bytesWritten);
+    }
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
   private final boolean success;
+  private final long bytesWritten;
 
-  public DataStreamReplyByteBuffer(long streamId, long streamOffset, ByteBuffer buffer,
-      long bytesWritten, boolean success, Type type) {
-    super(streamId, streamOffset, buffer, type);
+  private DataStreamReplyByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer,
+      boolean success, long bytesWritten) {
+    super(type, streamId, streamOffset, buffer);
 
     this.success = success;
     this.bytesWritten = bytesWritten;
   }
 
-  public DataStreamReplyByteBuffer(DataStreamReplyHeader header, ByteBuffer buffer) {
-    this(header.getStreamId(), header.getStreamOffset(), buffer, header.getBytesWritten(), header.isSuccess(),
-        header.getType());
+  @Override
+  public boolean isSuccess() {
+    return success;
   }
 
   @Override
@@ -51,7 +114,9 @@ public class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer implem
   }
 
   @Override
-  public boolean isSuccess() {
-    return success;
+  public String toString() {
+    return super.toString()
+        + "," + (success? "SUCCESS": "FAILED")
+        + ",bytesWritten=" + bytesWritten;
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 7e63340..584f567 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
  * This class is immutable.
  */
 public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
-  public DataStreamRequestByteBuffer(long streamId, long streamOffset, ByteBuffer buffer, Type type) {
-    super(streamId, streamOffset, buffer, type);
+  public DataStreamRequestByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
+    super(type, streamId, streamOffset, buffer);
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index 995ed16..9da90c8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -21,11 +21,11 @@ package org.apache.ratis.protocol;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 
 public interface DataStreamPacket {
+  Type getType();
+
   long getStreamId();
 
   long getStreamOffset();
 
   long getDataLength();
-
-  Type getType();
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index f3d705a..e93b178 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -29,7 +29,7 @@ public class DataStreamPacketHeader extends DataStreamPacketImpl {
   private final long dataLength;
 
   public DataStreamPacketHeader(long streamId, long streamOffset, long dataLength, Type type) {
-    super(streamId, streamOffset, type);
+    super(type, streamId, streamOffset);
     this.dataLength = dataLength;
   }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index d3b8834..33d77fa 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -99,7 +99,10 @@ public interface NettyDataStreamUtils {
   static DataStreamReplyByteBuffer decodeDataStreamReplyByteBuffer(ByteBuf buf) {
     return Optional.ofNullable(DataStreamReplyHeader.read(buf))
         .map(header -> checkHeader(header, buf))
-        .map(header -> new DataStreamReplyByteBuffer(header, decodeData(buf, header, ByteBuf::nioBuffer)))
+        .map(header -> DataStreamReplyByteBuffer.newBuilder()
+            .setDataStreamReplyHeader(header)
+            .setBuffer(decodeData(buf, header, ByteBuf::nioBuffer))
+            .build())
         .orElse(null);
   }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 3aaad05..8808a95 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -96,9 +96,7 @@ public class DataStreamManagement {
         ChannelHandlerContext ctx, Executor executor) {
       return out.startTransactionAsync().thenApplyAsync(reply -> {
         if (reply.isSuccess()) {
-          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
-              ((DataStreamReplyByteBuffer)reply).slice(): null;
-          sendReplySuccess(request, buffer, -1, ctx);
+          ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
         }
         return reply;
       }, executor);
@@ -256,26 +254,38 @@ public class DataStreamManagement {
     }
   }
 
-  static void sendReplyNotSuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, ChannelHandlerContext ctx) {
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), buffer, -1, false, request.getType());
-    ctx.writeAndFlush(reply);
+  static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(
+      DataStreamRequestByteBuf request, DataStreamReply reply) {
+    final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+        ((DataStreamReplyByteBuffer)reply).slice(): null;
+    return DataStreamReplyByteBuffer.newBuilder()
+        .setDataStreamPacket(request)
+        .setBuffer(buffer)
+        .setSuccess(reply.isSuccess())
+        .setBytesWritten(reply.getBytesWritten())
+        .build();
   }
 
-  static void sendReplySuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, long bytesWritten,
-      ChannelHandlerContext ctx) {
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), buffer, bytesWritten, true, request.getType());
-    ctx.writeAndFlush(reply);
+  static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(
+      DataStreamRequestByteBuf request, RaftClientReply reply) {
+    final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+    return DataStreamReplyByteBuffer.newBuilder()
+        .setDataStreamPacket(request)
+        .setBuffer(buffer)
+        .setSuccess(reply.isSuccess())
+        .build();
   }
 
   static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
       DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) {
-    if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
-      sendReplyNotSuccess(request, null, ctx);
-    } else {
-      sendReplySuccess(request, null, bytesWritten, ctx);
+    final boolean success = checkSuccessRemoteWrite(remoteWrites, bytesWritten);
+    final DataStreamReplyByteBuffer.Builder builder = DataStreamReplyByteBuffer.newBuilder()
+        .setDataStreamPacket(request)
+        .setSuccess(success);
+    if (success) {
+      builder.setBytesWritten(bytesWritten);
     }
+    ctx.writeAndFlush(builder.build());
   }
 
   private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
@@ -283,21 +293,24 @@ public class DataStreamManagement {
     try {
       return server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
         if (reply.isSuccess()) {
-          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
-          sendReplySuccess(request, buffer, -1, ctx);
+          ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
         } else if (request.getType() == Type.STREAM_CLOSE) {
           // if this server is not the leader, forward start transition to the other peers
           // there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
           forwardStartTransaction(info, request, reply, ctx, executor);
         } else if (request.getType() == Type.START_TRANSACTION){
-          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
-          sendReplyNotSuccess(request, buffer, ctx);
+          ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
         } else {
           throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
         }
       }, executor);
     } catch (IOException e) {
-      sendReplyNotSuccess(request, null, ctx);
+      // TODO include IOException in the reply
+      final DataStreamReplyByteBuffer reply = DataStreamReplyByteBuffer.newBuilder()
+          .setDataStreamPacket(request)
+          .setSuccess(false)
+          .build();
+      ctx.writeAndFlush(reply);
       return CompletableFuture.completedFuture(null);
     }
   }
@@ -316,8 +329,7 @@ public class DataStreamManagement {
         .findAny().orElse(localReply);
 
     // send reply
-    final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(chosen).toByteString().asReadOnlyByteBuffer();
-    sendReplyNotSuccess(request, buffer, ctx);
+    ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, chosen));
   }
 
   static void forwardStartTransaction(StreamInfo info, DataStreamRequestByteBuf request, RaftClientReply localReply,
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 499e6a5..afe9465 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -33,13 +33,13 @@ import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements DataStreamRequest {
   private final ByteBuf buf;
 
-  public DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf buf, Type type) {
-    super(streamId, streamOffset, type);
+  public DataStreamRequestByteBuf(Type type, long streamId, long streamOffset, ByteBuf buf) {
+    super(type, streamId, streamOffset);
     this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
   }
 
   public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) {
-    this(header.getStreamId(), header.getStreamOffset(), buf, header.getType());
+    this(header.getType(), header.getStreamId(), header.getStreamOffset(), buf);
   }
 
   @Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 125160d..772e62c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -172,7 +172,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     };
   }
 
-  ByteToMessageDecoder newDecoder() {
+  static ByteToMessageDecoder newDecoder() {
     return new ByteToMessageDecoder() {
       {
         this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
@@ -185,7 +185,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     };
   }
 
-  MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
+  static MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
     return new MessageToMessageEncoder<DataStreamReplyByteBuffer>() {
       @Override
       protected void encode(ChannelHandlerContext context, DataStreamReplyByteBuffer reply, List<Object> out) {