You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/24 14:00:53 UTC

[incubator-ratis] branch master updated: RATIS-1175. DataStreamOutput should support FileChannel#transferTo. (#296)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c124b1  RATIS-1175. DataStreamOutput should support FileChannel#transferTo. (#296)
7c124b1 is described below

commit 7c124b1c143e7208a4e8cdb7291b7599c21ac40a
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Nov 24 22:00:42 2020 +0800

    RATIS-1175. DataStreamOutput should support FileChannel#transferTo. (#296)
---
 .../apache/ratis/client/api/DataStreamOutput.java  | 28 +++++++-
 .../apache/ratis/client/impl/ClientProtoUtils.java | 12 ++++
 .../ratis/client/impl/DataStreamClientImpl.java    | 42 ++++++++++-
 .../main/java/org/apache/ratis/util/JavaUtils.java | 11 +++
 .../ratis/netty/server/DataStreamManagement.java   | 16 +----
 .../apache/ratis/statemachine/StateMachine.java    |  7 ++
 .../ratis/datastream/DataStreamClusterTests.java   | 84 +++++++++++++++++-----
 .../ratis/datastream/DataStreamTestUtils.java      | 34 +++++++++
 8 files changed, 197 insertions(+), 37 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 8219d69..579367a 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -19,16 +19,42 @@ package org.apache.ratis.client.api;
 
 import org.apache.ratis.io.CloseAsync;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftClientReply;
 
 import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.CompletableFuture;
 
 /** An asynchronous output stream supporting zero buffer copying. */
 public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
-  /** Send out the data in the buffer asynchronously */
+  /**
+   * This method is the same as {@code writeAsync(buffer, sync_default)}, where sync_default
+   * is false in this default implementation; an implementation may override it.
+   */
   default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer) {
     return writeAsync(buffer, false);
   }
 
+  /**
+   * Send out the data in the buffer asynchronously.
+   *
+   * @param buffer the data to be sent.
+   * @param sync Should the data be sync'ed to the underlying storage?
+   * @return a future of the reply.
+   */
   CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer, boolean sync);
+
+  /**
+   * Return the future of the {@link RaftClientReply}
+   * which will be received once this stream has been closed successfully.
+   * Note that this method does not trigger closing this stream.
+   *
+   * @return the future of the {@link RaftClientReply}.
+   */
+  CompletableFuture<RaftClientReply> getRaftClientReplyFuture();
+
+  /**
+   * @return a {@link WritableByteChannel} view of this {@link DataStreamOutput}.
+   */
+  WritableByteChannel getWritableByteChannel();
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index ab79f4a..0807b3b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.client.impl;
 import java.nio.ByteBuffer;
 import java.util.Optional;
 
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.DataStreamException;
@@ -266,6 +267,17 @@ public interface ClientProtoUtils {
     return b.build();
   }
 
