You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/26 02:12:50 UTC
[incubator-ratis] branch master updated: RATIS-1179. Add a new API to DataStreamOutput for sending a File. (#299)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1ec750d RATIS-1179. Add a new API to DataStreamOutput for sending a File. (#299)
1ec750d is described below
commit 1ec750d3d30a510dd69fba8b3a689e4adc96827b
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Nov 26 10:12:41 2020 +0800
RATIS-1179. Add a new API to DataStreamOutput for sending a File. (#299)
---
.../apache/ratis/client/api/DataStreamOutput.java | 39 +++++++++++--
.../apache/ratis/client/impl/ClientProtoUtils.java | 7 ++-
.../ratis/client/impl/DataStreamClientImpl.java | 32 +++++++----
.../ratis/client/impl/OrderedStreamAsync.java | 39 +++++++++----
.../impl/DataStreamRequestByteBuffer.java | 8 ++-
...ava => DataStreamRequestFilePositionCount.java} | 26 ++++++---
.../FilePositionCount.java} | 42 ++++++++++----
.../ratis/protocol/DataStreamPacketHeader.java | 2 +-
.../ratis/protocol/DataStreamReplyHeader.java | 19 +++----
.../ratis/protocol/DataStreamRequestHeader.java | 16 +++---
.../apache/ratis/netty/NettyDataStreamUtils.java | 23 ++++++--
.../ratis/netty/client/NettyClientStreamRpc.java | 11 ++++
.../ratis/datastream/DataStreamClusterTests.java | 64 +++++++++++++++++-----
.../ratis/datastream/DataStreamTestUtils.java | 15 +++--
14 files changed, 249 insertions(+), 94 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 579367a..c054d35 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -18,9 +18,11 @@
package org.apache.ratis.client.api;
import org.apache.ratis.io.CloseAsync;
+import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
+import java.io.File;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletableFuture;
@@ -28,21 +30,46 @@ import java.util.concurrent.CompletableFuture;
/** An asynchronous output stream supporting zero buffer copying. */
public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
/**
- * This method is the same as writeAsync(buffer, sync_default),
+ * This method is the same as writeAsync(src, sync_default),
* where sync_default depends on the underlying implementation.
*/
- default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer) {
- return writeAsync(buffer, false);
+ default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src) {
+ return writeAsync(src, false);
}
/**
- * Send out the data in the buffer asynchronously.
+ * Send out the data in the source buffer asynchronously.
*
- * @param buffer the data to be sent.
+ * @param src the source buffer to be sent.
* @param sync Should the data be sync'ed to the underlying storage?
* @return a future of the reply.
*/
- CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer, boolean sync);
+ CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, boolean sync);
+
+
+ /**
+ * The same as writeAsync(src, 0, src.length(), sync_default),
+ * where sync_default depends on the underlying implementation.
+ */
+ default CompletableFuture<DataStreamReply> writeAsync(File src) {
+ return writeAsync(src, 0, src.length(), false);
+ }
+
+ /**
+ * The same as writeAsync(FilePositionCount.valueOf(src, position, count), sync).
+ */
+ default CompletableFuture<DataStreamReply> writeAsync(File src, long position, long count, boolean sync) {
+ return writeAsync(FilePositionCount.valueOf(src, position, count), sync);
+ }
+
+ /**
+ * Send out the data in the source file asynchronously.
+ *
+ * @param src the source file with the starting position and the number of bytes.
+ * @param sync Should the data be sync'ed to the underlying storage?
+ * @return a future of the reply.
+ */
+ CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, boolean sync);
/**
* Return the future of the {@link RaftClientReply}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 0807b3b..1a8ebc9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -121,8 +121,11 @@ public interface ClientProtoUtils {
request.getSlidingWindowEntry());
}
- static RaftClientRequestProto toRaftClientRequestProto(
- RaftClientRequest request) {
+ static ByteBuffer toRaftClientRequestProtoByteBuffer(RaftClientRequest request) {
+ return toRaftClientRequestProto(request).toByteString().asReadOnlyByteBuffer();
+ }
+
+ static RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request) {
final RaftClientRequestProto.Builder b = RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(request));
if (request.getMessage() != null) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index ffae192..e6d5851 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -21,11 +21,12 @@ import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
@@ -95,35 +96,44 @@ public class DataStreamClientImpl implements DataStreamClient {
private DataStreamOutputImpl(RaftClientRequest request) {
this.header = request;
- this.headerFuture = orderedStreamAsync.sendRequest(request.getCallId(), -1,
- ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(), Type.STREAM_HEADER);
+ final ByteBuffer buffer = ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
+ this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining());
}
- private CompletableFuture<DataStreamReply> send(Type type, ByteBuffer buffer) {
- return orderedStreamAsync.sendRequest(header.getCallId(), streamOffset, buffer, type);
+ private CompletableFuture<DataStreamReply> send(Type type, Object data, long length) {
+ final DataStreamRequestHeader h = new DataStreamRequestHeader(type, header.getCallId(), streamOffset, length);
+ return orderedStreamAsync.sendRequest(h, data);
}
private CompletableFuture<DataStreamReply> send(Type type) {
- return combineHeader(send(type, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER));
+ return combineHeader(send(type, null, 0));
}
private CompletableFuture<DataStreamReply> combineHeader(CompletableFuture<DataStreamReply> future) {
return future.thenCombine(headerFuture, (reply, headerReply) -> headerReply.isSuccess()? reply : headerReply);
}
- // send to the attached dataStreamClientRpc
- @Override
- public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf, boolean sync) {
+ private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length, boolean sync) {
if (isClosed()) {
return JavaUtils.completeExceptionally(new AlreadyClosedException(
clientId + ": stream already closed, request=" + header));
}
- final CompletableFuture<DataStreamReply> f = send(sync ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA, buf);
- streamOffset += buf.remaining();
+ final CompletableFuture<DataStreamReply> f = send(sync ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA, data, length);
+ streamOffset += length;
return combineHeader(f);
}
@Override
+ public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, boolean sync) {
+ return writeAsyncImpl(src, src.remaining(), sync);
+ }
+
+ @Override
+ public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, boolean sync) {
+ return writeAsyncImpl(src, src.getCount(), sync);
+ }
+
+ @Override
public CompletableFuture<DataStreamReply> closeAsync() {
return closeSupplier.get();
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 1318d4b..10c4db4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -20,10 +20,14 @@ package org.apache.ratis.client.impl;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
+import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.protocol.DataStreamRequest;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SlidingWindow;
@@ -38,16 +42,29 @@ import java.util.function.LongFunction;
public class OrderedStreamAsync {
public static final Logger LOG = LoggerFactory.getLogger(OrderedStreamAsync.class);
- static class DataStreamWindowRequest extends DataStreamRequestByteBuffer
- implements SlidingWindow.ClientSideRequest<DataStreamReply> {
+ static class DataStreamWindowRequest implements SlidingWindow.ClientSideRequest<DataStreamReply> {
+ private final DataStreamRequestHeader header;
+ private final Object data;
private final long seqNum;
private final CompletableFuture<DataStreamReply> replyFuture = new CompletableFuture<>();
- DataStreamWindowRequest(Type type, long streamId, long offset, ByteBuffer data, long seqNum) {
- super(type, streamId, offset, data);
+ DataStreamWindowRequest(DataStreamRequestHeader header, Object data, long seqNum) {
+ this.header = header;
+ this.data = data;
this.seqNum = seqNum;
}
+ DataStreamRequest getDataStreamRequest() {
+ if (header.getDataLength() == 0) {
+ return new DataStreamRequestByteBuffer(header, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
+ } else if (data instanceof ByteBuffer) {
+ return new DataStreamRequestByteBuffer(header, (ByteBuffer)data);
+ } else if (data instanceof FilePositionCount) {
+ return new DataStreamRequestFilePositionCount(header, (FilePositionCount)data);
+ }
+ throw new IllegalStateException("Unexpected " + data.getClass());
+ }
+
@Override
public void setFirstRequest() {
}
@@ -87,16 +104,15 @@ public class OrderedStreamAsync {
this.requestSemaphore = new Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties));
}
- CompletableFuture<DataStreamReply> sendRequest(long streamId, long offset, ByteBuffer data, Type type){
- final int length = data.remaining();
+ CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader header, Object data) {
try {
requestSemaphore.acquire();
} catch (InterruptedException e){
return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
- "Interrupted when sending streamId=" + streamId + ", offset= " + offset + ", length=" + length, e));
+ "Interrupted when sending " + JavaUtils.getClassSimpleName(data.getClass()) + ", header= " + header, e));
}
final LongFunction<DataStreamWindowRequest> constructor
- = seqNum -> new DataStreamWindowRequest(type, streamId, offset, data.slice(), seqNum);
+ = seqNum -> new DataStreamWindowRequest(header, data, seqNum);
return slidingWindow.submitNewRequest(constructor, this::sendRequestToNetwork).
getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
}
@@ -109,7 +125,8 @@ public class OrderedStreamAsync {
if(slidingWindow.isFirst(request.getSeqNum())){
request.setFirstRequest();
}
- final CompletableFuture<DataStreamReply> requestFuture = dataStreamClientRpc.streamAsync(request);
+ final CompletableFuture<DataStreamReply> requestFuture = dataStreamClientRpc.streamAsync(
+ request.getDataStreamRequest());
requestFuture.thenApply(reply -> {
slidingWindow.receiveReply(
request.getSeqNum(), reply, this::sendRequestToNetwork);
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 584f567..0de14fc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -18,7 +18,8 @@
package org.apache.ratis.datastream.impl;
import org.apache.ratis.protocol.DataStreamRequest;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.util.Preconditions;
import java.nio.ByteBuffer;
@@ -28,7 +29,8 @@ import java.nio.ByteBuffer;
* This class is immutable.
*/
public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
- public DataStreamRequestByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
- super(type, streamId, streamOffset, buffer);
+ public DataStreamRequestByteBuffer(DataStreamRequestHeader header, ByteBuffer buffer) {
+ super(header.getType(), header.getStreamId(), header.getStreamOffset(), buffer);
+ Preconditions.assertTrue(header.getDataLength() == buffer.remaining());
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
similarity index 56%
copy from ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
copy to ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
index 584f567..8dd3b8f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
@@ -17,18 +17,30 @@
*/
package org.apache.ratis.datastream.impl;
+import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.protocol.DataStreamRequest;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-
-import java.nio.ByteBuffer;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
/**
- * Implements {@link DataStreamRequest} with {@link ByteBuffer}.
+ * Implements {@link DataStreamRequest} with {@link FilePositionCount}.
*
* This class is immutable.
*/
-public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
- public DataStreamRequestByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
- super(type, streamId, streamOffset, buffer);
+public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl implements DataStreamRequest {
+ private final FilePositionCount file;
+
+ public DataStreamRequestFilePositionCount(DataStreamRequestHeader header, FilePositionCount file) {
+ super(header.getType(), header.getStreamId(), header.getStreamOffset());
+ this.file = file;
+ }
+
+ @Override
+ public long getDataLength() {
+ return file.getCount();
+ }
+
+ /** @return the file with the starting position and the byte count. */
+ public FilePositionCount getFile() {
+ return file;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/io/FilePositionCount.java
similarity index 50%
copy from ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
copy to ratis-common/src/main/java/org/apache/ratis/io/FilePositionCount.java
index 584f567..150e042 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/FilePositionCount.java
@@ -15,20 +15,42 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.datastream.impl;
+package org.apache.ratis.io;
-import org.apache.ratis.protocol.DataStreamRequest;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-
-import java.nio.ByteBuffer;
+import java.io.File;
/**
- * Implements {@link DataStreamRequest} with {@link ByteBuffer}.
+ * Encapsulate a {@link File} with a starting position and a byte count.
*
- * This class is immutable.
+ * The class is immutable.
*/
-public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
- public DataStreamRequestByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
- super(type, streamId, streamOffset, buffer);
+public final class FilePositionCount {
+ public static FilePositionCount valueOf(File file, long position, long count) {
+ return new FilePositionCount(file, position, count);
+ }
+
+ private final File file;
+ private final long position;
+ private final long count;
+
+ private FilePositionCount(File file, long position, long count) {
+ this.file = file;
+ this.position = position;
+ this.count = count;
+ }
+
+ /** @return the file. */
+ public File getFile() {
+ return file;
+ }
+
+ /** @return the starting position. */
+ public long getPosition() {
+ return position;
+ }
+
+ /** @return the byte count. */
+ public long getCount() {
+ return count;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index e93b178..18a2b6c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -28,7 +28,7 @@ public class DataStreamPacketHeader extends DataStreamPacketImpl {
private final long dataLength;
- public DataStreamPacketHeader(long streamId, long streamOffset, long dataLength, Type type) {
+ public DataStreamPacketHeader(Type type, long streamId, long streamOffset, long dataLength) {
super(type, streamId, streamOffset);
this.dataLength = dataLength;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
index e829b9f..d02c933 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -18,8 +18,9 @@
package org.apache.ratis.protocol;
-import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
@@ -44,15 +45,11 @@ public class DataStreamReplyHeader extends DataStreamPacketHeader implements Dat
ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
DataStreamReplyHeaderProto header = DataStreamReplyHeaderProto.parseFrom(headerBuf.nioBuffer());
+ final DataStreamPacketHeaderProto h = header.getPacketHeader();
if (header.getPacketHeader().getDataLength() + headerBufLen <= buf.readableBytes()) {
buf.readerIndex(buf.readerIndex() + headerBufLen);
- return new DataStreamReplyHeader(
- header.getPacketHeader().getStreamId(),
- header.getPacketHeader().getStreamOffset(),
- header.getPacketHeader().getDataLength(),
- header.getPacketHeader().getType(),
- header.getBytesWritten(),
- header.getSuccess());
+ return new DataStreamReplyHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength(),
+ header.getBytesWritten(), header.getSuccess());
} else {
buf.resetReaderIndex();
return null;
@@ -67,9 +64,9 @@ public class DataStreamReplyHeader extends DataStreamPacketHeader implements Dat
private final long bytesWritten;
private final boolean success;
- public DataStreamReplyHeader(
- long streamId, long streamOffset, long dataLength, Type type, long bytesWritten, boolean success) {
- super(streamId, streamOffset, dataLength, type);
+ public DataStreamReplyHeader(Type type, long streamId, long streamOffset, long dataLength,
+ long bytesWritten, boolean success) {
+ super(type, streamId, streamOffset, dataLength);
this.bytesWritten = bytesWritten;
this.success = success;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index b87d68f..7781cd6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -18,8 +18,9 @@
package org.apache.ratis.protocol;
-import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
@@ -47,13 +48,10 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
DataStreamRequestHeaderProto header = DataStreamRequestHeaderProto.parseFrom(headerBuf.nioBuffer());
- if (header.getPacketHeader().getDataLength() + headerBufLen <= buf.readableBytes()) {
+ final DataStreamPacketHeaderProto h = header.getPacketHeader();
+ if (h.getDataLength() + headerBufLen <= buf.readableBytes()) {
buf.readerIndex(buf.readerIndex() + headerBufLen);
- return new DataStreamRequestHeader(
- header.getPacketHeader().getStreamId(),
- header.getPacketHeader().getStreamOffset(),
- header.getPacketHeader().getDataLength(),
- header.getPacketHeader().getType());
+ return new DataStreamRequestHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength());
} else {
buf.resetReaderIndex();
return null;
@@ -65,7 +63,7 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
}
}
- public DataStreamRequestHeader(long streamId, long streamOffset, long dataLength, Type type) {
- super(streamId, streamOffset, dataLength, type);
+ public DataStreamRequestHeader(Type type, long streamId, long streamOffset, long dataLength) {
+ super(type, streamId, streamOffset, dataLength);
}
}
\ No newline at end of file
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index a3f850d..ccf25c7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -19,16 +19,20 @@ package org.apache.ratis.netty;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
+import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
import org.apache.ratis.protocol.DataStreamPacketHeader;
import org.apache.ratis.protocol.DataStreamReplyHeader;
+import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.thirdparty.io.netty.channel.DefaultFileRegion;
import java.nio.ByteBuffer;
import java.util.Optional;
@@ -36,8 +40,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
public interface NettyDataStreamUtils {
-
- static ByteBuffer getDataStreamRequestHeaderProtoByteBuf(DataStreamRequestByteBuffer request) {
+ static ByteBuffer getDataStreamRequestHeaderProtoByteBuffer(DataStreamRequest request) {
DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
.newBuilder()
.setStreamId(request.getStreamId())
@@ -69,16 +72,28 @@ public interface NettyDataStreamUtils {
.asReadOnlyByteBuffer();
}
- static void encodeDataStreamRequestByteBuffer(DataStreamRequestByteBuffer request, Consumer<ByteBuf> out,
+ static void encodeDataStreamRequestHeader(DataStreamRequest request, Consumer<Object> out,
ByteBufAllocator allocator) {
- ByteBuffer headerBuf = getDataStreamRequestHeaderProtoByteBuf(request);
+ final ByteBuffer headerBuf = getDataStreamRequestHeaderProtoByteBuffer(request);
final ByteBuf headerLenBuf = allocator.directBuffer(DataStreamPacketHeader.getSizeOfHeaderLen());
headerLenBuf.writeInt(headerBuf.remaining());
out.accept(headerLenBuf);
out.accept(Unpooled.wrappedBuffer(headerBuf));
+ }
+
+ static void encodeDataStreamRequestByteBuffer(DataStreamRequestByteBuffer request, Consumer<Object> out,
+ ByteBufAllocator allocator) {
+ encodeDataStreamRequestHeader(request, out, allocator);
out.accept(Unpooled.wrappedBuffer(request.slice()));
}
+ static void encodeDataStreamRequestFilePositionCount(
+ DataStreamRequestFilePositionCount request, Consumer<Object> out, ByteBufAllocator allocator) {
+ encodeDataStreamRequestHeader(request, out, allocator);
+ final FilePositionCount f = request.getFile();
+ out.accept(new DefaultFileRegion(f.getFile(), f.getPosition(), f.getCount()));
+ }
+
static void encodeDataStreamReplyByteBuffer(DataStreamReplyByteBuffer reply, Consumer<ByteBuf> out,
ByteBufAllocator allocator) {
ByteBuffer headerBuf = getDataStreamReplyHeaderProtoByteBuf(reply);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 17b240b..0c369f5 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -21,6 +21,7 @@ package org.apache.ratis.netty.client;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
@@ -94,6 +95,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(newEncoder());
+ p.addLast(newEncoderDataStreamRequestFilePositionCount());
p.addLast(newDecoder());
p.addLast(getClientHandler());
}
@@ -109,6 +111,15 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
};
}
+ MessageToMessageEncoder<DataStreamRequestFilePositionCount> newEncoderDataStreamRequestFilePositionCount() {
+ return new MessageToMessageEncoder<DataStreamRequestFilePositionCount>() {
+ @Override
+ protected void encode(ChannelHandlerContext ctx, DataStreamRequestFilePositionCount request, List<Object> out) {
+ NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount(request, out::add, ctx.alloc());
+ }
+ };
+ }
+
ByteToMessageDecoder newDecoder() {
return new ByteToMessageDecoder() {
{
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index c93c294..f46015e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -23,11 +23,15 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
import org.apache.ratis.datastream.DataStreamTestUtils.SingleDataStream;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.util.Timestamp;
+import org.apache.ratis.util.function.CheckedConsumer;
import org.junit.Assert;
import org.junit.Test;
@@ -54,7 +58,16 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
void testStreamWrites(CLUSTER cluster) throws Exception {
waitForLeader(cluster);
runTestDataStreamOutput(cluster);
- runTestTransferTo(cluster);
+
+ // create data file
+ final int size = 10_000_000 + ThreadLocalRandom.current().nextInt(1_000_000);
+ final File f = new File(getTestDir(), "a.txt");
+ DataStreamTestUtils.createFile(f, size);
+
+ for(int i = 0; i < 3; i++) {
+ runTestWriteFile(cluster, i, writeAsyncDefaultFileRegion(f, size));
+ runTestWriteFile(cluster, i, transferToWritableByteChannel(f, size));
+ }
}
void runTestDataStreamOutput(CLUSTER cluster) throws Exception {
@@ -74,13 +87,8 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
assertLogEntry(cluster, request);
}
- void runTestTransferTo(CLUSTER cluster) throws Exception {
- final int size = 4_000_000 + ThreadLocalRandom.current().nextInt(1_000_000);
-
- // create data file
- final File f = new File(getTestDir(), "a.txt");
- DataStreamTestUtils.createFile(f, size);
-
+ void runTestWriteFile(CLUSTER cluster, int i,
+ CheckedConsumer<DataStreamOutputImpl, Exception> testCase) throws Exception {
final RaftClientRequest request;
final CompletableFuture<RaftClientReply> reply;
try (RaftClient client = cluster.createClient()) {
@@ -88,11 +96,9 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
request = out.getHeader();
reply = out.getRaftClientReplyFuture();
- // write using transferTo WritableByteChannel
- try(FileInputStream in = new FileInputStream(f)) {
- final long transferred = in.getChannel().transferTo(0, size, out.getWritableByteChannel());
- Assert.assertEquals(size, transferred);
- }
+ final Timestamp start = Timestamp.currentTime();
+ testCase.accept(out);
+ LOG.info("{}: {} elapsed {}ms", i, testCase, start.elapsedTimeMs());
}
}
@@ -100,6 +106,38 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
assertLogEntry(cluster, request);
}
+ static CheckedConsumer<DataStreamOutputImpl, Exception> transferToWritableByteChannel(File f, int size) {
+ return new CheckedConsumer<DataStreamOutputImpl, Exception>() {
+ @Override
+ public void accept(DataStreamOutputImpl out) throws Exception {
+ try (FileInputStream in = new FileInputStream(f)) {
+ final long transferred = in.getChannel().transferTo(0, size, out.getWritableByteChannel());
+ Assert.assertEquals(size, transferred);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "transferToWritableByteChannel";
+ }
+ };
+ }
+
+ static CheckedConsumer<DataStreamOutputImpl, Exception> writeAsyncDefaultFileRegion(File f, int size) {
+ return new CheckedConsumer<DataStreamOutputImpl, Exception>() {
+ @Override
+ public void accept(DataStreamOutputImpl out) {
+ final DataStreamReply dataStreamReply = out.writeAsync(f).join();
+ DataStreamTestUtils.assertSuccessReply(Type.STREAM_DATA, size, dataStreamReply);
+ }
+
+ @Override
+ public String toString() {
+ return "writeAsyncDefaultFileRegion";
+ }
+ };
+ }
+
void watchOrSleep(CLUSTER cluster, long index) throws Exception {
try (RaftClient client = cluster.createClient()) {
client.async().watch(index, ReplicationLevel.ALL).join();
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 73d4b24..47d88d4 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -265,21 +265,24 @@ public interface DataStreamTestUtils {
{ // check header
final DataStreamReply reply = out.getHeaderFuture().join();
- Assert.assertTrue(reply.isSuccess());
- Assert.assertEquals(0, reply.getBytesWritten());
- Assert.assertEquals(reply.getType(), Type.STREAM_HEADER);
+ assertSuccessReply(Type.STREAM_HEADER, 0, reply);
}
// check writeAsync requests
for (int i = 0; i < futures.size(); i++) {
final DataStreamReply reply = futures.get(i).join();
- Assert.assertTrue(reply.isSuccess());
- Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
- Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA);
+ final Type expectedType = i == futures.size() - 1 ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA;
+ assertSuccessReply(expectedType, sizes.get(i).longValue(), reply);
}
return dataSize;
}
+ static void assertSuccessReply(Type expectedType, long expectedBytesWritten, DataStreamReply reply) {
+ Assert.assertTrue(reply.isSuccess());
+ Assert.assertEquals(expectedBytesWritten, reply.getBytesWritten());
+ Assert.assertEquals(expectedType, reply.getType());
+ }
+
static CompletableFuture<RaftClientReply> writeAndCloseAndAssertReplies(
Iterable<RaftServer> servers, RaftPeerId leader, DataStreamOutputImpl out, int bufferSize, int bufferNum) {
LOG.info("start Stream{}", out.getHeader().getCallId());