You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/15 01:28:16 UTC
[incubator-ratis] branch master updated: RATIS-1155. Add a builder for DataStreamReplyByteBuffer. (#276)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new a43f36a RATIS-1155. Add a builder for DataStreamReplyByteBuffer. (#276)
a43f36a is described below
commit a43f36a1590aee59b612d6775131ebb1fc7262aa
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Nov 15 09:28:09 2020 +0800
RATIS-1155. Add a builder for DataStreamReplyByteBuffer. (#276)
* RATIS-1155. Add a builder for DataStreamReplyByteBuffer.
* Fix checkstyle.
---
.../ratis/client/impl/OrderedStreamAsync.java | 6 +-
.../impl/DataStreamPacketByteBuffer.java | 4 +-
.../datastream/impl/DataStreamPacketImpl.java | 27 ++++---
.../datastream/impl/DataStreamReplyByteBuffer.java | 85 +++++++++++++++++++---
.../impl/DataStreamRequestByteBuffer.java | 4 +-
.../apache/ratis/protocol/DataStreamPacket.java | 4 +-
.../ratis/protocol/DataStreamPacketHeader.java | 2 +-
.../apache/ratis/netty/NettyDataStreamUtils.java | 5 +-
.../ratis/netty/server/DataStreamManagement.java | 58 +++++++++------
.../netty/server/DataStreamRequestByteBuf.java | 6 +-
.../ratis/netty/server/NettyServerStreamRpc.java | 4 +-
11 files changed, 142 insertions(+), 63 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 12b2bb7..1318d4b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -43,8 +43,8 @@ public class OrderedStreamAsync {
private final long seqNum;
private final CompletableFuture<DataStreamReply> replyFuture = new CompletableFuture<>();
- DataStreamWindowRequest(long streamId, long offset, ByteBuffer data, long seqNum, Type type){
- super(streamId, offset, data, type);
+ DataStreamWindowRequest(Type type, long streamId, long offset, ByteBuffer data, long seqNum) {
+ super(type, streamId, offset, data);
this.seqNum = seqNum;
}
@@ -96,7 +96,7 @@ public class OrderedStreamAsync {
"Interrupted when sending streamId=" + streamId + ", offset= " + offset + ", length=" + length, e));
}
final LongFunction<DataStreamWindowRequest> constructor
- = seqNum -> new DataStreamWindowRequest(streamId, offset, data.slice(), seqNum, type);
+ = seqNum -> new DataStreamWindowRequest(type, streamId, offset, data.slice(), seqNum);
return slidingWindow.submitNewRequest(constructor, this::sendRequestToNetwork).
getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index 0caa77e..aa8cddb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -28,8 +28,8 @@ public abstract class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
private final ByteBuffer buffer;
- public DataStreamPacketByteBuffer(long streamId, long streamOffset, ByteBuffer buffer, Type type) {
- super(streamId, streamOffset, type);
+ protected DataStreamPacketByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
+ super(type, streamId, streamOffset);
this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY_BYTE_BUFFER;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
index 0b51c21..1f528e3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
@@ -27,14 +27,19 @@ import org.apache.ratis.util.JavaUtils;
* This class is immutable.
*/
public abstract class DataStreamPacketImpl implements DataStreamPacket {
+ private final Type type;
private final long streamId;
private final long streamOffset;
- private final Type type;
- public DataStreamPacketImpl(long streamId, long streamOffset, Type type) {
+ public DataStreamPacketImpl(Type type, long streamId, long streamOffset) {
+ this.type = type;
this.streamId = streamId;
this.streamOffset = streamOffset;
- this.type = type;
+ }
+
+ @Override
+ public Type getType() {
+ return type;
}
@Override
@@ -48,17 +53,11 @@ public abstract class DataStreamPacketImpl implements DataStreamPacket {
}
@Override
- public Type getType() {
- return type;
- }
-
- @Override
public String toString() {
- return JavaUtils.getClassSimpleName(getClass()) + "{"
- + "streamId=" + getStreamId()
- + ", streamOffset=" + getStreamOffset()
- + ", dataLength=" + getDataLength()
- + ", type=" + getType()
- + '}';
+ return JavaUtils.getClassSimpleName(getClass()) + ":"
+ + getType()
+ + ",id=" + getStreamId()
+ + ",offset=" + getStreamOffset()
+ + ",length=" + getDataLength();
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index 3e0736e..a681716 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.datastream.impl;
+import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamReplyHeader;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
@@ -28,21 +29,83 @@ import java.nio.ByteBuffer;
*
* This class is immutable.
*/
-public class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer implements DataStreamReply {
- private final long bytesWritten;
+public final class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer implements DataStreamReply {
+ public static final class Builder {
+ private Type type;
+ private long streamId;
+ private long streamOffset;
+ private ByteBuffer buffer;
+
+ private boolean success;
+ private long bytesWritten;
+
+ private Builder() {}
+
+ public Builder setType(Type type) {
+ this.type = type;
+ return this;
+ }
+
+ public Builder setStreamId(long streamId) {
+ this.streamId = streamId;
+ return this;
+ }
+
+ public Builder setStreamOffset(long streamOffset) {
+ this.streamOffset = streamOffset;
+ return this;
+ }
+
+ public Builder setBuffer(ByteBuffer buffer) {
+ this.buffer = buffer;
+ return this;
+ }
+
+ public Builder setSuccess(boolean success) {
+ this.success = success;
+ return this;
+ }
+
+ public Builder setBytesWritten(long bytesWritten) {
+ this.bytesWritten = bytesWritten;
+ return this;
+ }
+
+ public Builder setDataStreamReplyHeader(DataStreamReplyHeader header) {
+ return setDataStreamPacket(header)
+ .setSuccess(header.isSuccess())
+ .setBytesWritten(header.getBytesWritten());
+ }
+
+ public Builder setDataStreamPacket(DataStreamPacket packet) {
+ return setType(packet.getType())
+ .setStreamId(packet.getStreamId())
+ .setStreamOffset(packet.getStreamOffset());
+ }
+
+ public DataStreamReplyByteBuffer build() {
+ return new DataStreamReplyByteBuffer(type, streamId, streamOffset, buffer, success, bytesWritten);
+ }
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
private final boolean success;
+ private final long bytesWritten;
- public DataStreamReplyByteBuffer(long streamId, long streamOffset, ByteBuffer buffer,
- long bytesWritten, boolean success, Type type) {
- super(streamId, streamOffset, buffer, type);
+ private DataStreamReplyByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer,
+ boolean success, long bytesWritten) {
+ super(type, streamId, streamOffset, buffer);
this.success = success;
this.bytesWritten = bytesWritten;
}
- public DataStreamReplyByteBuffer(DataStreamReplyHeader header, ByteBuffer buffer) {
- this(header.getStreamId(), header.getStreamOffset(), buffer, header.getBytesWritten(), header.isSuccess(),
- header.getType());
+ @Override
+ public boolean isSuccess() {
+ return success;
}
@Override
@@ -51,7 +114,9 @@ public class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer implem
}
@Override
- public boolean isSuccess() {
- return success;
+ public String toString() {
+ return super.toString()
+ + "," + (success? "SUCCESS": "FAILED")
+ + ",bytesWritten=" + bytesWritten;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 7e63340..584f567 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
* This class is immutable.
*/
public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
- public DataStreamRequestByteBuffer(long streamId, long streamOffset, ByteBuffer buffer, Type type) {
- super(streamId, streamOffset, buffer, type);
+ public DataStreamRequestByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
+ super(type, streamId, streamOffset, buffer);
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index 995ed16..9da90c8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -21,11 +21,11 @@ package org.apache.ratis.protocol;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
public interface DataStreamPacket {
+ Type getType();
+
long getStreamId();
long getStreamOffset();
long getDataLength();
-
- Type getType();
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index f3d705a..e93b178 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -29,7 +29,7 @@ public class DataStreamPacketHeader extends DataStreamPacketImpl {
private final long dataLength;
public DataStreamPacketHeader(long streamId, long streamOffset, long dataLength, Type type) {
- super(streamId, streamOffset, type);
+ super(type, streamId, streamOffset);
this.dataLength = dataLength;
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index d3b8834..33d77fa 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -99,7 +99,10 @@ public interface NettyDataStreamUtils {
static DataStreamReplyByteBuffer decodeDataStreamReplyByteBuffer(ByteBuf buf) {
return Optional.ofNullable(DataStreamReplyHeader.read(buf))
.map(header -> checkHeader(header, buf))
- .map(header -> new DataStreamReplyByteBuffer(header, decodeData(buf, header, ByteBuf::nioBuffer)))
+ .map(header -> DataStreamReplyByteBuffer.newBuilder()
+ .setDataStreamReplyHeader(header)
+ .setBuffer(decodeData(buf, header, ByteBuf::nioBuffer))
+ .build())
.orElse(null);
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 3aaad05..8808a95 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -96,9 +96,7 @@ public class DataStreamManagement {
ChannelHandlerContext ctx, Executor executor) {
return out.startTransactionAsync().thenApplyAsync(reply -> {
if (reply.isSuccess()) {
- final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
- ((DataStreamReplyByteBuffer)reply).slice(): null;
- sendReplySuccess(request, buffer, -1, ctx);
+ ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
}
return reply;
}, executor);
@@ -256,26 +254,38 @@ public class DataStreamManagement {
}
}
- static void sendReplyNotSuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, ChannelHandlerContext ctx) {
- final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
- request.getStreamId(), request.getStreamOffset(), buffer, -1, false, request.getType());
- ctx.writeAndFlush(reply);
+ static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(
+ DataStreamRequestByteBuf request, DataStreamReply reply) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ return DataStreamReplyByteBuffer.newBuilder()
+ .setDataStreamPacket(request)
+ .setBuffer(buffer)
+ .setSuccess(reply.isSuccess())
+ .setBytesWritten(reply.getBytesWritten())
+ .build();
}
- static void sendReplySuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, long bytesWritten,
- ChannelHandlerContext ctx) {
- final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
- request.getStreamId(), request.getStreamOffset(), buffer, bytesWritten, true, request.getType());
- ctx.writeAndFlush(reply);
+ static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(
+ DataStreamRequestByteBuf request, RaftClientReply reply) {
+ final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ return DataStreamReplyByteBuffer.newBuilder()
+ .setDataStreamPacket(request)
+ .setBuffer(buffer)
+ .setSuccess(reply.isSuccess())
+ .build();
}
static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) {
- if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
- sendReplyNotSuccess(request, null, ctx);
- } else {
- sendReplySuccess(request, null, bytesWritten, ctx);
+ final boolean success = checkSuccessRemoteWrite(remoteWrites, bytesWritten);
+ final DataStreamReplyByteBuffer.Builder builder = DataStreamReplyByteBuffer.newBuilder()
+ .setDataStreamPacket(request)
+ .setSuccess(success);
+ if (success) {
+ builder.setBytesWritten(bytesWritten);
}
+ ctx.writeAndFlush(builder.build());
}
private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
@@ -283,21 +293,24 @@ public class DataStreamManagement {
try {
return server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
if (reply.isSuccess()) {
- ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
- sendReplySuccess(request, buffer, -1, ctx);
+ ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
} else if (request.getType() == Type.STREAM_CLOSE) {
// if this server is not the leader, forward start transition to the other peers
// there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
forwardStartTransaction(info, request, reply, ctx, executor);
} else if (request.getType() == Type.START_TRANSACTION){
- ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
- sendReplyNotSuccess(request, buffer, ctx);
+ ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
}, executor);
} catch (IOException e) {
- sendReplyNotSuccess(request, null, ctx);
+ // TODO include IOException in the reply
+ final DataStreamReplyByteBuffer reply = DataStreamReplyByteBuffer.newBuilder()
+ .setDataStreamPacket(request)
+ .setSuccess(false)
+ .build();
+ ctx.writeAndFlush(reply);
return CompletableFuture.completedFuture(null);
}
}
@@ -316,8 +329,7 @@ public class DataStreamManagement {
.findAny().orElse(localReply);
// send reply
- final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(chosen).toByteString().asReadOnlyByteBuffer();
- sendReplyNotSuccess(request, buffer, ctx);
+ ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, chosen));
}
static void forwardStartTransaction(StreamInfo info, DataStreamRequestByteBuf request, RaftClientReply localReply,
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 499e6a5..afe9465 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -33,13 +33,13 @@ import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements DataStreamRequest {
private final ByteBuf buf;
- public DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf buf, Type type) {
- super(streamId, streamOffset, type);
+ public DataStreamRequestByteBuf(Type type, long streamId, long streamOffset, ByteBuf buf) {
+ super(type, streamId, streamOffset);
this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
}
public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) {
- this(header.getStreamId(), header.getStreamOffset(), buf, header.getType());
+ this(header.getType(), header.getStreamId(), header.getStreamOffset(), buf);
}
@Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 125160d..772e62c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -172,7 +172,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
};
}
- ByteToMessageDecoder newDecoder() {
+ static ByteToMessageDecoder newDecoder() {
return new ByteToMessageDecoder() {
{
this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
@@ -185,7 +185,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
};
}
- MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
+ static MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
return new MessageToMessageEncoder<DataStreamReplyByteBuffer>() {
@Override
protected void encode(ChannelHandlerContext context, DataStreamReplyByteBuffer reply, List<Object> out) {