+  static RaftClientReply getRaftClientReply(DataStreamReply reply) { // decode the RaftClientReply carried by a DataStreamReply
+    if (!(reply instanceof DataStreamReplyByteBuffer)) { // only the ByteBuffer-backed reply can be sliced below
+      throw new IllegalStateException("Unexpected " + reply.getClass() + ": reply is " + reply); // NOTE(review): a null reply gets here and reply.getClass() throws NullPointerException
+    }
+    try {
+      return toRaftClientReply(((DataStreamReplyByteBuffer) reply).slice()); // parse the sliced buffer as a RaftClientReplyProto
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalStateException("Failed to getRaftClientReply from " + reply, e); // rethrown unchecked, cause preserved
+    }
+  }
+
   static RaftClientReply toRaftClientReply(ByteBuffer buffer) throws InvalidProtocolBufferException {
     return toRaftClientReply(RaftClientReplyProto.parseFrom(buffer));
   }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 45ea1be..ffae192 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -24,16 +24,20 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -60,8 +64,32 @@ public class DataStreamClientImpl implements DataStreamClient {
   public final class DataStreamOutputImpl implements DataStreamOutputRpc {
     private final RaftClientRequest header;
     private final CompletableFuture<DataStreamReply> headerFuture;
-    private final MemoizedSupplier<CompletableFuture<DataStreamReply>> closeSupplier
-        = JavaUtils.memoize(() -> send(Type.STREAM_CLOSE));
+    private final CompletableFuture<RaftClientReply> raftClientReplyFuture = new CompletableFuture<>();
+    private final MemoizedSupplier<CompletableFuture<DataStreamReply>> closeSupplier = JavaUtils.memoize(() -> {
+      final CompletableFuture<DataStreamReply> f = send(Type.STREAM_CLOSE); // issue the STREAM_CLOSE packet
+      f.thenApply(ClientProtoUtils::getRaftClientReply).whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture)); // relay the parsed reply, or the failure, into raftClientReplyFuture
+      return f; // callers receive the original DataStreamReply future, not the derived one
+    });
+    private final MemoizedSupplier<WritableByteChannel> writableByteChannelSupplier
+        = JavaUtils.memoize(() -> new WritableByteChannel() { // WritableByteChannel view over this stream's writeAsync/closeAsync
+      @Override
+      public int write(ByteBuffer src) throws IOException {
+        final int remaining = src.remaining(); // captured before the write; used only in the error-message supplier
+        final DataStreamReply reply = IOUtils.getFromFuture(writeAsync(src), // presumably waits for the async write and surfaces failures as IOException -- confirm in IOUtils
+            () -> "write(" + remaining + " bytes for " + ClientInvocationId.valueOf(header) + ")");
+        return Math.toIntExact(reply.getBytesWritten()); // ArithmeticException if the count overflows int. NOTE(review): WritableByteChannel.write expects src's position to advance; whether writeAsync does so is not visible here
+      }
+
+      @Override
+      public boolean isOpen() {
+        return !isClosed(); // isClosed() is declared outside this hunk; the view reports the inverse of it
+      }
+
+      @Override
+      public void close() throws IOException {
+        IOUtils.getFromFuture(closeAsync(), () -> "close(" + ClientInvocationId.valueOf(header) + ")"); // presumably waits for closeAsync() to complete -- confirm in IOUtils
+      }
+    });
 
     private long streamOffset = 0;
 
