You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/24 14:00:53 UTC
[incubator-ratis] branch master updated: RATIS-1175. DataStreamOutput should support FileChannel#transferTo. (#296)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 7c124b1 RATIS-1175. DataStreamOutput should support FileChannel#transferTo. (#296)
7c124b1 is described below
commit 7c124b1c143e7208a4e8cdb7291b7599c21ac40a
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Nov 24 22:00:42 2020 +0800
RATIS-1175. DataStreamOutput should support FileChannel#transferTo. (#296)
---
.../apache/ratis/client/api/DataStreamOutput.java | 28 +++++++-
.../apache/ratis/client/impl/ClientProtoUtils.java | 12 ++++
.../ratis/client/impl/DataStreamClientImpl.java | 42 ++++++++++-
.../main/java/org/apache/ratis/util/JavaUtils.java | 11 +++
.../ratis/netty/server/DataStreamManagement.java | 16 +----
.../apache/ratis/statemachine/StateMachine.java | 7 ++
.../ratis/datastream/DataStreamClusterTests.java | 84 +++++++++++++++++-----
.../ratis/datastream/DataStreamTestUtils.java | 34 +++++++++
8 files changed, 197 insertions(+), 37 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 8219d69..579367a 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -19,16 +19,42 @@ package org.apache.ratis.client.api;
import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftClientReply;
import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletableFuture;
/** An asynchronous output stream supporting zero buffer copying. */
public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
- /** Send out the data in the buffer asynchronously */
+ /**
+ * This method is the same as writeAsync(buffer, sync_default),
+ * where sync_default depends on the underlying implementation.
+ */
default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer) {
return writeAsync(buffer, false);
}
+ /**
+ * Send out the data in the buffer asynchronously.
+ *
+ * @param buffer the data to be sent.
+ * @param sync Should the data be sync'ed to the underlying storage?
+ * @return a future of the reply.
+ */
CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer, boolean sync);
+
+ /**
+ * Return the future of the {@link RaftClientReply}
+ * which will be received once this stream has been closed successfully.
+ * Note that this method does not trigger closing this stream.
+ *
+ * @return the future of the {@link RaftClientReply}.
+ */
+ CompletableFuture<RaftClientReply> getRaftClientReplyFuture();
+
+ /**
+ * @return a {@link WritableByteChannel} view of this {@link DataStreamOutput}.
+ */
+ WritableByteChannel getWritableByteChannel();
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index ab79f4a..0807b3b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.client.impl;
import java.nio.ByteBuffer;
import java.util.Optional;
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.DataStreamException;
@@ -266,6 +267,17 @@ public interface ClientProtoUtils {
return b.build();
}
+ static RaftClientReply getRaftClientReply(DataStreamReply reply) {
+ if (!(reply instanceof DataStreamReplyByteBuffer)) {
+ throw new IllegalStateException("Unexpected " + reply.getClass() + ": reply is " + reply);
+ }
+ try {
+ return toRaftClientReply(((DataStreamReplyByteBuffer) reply).slice());
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalStateException("Failed to getRaftClientReply from " + reply, e);
+ }
+ }
+
static RaftClientReply toRaftClientReply(ByteBuffer buffer) throws InvalidProtocolBufferException {
return toRaftClientReply(RaftClientReplyProto.parseFrom(buffer));
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 45ea1be..ffae192 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -24,16 +24,20 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletableFuture;
/**
@@ -60,8 +64,32 @@ public class DataStreamClientImpl implements DataStreamClient {
public final class DataStreamOutputImpl implements DataStreamOutputRpc {
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
- private final MemoizedSupplier<CompletableFuture<DataStreamReply>> closeSupplier
- = JavaUtils.memoize(() -> send(Type.STREAM_CLOSE));
+ private final CompletableFuture<RaftClientReply> raftClientReplyFuture = new CompletableFuture<>();
+ private final MemoizedSupplier<CompletableFuture<DataStreamReply>> closeSupplier = JavaUtils.memoize(() -> {
+ final CompletableFuture<DataStreamReply> f = send(Type.STREAM_CLOSE);
+ f.thenApply(ClientProtoUtils::getRaftClientReply).whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
+ return f;
+ });
+ private final MemoizedSupplier<WritableByteChannel> writableByteChannelSupplier
+ = JavaUtils.memoize(() -> new WritableByteChannel() {
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ final int remaining = src.remaining();
+ final DataStreamReply reply = IOUtils.getFromFuture(writeAsync(src),
+ () -> "write(" + remaining + " bytes for " + ClientInvocationId.valueOf(header) + ")");
+ return Math.toIntExact(reply.getBytesWritten());
+ }
+
+ @Override
+ public boolean isOpen() {
+ return !isClosed();
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.getFromFuture(closeAsync(), () -> "close(" + ClientInvocationId.valueOf(header) + ")");
+ }
+ });
private long streamOffset = 0;
@@ -117,6 +145,16 @@ public class DataStreamClientImpl implements DataStreamClient {
public CompletableFuture<DataStreamReply> getHeaderFuture() {
return headerFuture;
}
+
+ @Override
+ public CompletableFuture<RaftClientReply> getRaftClientReplyFuture() {
+ return raftClientReplyFuture;
+ }
+
+ @Override
+ public WritableByteChannel getWritableByteChannel() {
+ return writableByteChannelSupplier.get();
+ }
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 3281fdc..1511688 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -234,6 +235,16 @@ public interface JavaUtils {
return CompletableFuture.allOf(futures.toArray(EMPTY_COMPLETABLE_FUTURE_ARRAY));
}
+ static <V> BiConsumer<V, Throwable> asBiConsumer(CompletableFuture<V> future) {
+ return (v, t) -> {
+ if (t != null) {
+ future.completeExceptionally(t);
+ } else {
+ future.complete(v);
+ }
+ };
+ }
+
static <OUTPUT, THROWABLE extends Throwable> OUTPUT supplyAndWrapAsCompletionException(
CheckedSupplier<OUTPUT, THROWABLE> supplier) {
try {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 14d4196..8cfa763 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -37,7 +37,6 @@ import org.apache.ratis.server.RaftServer.Division;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.statemachine.StateMachine.DataChannel;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
@@ -341,7 +340,7 @@ public class DataStreamManagement {
final Stream<RaftClientReply> remoteReplies = results.stream()
.filter(r -> !r.isCompletedExceptionally())
.map(CompletableFuture::join)
- .map(DataStreamManagement::getRaftClientReply);
+ .map(ClientProtoUtils::getRaftClientReply);
// choose the leader's reply if there is any. Otherwise, use the local reply
final RaftClientReply chosen = Stream.concat(Stream.of(localReply), remoteReplies)
@@ -397,19 +396,6 @@ public class DataStreamManagement {
});
}
- static RaftClientReply getRaftClientReply(DataStreamReply dataStreamReply) {
- if (dataStreamReply instanceof DataStreamReplyByteBuffer) {
- try {
- return ClientProtoUtils.toRaftClientReply(((DataStreamReplyByteBuffer) dataStreamReply).slice());
- } catch (InvalidProtocolBufferException e) {
- throw new IllegalStateException("Failed to parse " + JavaUtils.getClassSimpleName(dataStreamReply.getClass())
- + ": reply is " + dataStreamReply, e);
- }
- } else {
- throw new IllegalStateException("Unexpected " + dataStreamReply.getClass() + ": reply is " + dataStreamReply);
- }
- }
-
void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
LOG.debug("{}: read {}", this, request);
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 95c6c41..a950167 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -239,6 +239,13 @@ public interface StateMachine extends Closeable {
* For write state machine data.
*/
interface DataChannel extends WritableByteChannel {
+ /**
+ * Similar to {@link java.nio.channels.FileChannel#force(boolean)},
+ * the underlying implementation should force writing the data and/or metadata to the underlying storage.
+ *
+ * @param metadata Should the metadata be forced?
+ * @throws IOException If there are IO errors.
+ */
void force(boolean metadata) throws IOException;
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index 2e84546..c93c294 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -22,17 +22,19 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.datastream.DataStreamTestUtils.SingleDataStream;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.impl.RaftServerProxy;
import org.junit.Assert;
import org.junit.Test;
-import java.util.Collection;
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
@@ -50,26 +52,70 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
}
void testStreamWrites(CLUSTER cluster) throws Exception {
- final RaftServerImpl leader = waitForLeader(cluster);
-
- final RaftGroup raftGroup = cluster.getGroup();
- final Collection<RaftPeer> peers = raftGroup.getPeers();
- Assert.assertEquals(NUM_SERVERS, peers.size());
- RaftPeer raftPeer = peers.iterator().next();
+ waitForLeader(cluster);
+ runTestDataStreamOutput(cluster);
+ runTestTransferTo(cluster);
+ }
+ void runTestDataStreamOutput(CLUSTER cluster) throws Exception {
final RaftClientRequest request;
- try (RaftClient client = cluster.createClient(raftPeer)) {
- // send a stream request
+ final CompletableFuture<RaftClientReply> reply;
+ try (RaftClient client = cluster.createClient()) {
try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
+ request = out.getHeader();
+ reply = out.getRaftClientReplyFuture();
+
+ // write using DataStreamOutput
DataStreamTestUtils.writeAndAssertReplies(out, 1000, 10);
+ }
+ }
+
+ watchOrSleep(cluster, reply.join().getLogIndex());
+ assertLogEntry(cluster, request);
+ }
+
+ void runTestTransferTo(CLUSTER cluster) throws Exception {
+ final int size = 4_000_000 + ThreadLocalRandom.current().nextInt(1_000_000);
+
+ // create data file
+ final File f = new File(getTestDir(), "a.txt");
+ DataStreamTestUtils.createFile(f, size);
+
+ final RaftClientRequest request;
+ final CompletableFuture<RaftClientReply> reply;
+ try (RaftClient client = cluster.createClient()) {
+ try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
request = out.getHeader();
+ reply = out.getRaftClientReplyFuture();
+
+ // write using transferTo WritableByteChannel
+ try(FileInputStream in = new FileInputStream(f)) {
+ final long transferred = in.getChannel().transferTo(0, size, out.getWritableByteChannel());
+ Assert.assertEquals(size, transferred);
+ }
}
}
- // verify the write request is in the Raft log.
- final RaftLog log = leader.getState().getLog();
- final LogEntryProto entry = DataStreamTestUtils.searchLogEntry(ClientInvocationId.valueOf(request), log);
- LOG.info("entry={}", entry);
- Assert.assertNotNull(entry);
+ watchOrSleep(cluster, reply.join().getLogIndex());
+ assertLogEntry(cluster, request);
+ }
+
+ void watchOrSleep(CLUSTER cluster, long index) throws Exception {
+ try (RaftClient client = cluster.createClient()) {
+ client.async().watch(index, ReplicationLevel.ALL).join();
+ } catch (UnsupportedOperationException e) {
+ // the cluster does not support watch
+ ONE_SECOND.sleep();
+ }
+ }
+
+ void assertLogEntry(CLUSTER cluster, RaftClientRequest request) throws Exception {
+ for (RaftServerProxy proxy : cluster.getServers()) {
+ final RaftServerImpl impl = proxy.getImpl(cluster.getGroupId());
+ final MultiDataStreamStateMachine stateMachine = (MultiDataStreamStateMachine) impl.getStateMachine();
+ final SingleDataStream s = stateMachine.getSingleDataStream(request);
+ Assert.assertFalse(s.getDataChannel().isOpen());
+ DataStreamTestUtils.assertLogEntry(impl, s);
+ }
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 177d1ff..73d4b24 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -43,12 +43,16 @@ import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileOutputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -82,6 +86,36 @@ public interface DataStreamTestUtils {
return buffer;
}
+ static void createFile(File f, int size) throws Exception {
+ final ReadableByteChannel source = new ReadableByteChannel() {
+ private int offset = 0;
+
+ @Override
+ public boolean isOpen() {
+ return offset < size;
+ }
+
+ @Override
+ public void close() {
+ offset = size;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) {
+ final int start = offset;
+ for(; dst.remaining() > 0 && isOpen(); offset++) {
+ dst.put(pos2byte(offset));
+ }
+ return offset - start;
+ }
+ };
+ FileUtils.createDirectories(f.getParentFile());
+ try(FileOutputStream out = new FileOutputStream(f)) {
+ final long transferred = out.getChannel().transferFrom(source, 0, size);
+ Assert.assertEquals(size, transferred);
+ }
+ }
+
static ByteString bytesWritten2ByteString(long bytesWritten) {
return ByteString.copyFromUtf8("bytesWritten=" + bytesWritten);
}