You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/17 15:24:52 UTC
[incubator-ratis] branch master updated: RATIS-1196. Save
STREAM_CLOSE RPC call (#339)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1c0646d RATIS-1196. Save STREAM_CLOSE RPC call (#339)
1c0646d is described below
commit 1c0646d46fba1647ceddc3bcd30d09dd67b46476
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Thu Dec 17 23:24:43 2020 +0800
RATIS-1196. Save STREAM_CLOSE RPC call (#339)
---
.../apache/ratis/client/api/DataStreamOutput.java | 25 ++++------
.../ratis/client/impl/DataStreamClientImpl.java | 53 ++++++++++++----------
.../impl/DataStreamRequestByteBuffer.java | 9 ++++
.../impl/DataStreamRequestFilePositionCount.java | 8 ++++
.../StandardWriteOption.java} | 9 ++--
.../DataStreamRequest.java => io/WriteOption.java} | 12 ++++-
.../apache/ratis/protocol/DataStreamRequest.java | 3 ++
.../ratis/protocol/DataStreamRequestHeader.java | 20 +++++++-
.../ratis/examples/filestore/cli/DataStream.java | 11 +++--
.../ratis/examples/filestore/FileStoreWriter.java | 6 ++-
.../apache/ratis/netty/NettyDataStreamUtils.java | 6 +++
.../ratis/netty/server/DataStreamManagement.java | 43 ++++++++++--------
.../netty/server/DataStreamRequestByteBuf.java | 12 ++++-
ratis-proto/src/main/proto/Raft.proto | 10 ++--
.../ratis/datastream/DataStreamTestUtils.java | 5 +-
15 files changed, 154 insertions(+), 78 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index c054d35..be6d13e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client.api;
import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.io.FilePositionCount;
+import org.apache.ratis.io.WriteOption;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
@@ -30,21 +31,13 @@ import java.util.concurrent.CompletableFuture;
/** An asynchronous output stream supporting zero buffer copying. */
public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
/**
- * This method is the same as writeAsync(src, sync_default),
- * where sync_default depends on the underlying implementation.
- */
- default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src) {
- return writeAsync(src, false);
- }
-
- /**
* Send out the data in the source buffer asynchronously.
*
* @param src the source buffer to be sent.
- * @param sync Should the data be sync'ed to the underlying storage?
+ * @param options - options specifying how the data was written
* @return a future of the reply.
*/
- CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, boolean sync);
+ CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption... options);
/**
@@ -52,24 +45,24 @@ public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
* where sync_default depends on the underlying implementation.
*/
default CompletableFuture<DataStreamReply> writeAsync(File src) {
- return writeAsync(src, 0, src.length(), false);
+ return writeAsync(src, 0, src.length());
}
/**
- * The same as writeAsync(FilePositionCount.valueOf(src, position, count), sync).
+ * The same as writeAsync(FilePositionCount.valueOf(src, position, count), options).
*/
- default CompletableFuture<DataStreamReply> writeAsync(File src, long position, long count, boolean sync) {
- return writeAsync(FilePositionCount.valueOf(src, position, count), sync);
+ default CompletableFuture<DataStreamReply> writeAsync(File src, long position, long count, WriteOption... options) {
+ return writeAsync(FilePositionCount.valueOf(src, position, count), options);
}
/**
* Send out the data in the source file asynchronously.
*
* @param src the source file with the starting position and the number of bytes.
- * @param sync Should the data be sync'ed to the underlying storage?
+ * @param options options specifying how the data was written
* @return a future of the reply.
*/
- CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, boolean sync);
+ CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, WriteOption... options);
/**
* Return the future of the {@link RaftClientReply}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 55506b9..a893d85 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -21,7 +21,10 @@ import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.io.FilePositionCount;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
@@ -70,11 +73,7 @@ public class DataStreamClientImpl implements DataStreamClient {
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
private final CompletableFuture<RaftClientReply> raftClientReplyFuture = new CompletableFuture<>();
- private final MemoizedSupplier<CompletableFuture<DataStreamReply>> closeSupplier = JavaUtils.memoize(() -> {
- final CompletableFuture<DataStreamReply> f = send(Type.STREAM_CLOSE);
- f.thenApply(ClientProtoUtils::getRaftClientReply).whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
- return f;
- });
+ private CompletableFuture<DataStreamReply> closeFuture;
private final MemoizedSupplier<WritableByteChannel> writableByteChannelSupplier
= JavaUtils.memoize(() -> new WritableByteChannel() {
@Override
@@ -92,7 +91,11 @@ public class DataStreamClientImpl implements DataStreamClient {
@Override
public void close() throws IOException {
- IOUtils.getFromFuture(closeAsync(), () -> "close(" + ClientInvocationId.valueOf(header) + ")");
+ if (isClosed()) {
+ return;
+ }
+ IOUtils.getFromFuture(writeAsync(DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER, StandardWriteOption.CLOSE),
+ () -> "close(" + ClientInvocationId.valueOf(header) + ")");
}
});
@@ -104,46 +107,48 @@ public class DataStreamClientImpl implements DataStreamClient {
this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining());
}
- private CompletableFuture<DataStreamReply> send(Type type, Object data, long length) {
- final DataStreamRequestHeader h = new DataStreamRequestHeader(type, header.getCallId(), streamOffset, length);
+ private CompletableFuture<DataStreamReply> send(Type type, Object data, long length, WriteOption... options) {
+ final DataStreamRequestHeader h =
+ new DataStreamRequestHeader(type, header.getCallId(), streamOffset, length, options);
return orderedStreamAsync.sendRequest(h, data);
}
- private CompletableFuture<DataStreamReply> send(Type type) {
- return combineHeader(send(type, null, 0));
- }
-
private CompletableFuture<DataStreamReply> combineHeader(CompletableFuture<DataStreamReply> future) {
return future.thenCombine(headerFuture, (reply, headerReply) -> headerReply.isSuccess()? reply : headerReply);
}
- private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length, boolean sync) {
+ private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length, WriteOption... options) {
if (isClosed()) {
return JavaUtils.completeExceptionally(new AlreadyClosedException(
clientId + ": stream already closed, request=" + header));
}
- final CompletableFuture<DataStreamReply> f = send(sync ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA, data, length);
+ final CompletableFuture<DataStreamReply> f = combineHeader(send(Type.STREAM_DATA, data, length, options));
+ if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
+ closeFuture = f;
+ f.thenApply(ClientProtoUtils::getRaftClientReply).whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
+ }
streamOffset += length;
- return combineHeader(f);
+ return f;
}
@Override
- public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, boolean sync) {
- return writeAsyncImpl(src, src.remaining(), sync);
+ public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption... options) {
+ return writeAsyncImpl(src, src.remaining(), options);
}
@Override
- public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, boolean sync) {
- return writeAsyncImpl(src, src.getCount(), sync);
+ public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, WriteOption... options) {
+ return writeAsyncImpl(src, src.getCount(), options);
}
- @Override
- public CompletableFuture<DataStreamReply> closeAsync() {
- return closeSupplier.get();
+ boolean isClosed() {
+ return closeFuture != null;
}
- boolean isClosed() {
- return closeSupplier.isInitialized();
+ @Override
+ public CompletableFuture<DataStreamReply> closeAsync() {
+ return isClosed() ? closeFuture :
+ writeAsync(DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER, StandardWriteOption.CLOSE);
}
public RaftClientRequest getHeader() {
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 0de14fc..3dc9137 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.datastream.impl;
+import org.apache.ratis.io.WriteOption;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.util.Preconditions;
@@ -29,8 +30,16 @@ import java.nio.ByteBuffer;
* This class is immutable.
*/
public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
+ private WriteOption[] options;
+
public DataStreamRequestByteBuffer(DataStreamRequestHeader header, ByteBuffer buffer) {
super(header.getType(), header.getStreamId(), header.getStreamOffset(), buffer);
+ this.options = header.getWriteOptions();
Preconditions.assertTrue(header.getDataLength() == buffer.remaining());
}
+
+ @Override
+ public WriteOption[] getWriteOptions() {
+ return options;
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
index 8dd3b8f..fd60bf8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
@@ -18,6 +18,7 @@
package org.apache.ratis.datastream.impl;
import org.apache.ratis.io.FilePositionCount;
+import org.apache.ratis.io.WriteOption;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
@@ -28,9 +29,11 @@ import org.apache.ratis.protocol.DataStreamRequestHeader;
*/
public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl implements DataStreamRequest {
private final FilePositionCount file;
+ private WriteOption[] options;
public DataStreamRequestFilePositionCount(DataStreamRequestHeader header, FilePositionCount file) {
super(header.getType(), header.getStreamId(), header.getStreamOffset());
+ this.options = header.getWriteOptions();
this.file = file;
}
@@ -43,4 +46,9 @@ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl imp
public FilePositionCount getFile() {
return file;
}
+
+ @Override
+ public WriteOption[] getWriteOptions() {
+ return options;
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
similarity index 79%
copy from ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
copy to ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
index 8db10fe..0aae8f9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
@@ -15,8 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.ratis.io;
-package org.apache.ratis.protocol;
-
-public interface DataStreamRequest extends DataStreamPacket {
+public enum StandardWriteOption implements WriteOption {
+ /** Sync the data to the underlying storage. */
+ SYNC,
+ /** Close the data to the underlying storage. */
+ CLOSE
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
similarity index 75%
copy from ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
copy to ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
index 8db10fe..11760c4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
@@ -15,8 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.ratis.io;
-package org.apache.ratis.protocol;
+public interface WriteOption {
+ static boolean containsOption(WriteOption[] options, WriteOption target) {
+ for (WriteOption option : options) {
+ if (option == target) {
+ return true;
+ }
+ }
-public interface DataStreamRequest extends DataStreamPacket {
+ return false;
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index 8db10fe..cf09208 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -18,5 +18,8 @@
package org.apache.ratis.protocol;
+import org.apache.ratis.io.WriteOption;
+
public interface DataStreamRequest extends DataStreamPacket {
+ WriteOption[] getWriteOptions();
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index 7781cd6..561f4ba 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -18,6 +18,8 @@
package org.apache.ratis.protocol;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
@@ -51,7 +53,13 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
final DataStreamPacketHeaderProto h = header.getPacketHeader();
if (h.getDataLength() + headerBufLen <= buf.readableBytes()) {
buf.readerIndex(buf.readerIndex() + headerBufLen);
- return new DataStreamRequestHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength());
+ WriteOption[] options = new WriteOption[h.getOptionsCount()];
+ for (int i = 0; i < options.length; i++) {
+ options[i] = StandardWriteOption.values()[h.getOptions(i).ordinal()];
+ }
+
+ return new DataStreamRequestHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength(),
+ options);
} else {
buf.resetReaderIndex();
return null;
@@ -63,7 +71,15 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
}
}
- public DataStreamRequestHeader(Type type, long streamId, long streamOffset, long dataLength) {
+ private final WriteOption[] options;
+
+ public DataStreamRequestHeader(Type type, long streamId, long streamOffset, long dataLength, WriteOption... options) {
super(type, streamId, streamOffset, dataLength);
+ this.options = options;
+ }
+
+ @Override
+ public WriteOption[] getWriteOptions() {
+ return options;
}
}
\ No newline at end of file
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index 9afc4bd..c7443f6 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -22,6 +22,7 @@ import com.beust.jcommander.Parameters;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.examples.filestore.FileStoreClient;
+import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
@@ -268,7 +269,8 @@ public class DataStream extends Client {
throw new IllegalStateException("Failed to read " + bufferSize + " byte(s) from " + this
+ ". The channel has reached end-of-stream at " + offset);
} else if (bytesRead > 0) {
- final CompletableFuture<DataStreamReply> f = out.writeAsync(buf.nioBuffer(), isSync(offset + bytesRead));
+ final CompletableFuture<DataStreamReply> f = isSync(offset + bytesRead) ?
+ out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) : out.writeAsync(buf.nioBuffer());
f.thenRun(buf::release);
futures.add(f);
}
@@ -287,7 +289,8 @@ public class DataStream extends Client {
final long packetSize = getPacketSize(offset);
final MappedByteBuffer mappedByteBuffer = in.map(FileChannel.MapMode.READ_ONLY, offset, packetSize);
final int remaining = mappedByteBuffer.remaining();
- futures.add(out.writeAsync(mappedByteBuffer, isSync(offset + remaining)));
+ futures.add(isSync(offset + remaining) ?
+ out.writeAsync(mappedByteBuffer, StandardWriteOption.SYNC) : out.writeAsync(mappedByteBuffer));
return remaining;
}
}
@@ -300,7 +303,9 @@ public class DataStream extends Client {
@Override
long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures) {
final long packetSize = getPacketSize(offset);
- futures.add(out.writeAsync(getFile(), offset, packetSize, isSync(offset + packetSize)));
+ futures.add(isSync(offset + packetSize) ?
+ out.writeAsync(getFile(), offset, packetSize, StandardWriteOption.SYNC) :
+ out.writeAsync(getFile(), offset, packetSize));
return packetSize;
}
}
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
index 1caa59f..991ce82 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
@@ -19,6 +19,7 @@ package org.apache.ratis.examples.filestore;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.datastream.DataStreamTestUtils;
+import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -147,7 +148,8 @@ class FileStoreWriter implements Closeable {
LOG.trace("write {}, offset={}, length={}, close? {}",
fileName, offset, length, close);
final ByteBuffer bf = DataStreamTestUtils.initBuffer(0, length);
- futures.add(dataStreamOutput.writeAsync(bf, close));
+ futures.add(close ?
+ dataStreamOutput.writeAsync(bf, StandardWriteOption.CLOSE) : dataStreamOutput.writeAsync(bf));
sizes.add(length);
offset += length;
}
@@ -161,7 +163,7 @@ class FileStoreWriter implements Closeable {
reply = futures.get(i).join();
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
- Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA_SYNC : RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA);
+ Assert.assertEquals(reply.getType(), RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA);
}
return this;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index ccf25c7..e6111f5 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -21,6 +21,8 @@ import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.FilePositionCount;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
@@ -47,6 +49,10 @@ public interface NettyDataStreamUtils {
.setStreamOffset(request.getStreamOffset())
.setType(request.getType())
.setDataLength(request.getDataLength());
+ for (WriteOption option : request.getWriteOptions()) {
+ b.addOptions(DataStreamPacketHeaderProto.Option.forNumber(
+ ((StandardWriteOption) option).ordinal()));
+ }
return DataStreamRequestHeaderProto
.newBuilder()
.setPacketHeader(b)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index e5ede5e..121c69e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -23,6 +23,8 @@ import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.ClientId;
@@ -76,9 +78,9 @@ public class DataStreamManagement {
this.writeFuture = new AtomicReference<>(streamFuture.thenApply(s -> 0L));
}
- CompletableFuture<Long> write(ByteBuf buf, boolean sync, Executor executor) {
+ CompletableFuture<Long> write(ByteBuf buf, WriteOption[] options, Executor executor) {
return composeAsync(writeFuture, executor,
- n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, sync, stream), executor));
+ n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, options, stream), executor));
}
CompletableFuture<Long> close(Executor executor) {
@@ -95,7 +97,7 @@ public class DataStreamManagement {
}
CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request) {
- return out.writeAsync(request.slice().nioBuffer(), request.getType() == Type.STREAM_DATA_SYNC);
+ return out.writeAsync(request.slice().nioBuffer(), request.getWriteOptions());
}
CompletableFuture<DataStreamReply> close() {
@@ -259,7 +261,7 @@ public class DataStreamManagement {
return composed;
}
- static long writeTo(ByteBuf buf, boolean sync, DataStream stream) {
+ static long writeTo(ByteBuf buf, WriteOption[] options, DataStream stream) {
final DataChannel channel = stream.getDataChannel();
long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
@@ -270,13 +272,17 @@ public class DataStreamManagement {
}
}
- if (sync) {
+ if (WriteOption.containsOption(options, StandardWriteOption.SYNC)) {
try {
channel.force(false);
} catch (IOException e) {
throw new CompletionException(e);
}
}
+
+ if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
+ close(stream);
+ }
return byteWritten;
}
@@ -290,12 +296,13 @@ public class DataStreamManagement {
}
static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(
- DataStreamRequestByteBuf request, RaftClientReply reply) {
+ DataStreamRequestByteBuf request, RaftClientReply reply, long bytesWritten) {
final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
return DataStreamReplyByteBuffer.newBuilder()
.setDataStreamPacket(request)
.setBuffer(buffer)
.setSuccess(reply.isSuccess())
+ .setBytesWritten(bytesWritten)
.build();
}
@@ -311,15 +318,15 @@ public class DataStreamManagement {
ctx.writeAndFlush(builder.build());
}
- private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
+ private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request, long bytesWritten,
ChannelHandlerContext ctx) {
try {
AsyncRpcApi asyncRpcApi = (AsyncRpcApi) (server.getDivision(info.getRequest()
.getRaftGroupId())
.getRaftClient()
.async());
- return asyncRpcApi.sendForward(info.request)
- .thenAcceptAsync(reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply)), requestExecutor);
+ return asyncRpcApi.sendForward(info.request).thenAcceptAsync(
+ reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, bytesWritten)), requestExecutor);
} catch (IOException e) {
throw new CompletionException(e);
}
@@ -348,7 +355,7 @@ public class DataStreamManagement {
ChannelHandlerContext ctx) {
LOG.warn("Failed to process {}", request, throwable);
try {
- ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
+ ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, 0));
} catch (Throwable t) {
LOG.warn("Failed to sendDataStreamException {} for {}", throwable, request, t);
}
@@ -358,6 +365,7 @@ public class DataStreamManagement {
CheckedBiFunction<RaftClientRequest, List<RaftPeer>, List<DataStreamOutputRpc>, IOException> getStreams) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
+ boolean close = WriteOption.containsOption(request.getWriteOptions(), StandardWriteOption.CLOSE);
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
final StreamInfo info;
if (request.getType() == Type.STREAM_HEADER) {
@@ -367,7 +375,7 @@ public class DataStreamManagement {
throw new IllegalStateException("Failed to create a new stream for " + request
+ " since a stream already exists: " + info);
}
- } else if (request.getType() == Type.STREAM_CLOSE) {
+ } else if (close) {
info = Optional.ofNullable(streams.remove(key)).orElseThrow(
() -> new IllegalStateException("Failed to remove StreamInfo for " + request));
} else {
@@ -380,12 +388,9 @@ public class DataStreamManagement {
if (request.getType() == Type.STREAM_HEADER) {
localWrite = CompletableFuture.completedFuture(0L);
remoteWrites = Collections.emptyList();
- } else if (request.getType() == Type.STREAM_DATA || request.getType() == Type.STREAM_DATA_SYNC) {
- localWrite = info.getLocal().write(buf, request.getType() == Type.STREAM_DATA_SYNC, writeExecutor);
+ } else if (request.getType() == Type.STREAM_DATA) {
+ localWrite = info.getLocal().write(buf, request.getWriteOptions(), writeExecutor);
remoteWrites = info.applyToRemotes(out -> out.write(request));
- } else if (request.getType() == Type.STREAM_CLOSE) {
- localWrite = info.getLocal().close(writeExecutor);
- remoteWrites = info.isPrimary()? info.applyToRemotes(RemoteStream::close): Collections.emptyList();
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
@@ -393,12 +398,12 @@ public class DataStreamManagement {
composeAsync(info.getPrevious(), requestExecutor, n -> JavaUtils.allOf(remoteWrites)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
if (request.getType() == Type.STREAM_HEADER
- || request.getType() == Type.STREAM_DATA || request.getType() == Type.STREAM_DATA_SYNC) {
+ || (request.getType() == Type.STREAM_DATA && !close)) {
sendReply(remoteWrites, request, bytesWritten, ctx);
- } else if (request.getType() == Type.STREAM_CLOSE) {
+ } else if (close) {
if (info.isPrimary()) {
// after all server close stream, primary server start transaction
- startTransaction(info, request, ctx);
+ startTransaction(info, request, bytesWritten, ctx);
} else {
sendReply(remoteWrites, request, bytesWritten, ctx);
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index afe9465..40cb8e7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -19,6 +19,7 @@
package org.apache.ratis.netty.server;
import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
+import org.apache.ratis.io.WriteOption;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
@@ -32,14 +33,16 @@ import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
*/
public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements DataStreamRequest {
private final ByteBuf buf;
+ private final WriteOption[] options;
- public DataStreamRequestByteBuf(Type type, long streamId, long streamOffset, ByteBuf buf) {
+ public DataStreamRequestByteBuf(Type type, long streamId, long streamOffset, WriteOption[] options, ByteBuf buf) {
super(type, streamId, streamOffset);
this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
+ this.options = options;
}
public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) {
- this(header.getType(), header.getStreamId(), header.getStreamOffset(), buf);
+ this(header.getType(), header.getStreamId(), header.getStreamOffset(), header.getWriteOptions(), buf);
}
@Override
@@ -50,4 +53,9 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da
public ByteBuf slice() {
return buf.slice();
}
+
+ @Override
+ public WriteOption[] getWriteOptions() {
+ return options;
+ }
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 5964f2f..f380581 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -300,14 +300,18 @@ message DataStreamPacketHeaderProto {
enum Type {
STREAM_HEADER = 0;
STREAM_DATA = 1;
- STREAM_DATA_SYNC = 2;
- STREAM_CLOSE = 3;
+ }
+
+ enum Option {
+ SYNC = 0;
+ CLOSE = 1;
}
uint64 streamId = 1;
uint64 streamOffset = 2;
Type type = 3;
- uint64 dataLength = 4;
+ repeated Option options = 4;
+ uint64 dataLength = 5;
}
message DataStreamRequestHeaderProto {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 2de9332..f7cdc36 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
@@ -260,7 +261,7 @@ public interface DataStreamTestUtils {
sizes.add(size);
final ByteBuffer bf = initBuffer(dataSize, size);
- futures.add(out.writeAsync(bf, i == bufferNum - 1));
+ futures.add(i == bufferNum - 1 ? out.writeAsync(bf, StandardWriteOption.SYNC) : out.writeAsync(bf));
dataSize += size;
}
@@ -272,7 +273,7 @@ public interface DataStreamTestUtils {
// check writeAsync requests
for (int i = 0; i < futures.size(); i++) {
final DataStreamReply reply = futures.get(i).join();
- final Type expectedType = i == futures.size() - 1 ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA;
+ final Type expectedType = Type.STREAM_DATA;
assertSuccessReply(expectedType, sizes.get(i).longValue(), reply);
}
return dataSize;