@@ -117,6 +145,16 @@ public class DataStreamClientImpl implements DataStreamClient {
     public CompletableFuture<DataStreamReply> getHeaderFuture() {
       return headerFuture;
     }
+
+    @Override
+    public CompletableFuture<RaftClientReply> getRaftClientReplyFuture() {
+      return raftClientReplyFuture; // completed from the closeSupplier callback; calling this getter does not close the stream
+    }
+
+    @Override
+    public WritableByteChannel getWritableByteChannel() {
+      return writableByteChannelSupplier.get(); // supplier is built with JavaUtils.memoize; presumably the view is created once -- confirm
+    }
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 3281fdc..1511688 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -234,6 +235,16 @@ public interface JavaUtils {
     return CompletableFuture.allOf(futures.toArray(EMPTY_COMPLETABLE_FUTURE_ARRAY));
   }
 
+  static <V> BiConsumer<V, Throwable> asBiConsumer(CompletableFuture<V> future) { // adapt a future into a whenComplete-style callback that completes it
+    return (v, t) -> {
+      if (t != null) {
+        future.completeExceptionally(t); // a non-null throwable takes precedence; v is ignored
+      } else {
+        future.complete(v); // v is forwarded as-is, including null
+      }
+    };
+  }
+
   static <OUTPUT, THROWABLE extends Throwable> OUTPUT supplyAndWrapAsCompletionException(
       CheckedSupplier<OUTPUT, THROWABLE> supplier) {
     try {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 14d4196..8cfa763 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -37,7 +37,6 @@ import org.apache.ratis.server.RaftServer.Division;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.statemachine.StateMachine.DataChannel;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
@@ -341,7 +340,7 @@ public class DataStreamManagement {
     final Stream<RaftClientReply> remoteReplies = results.stream()
         .filter(r -> !r.isCompletedExceptionally())
         .map(CompletableFuture::join)
-        .map(DataStreamManagement::getRaftClientReply);
+        .map(ClientProtoUtils::getRaftClientReply);
 
     // choose the leader's reply if there is any.  Otherwise, use the local reply
     final RaftClientReply chosen = Stream.concat(Stream.of(localReply), remoteReplies)
@@ -397,19 +396,6 @@ public class DataStreamManagement {
     });
   }
 
-  static RaftClientReply getRaftClientReply(DataStreamReply dataStreamReply) {
-    if (dataStreamReply instanceof DataStreamReplyByteBuffer) {
-      try {
-        return ClientProtoUtils.toRaftClientReply(((DataStreamReplyByteBuffer) dataStreamReply).slice());
-      } catch (InvalidProtocolBufferException e) {
-        throw new IllegalStateException("Failed to parse " + JavaUtils.getClassSimpleName(dataStreamReply.getClass())
-            + ": reply is " + dataStreamReply, e);
-      }
-    } else {
-      throw new IllegalStateException("Unexpected " + dataStreamReply.getClass() + ": reply is " + dataStreamReply);
-    }
-  }
-
   void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
       CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
     LOG.debug("{}: read {}", this, request);
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 95c6c41..a950167 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -239,6 +239,13 @@ public interface StateMachine extends Closeable {
    * For write state machine data.
    */
   interface DataChannel extends WritableByteChannel {
+    /**
+     * Similar to {@link java.nio.channels.FileChannel#force(boolean)},
+     * the underlying implementation should force writing the data and/or metadata to the underlying storage.
+     *
+     * @param metadata Should the metadata be forced?
+     * @throws IOException If there are IO errors.
+     */
     void force(boolean metadata) throws IOException;
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index 2e84546..c93c294 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -22,17 +22,19 @@ import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.datastream.DataStreamTestUtils.SingleDataStream;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.impl.RaftServerProxy;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Collection;
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
@@ -50,26 +52,70 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
   }
 
   void testStreamWrites(CLUSTER cluster) throws Exception {
-    final RaftServerImpl leader = waitForLeader(cluster);
-
-    final RaftGroup raftGroup = cluster.getGroup();
-    final Collection<RaftPeer> peers = raftGroup.getPeers();
-    Assert.assertEquals(NUM_SERVERS, peers.size());
-    RaftPeer raftPeer = peers.iterator().next();
+    waitForLeader(cluster);
+    runTestDataStreamOutput(cluster);
+    runTestTransferTo(cluster);
+  }
 
+  void runTestDataStreamOutput(CLUSTER cluster) throws Exception {
     final RaftClientRequest request;
-    try (RaftClient client = cluster.createClient(raftPeer)) {
-      // send a stream request
+    final CompletableFuture<RaftClientReply> reply;
+    try (RaftClient client = cluster.createClient()) {
       try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
+        request = out.getHeader();
+        reply = out.getRaftClientReplyFuture();
+
+        // write using DataStreamOutput
         DataStreamTestUtils.writeAndAssertReplies(out, 1000, 10);
+      }
+    }
+
+    watchOrSleep(cluster, reply.join().getLogIndex()); // join() runs after the try blocks exit; presumably closing out is what completes reply -- confirm
+    assertLogEntry(cluster, request);
+  }
+
+  void runTestTransferTo(CLUSTER cluster) throws Exception {
+    final int size = 4_000_000 + ThreadLocalRandom.current().nextInt(1_000_000); // random size in [4_000_000, 5_000_000) bytes
+
+    // create data file
+    final File f = new File(getTestDir(), "a.txt");
+    DataStreamTestUtils.createFile(f, size);
+
+    final RaftClientRequest request;
+    final CompletableFuture<RaftClientReply> reply;
+    try (RaftClient client = cluster.createClient()) {
+      try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
         request = out.getHeader();
+        reply = out.getRaftClientReplyFuture();
+
+        // write using transferTo WritableByteChannel
+        try(FileInputStream in = new FileInputStream(f)) {
+          final long transferred = in.getChannel().transferTo(0, size, out.getWritableByteChannel()); // NOTE(review): FileChannel.transferTo may transfer fewer bytes than requested; a single call is assumed to move everything
+          Assert.assertEquals(size, transferred);
+        }
       }
     }
 
-    // verify the write request is in the Raft log.
-    final RaftLog log = leader.getState().getLog();
-    final LogEntryProto entry = DataStreamTestUtils.searchLogEntry(ClientInvocationId.valueOf(request), log);
-    LOG.info("entry={}", entry);
-    Assert.assertNotNull(entry);
+    watchOrSleep(cluster, reply.join().getLogIndex());
+    assertLogEntry(cluster, request);
+  }
+
+  void watchOrSleep(CLUSTER cluster, long index) throws Exception { // wait for the given log index via watch, or sleep when watch is unsupported
+    try (RaftClient client = cluster.createClient()) {
+      client.async().watch(index, ReplicationLevel.ALL).join(); // NOTE(review): if watch returns a CompletableFuture, join() reports async failures as CompletionException, so the catch below only sees a synchronous throw -- confirm
+    } catch (UnsupportedOperationException e) {
+      // the cluster does not support watch
+      ONE_SECOND.sleep(); // fixed one-second pause in place of the watch
+    }
+  }
+
+  void assertLogEntry(CLUSTER cluster, RaftClientRequest request) throws Exception { // check the stream for request on every server in the cluster
+    for (RaftServerProxy proxy : cluster.getServers()) {
+      final RaftServerImpl impl = proxy.getImpl(cluster.getGroupId());
+      final MultiDataStreamStateMachine stateMachine = (MultiDataStreamStateMachine) impl.getStateMachine(); // the cast assumes the cluster was configured with this test state machine
+      final SingleDataStream s = stateMachine.getSingleDataStream(request); // look up the stream recorded for this request
+      Assert.assertFalse(s.getDataChannel().isOpen()); // the server-side data channel must already be closed
+      DataStreamTestUtils.assertLogEntry(impl, s); // delegates the log-entry check to the shared test utility
+    }
   }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 177d1ff..73d4b24 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -43,12 +43,16 @@ import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -82,6 +86,36 @@ public interface DataStreamTestUtils {
     return buffer;
   }
 
+  static void createFile(File f, int size) throws Exception { // write size generated bytes to f; byte i is pos2byte(i)
+    final ReadableByteChannel source = new ReadableByteChannel() {
+      private int offset = 0; // number of bytes produced so far
+
+      @Override
+      public boolean isOpen() {
+        return offset < size; // "open" doubles as "still has bytes to produce"
+      }
+
+      @Override
+      public void close() {
+        offset = size; // closing marks the source as exhausted
+      }
+
+      @Override
+      public int read(ByteBuffer dst) {
+        final int start = offset;
+        for(; dst.remaining() > 0 && isOpen(); offset++) { // fill dst until it is full or size bytes have been produced
+          dst.put(pos2byte(offset)); // pos2byte is defined outside this hunk
+        }
+        return offset - start; // NOTE(review): yields 0, never -1, once exhausted; the transferFrom count below is what bounds the copy
+      }
+    };
+    FileUtils.createDirectories(f.getParentFile()); // presumably creates missing parent directories -- see FileUtils
+    try(FileOutputStream out = new FileOutputStream(f)) {
+      final long transferred = out.getChannel().transferFrom(source, 0, size); // copy at most size bytes from the generator, starting at file position 0
+      Assert.assertEquals(size, transferred); // fail if fewer than size bytes were transferred
+    }
+  }
+
   static ByteString bytesWritten2ByteString(long bytesWritten) {
     return ByteString.copyFromUtf8("bytesWritten=" + bytesWritten);
   }