You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/09 03:52:49 UTC

[incubator-ratis] branch master updated: RATIS-1220. FileStore stream to send small packets for MappedByteBuffer and NettyFileRegion. (#338)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 79223f8  RATIS-1220. FileStore stream to send small packets for MappedByteBuffer and NettyFileRegion. (#338)
79223f8 is described below

commit 79223f89054109d2c499d81a149e60c15fc90453
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Dec 9 11:52:39 2020 +0800

    RATIS-1220. FileStore stream to send small packets for MappedByteBuffer and NettyFileRegion. (#338)
    
    * RATIS-1220. FileStore stream to send small packets for MappedByteBuffer and NettyFileRegion.
    
    * Fix a bug.
    
    * Fix checkstyle
    
    * Use closeAsync() instead of try-with-resources.
---
 .../apache/ratis/examples/filestore/FileInfo.java  |   8 +-
 .../apache/ratis/examples/filestore/FileStore.java |   9 +-
 .../ratis/examples/filestore/FileStoreClient.java  |   2 +-
 .../ratis/examples/filestore/cli/DataStream.java   | 206 ++++++++++++++++-----
 4 files changed, 167 insertions(+), 58 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index 5d136e3..6aa0b05 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Files;
@@ -68,10 +67,6 @@ abstract class FileInfo {
         "File " + getRelativePath() + " size is unknown.");
   }
 
-  void flush() throws IOException {
-    // no-op
-  }
-
   ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length, boolean readCommitted)
       throws IOException {
     if (readCommitted && offset + length > getCommittedSize()) {
@@ -186,8 +181,7 @@ abstract class FileInfo {
           + close + ") @" + id + ":" + index;
       final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
         if (out == null) {
-          out = new FileStore.FileStoreDataChannel(new RandomAccessFile(resolver.apply(getRelativePath()).toFile(),
-              "rw"));
+          out = new FileStore.FileStoreDataChannel(resolver.apply(getRelativePath()));
         }
         return write(0L, data, close, sync);
       }, name);
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index e910a99..8896462 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -259,7 +259,7 @@ public class FileStore implements Closeable {
     return CompletableFuture.supplyAsync(() -> {
       try {
         final Path full = resolve(normalize(p));
-        return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+        return new FileStoreDataChannel(full);
       } catch (IOException e) {
         throw new CompletionException("Failed to create " + p, e);
       }
@@ -267,14 +267,17 @@ public class FileStore implements Closeable {
   }
 
   static class FileStoreDataChannel implements StateMachine.DataChannel {
+    private final Path path;
     private final RandomAccessFile randomAccessFile;
 
-    FileStoreDataChannel(RandomAccessFile file) {
-      randomAccessFile = file;
+    FileStoreDataChannel(Path path) throws FileNotFoundException {
+      this.path = path;
+      this.randomAccessFile = new RandomAccessFile(path.toFile(), "rw");
     }
 
     @Override
     public void force(boolean metadata) throws IOException {
+      LOG.debug("force({}) at {}", metadata, path);
       randomAccessFile.getChannel().force(metadata);
     }
 
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 509b144..d6e6851 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -153,7 +153,7 @@ public class FileStoreClient implements Closeable {
     return WriteReplyProto.parseFrom(reply).getLength();
   }
 
-  public DataStreamOutput getStreamOutput(String path, int dataSize) {
+  public DataStreamOutput getStreamOutput(String path, long dataSize) {
     final StreamWriteRequestProto header = StreamWriteRequestProto.newBuilder()
         .setPath(ProtoUtils.toByteString(path))
         .setLength(dataSize)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index 8dab01a..a5b4a72 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -24,8 +24,8 @@ import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.examples.filestore.FileStoreClient;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
 import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 
 import java.io.File;
@@ -34,35 +34,74 @@ import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
 
 /**
  * Subcommand to generate load in filestore data stream state machine.
  */
 @Parameters(commandDescription = "Load Generator for FileStore DataStream")
 public class DataStream extends Client {
+  enum Type {
+    DirectByteBuffer(DirectByteBufferType::new),
+    MappedByteBuffer(MappedByteBufferType::new),
+    NettyFileRegion(NettyFileRegionType::new);
 
-  @Parameter(names = {"--type"}, description = "DirectByteBuffer, MappedByteBuffer, NettyFileRegion", required = true)
-  private String dataStreamType = "NettyFileRegion";
+    private final BiFunction<String, DataStream, TransferType> constructor;
+
+    Type(BiFunction<String, DataStream, TransferType> constructor) {
+      this.constructor = constructor;
+    }
+
+    BiFunction<String, DataStream, TransferType> getConstructor() {
+      return constructor;
+    }
+
+    static Type valueOfIgnoreCase(String s) {
+      for (Type type : values()) {
+        if (type.name().equalsIgnoreCase(s)) {
+          return type;
+        }
+      }
+      return null;
+    }
+  }
+
+  // To be used as a Java annotation attribute value
+  private static final String DESCRIPTION = "[DirectByteBuffer, MappedByteBuffer, NettyFileRegion]";
+
+  {
+    // Assert if the description is correct.
+    final String expected = Arrays.asList(Type.values()).toString();
+    Preconditions.assertTrue(expected.equals(DESCRIPTION),
+        () -> "Unexpected description: " + DESCRIPTION + " does not equal to the expected string " + expected);
+  }
+
+  @Parameter(names = {"--type"}, description = DESCRIPTION, required = true)
+  private String dataStreamType = Type.NettyFileRegion.name();
 
   @Parameter(names = {"--syncSize"}, description = "Sync every syncSize, syncSize % bufferSize should be zero," +
       "-1 means on sync", required = true)
   private int syncSize = -1;
 
+  int getSyncSize() {
+    return syncSize;
+  }
+
   private boolean checkParam() {
     if (syncSize != -1 && syncSize % getBufferSizeInBytes() != 0) {
       System.err.println("Error: syncSize % bufferSize should be zero");
       return false;
     }
 
-    if (!dataStreamType.equals("DirectByteBuffer") &&
-        !dataStreamType.equals("MappedByteBuffer") &&
-        !dataStreamType.equals("NettyFileRegion")) {
-      System.err.println("Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, transferTo");
+    if (Type.valueOfIgnoreCase(dataStreamType) == null) {
+      System.err.println("Error: dataStreamType should be one of " + DESCRIPTION);
       return false;
     }
 
@@ -101,18 +140,11 @@ public class DataStream extends Client {
       final long fileLength = file.length();
       Preconditions.assertTrue(fileLength == getFileSizeInBytes(), "Unexpected file size: expected size is "
           + getFileSizeInBytes() + " but actual size is " + fileLength);
-      FileInputStream fis = new FileInputStream(file);
-      final DataStreamOutput dataStreamOutput = fileStoreClient.getStreamOutput(path, (int) file.length());
-
-      if (dataStreamType.equals("DirectByteBuffer")) {
-        fileMap.put(path, writeByDirectByteBuffer(dataStreamOutput, fis.getChannel()));
-      } else if (dataStreamType.equals("MappedByteBuffer")) {
-        fileMap.put(path, writeByMappedByteBuffer(dataStreamOutput, fis.getChannel()));
-      } else if (dataStreamType.equals("NettyFileRegion")) {
-        fileMap.put(path, writeByNettyFileRegion(dataStreamOutput, file));
-      }
 
-      dataStreamOutput.closeAsync();
+      final Type type = Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType))
+          .orElseThrow(IllegalStateException::new);
+      final TransferType writer = type.getConstructor().apply(path, this);
+      fileMap.put(path, writer.transfer(fileStoreClient));
     }
     return fileMap;
   }
@@ -134,46 +166,126 @@ public class DataStream extends Client {
     return totalBytes;
   }
 
-  private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
-      FileChannel fileChannel) throws IOException {
-    final int fileSize = getFileSizeInBytes();
-    final int bufferSize = getBufferSizeInBytes();
-    if (fileSize <= 0) {
-      return Collections.emptyList();
+  abstract static class TransferType {
+    private final String path;
+    private final File file;
+    private final long fileSize;
+    private final int bufferSize;
+    private final long syncSize;
+    private long syncPosition = 0;
+
+    TransferType(String path, DataStream cli) {
+      this.path = path;
+      this.file = new File(path);
+      this.fileSize = cli.getFileSizeInBytes();
+      this.bufferSize = cli.getBufferSizeInBytes();
+      this.syncSize = cli.getSyncSize();
+
+      final long actualSize = file.length();
+      Preconditions.assertTrue(actualSize == fileSize, () -> "Unexpected file size: expected size is "
+          + fileSize + " but actual size is " + actualSize + ", path=" + path);
+    }
+
+    File getFile() {
+      return file;
+    }
+
+    int getBufferSize() {
+      return bufferSize;
+    }
+
+    long getPacketSize(long offset) {
+      return Math.min(bufferSize, fileSize - offset);
     }
-    List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-    final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
 
-    for(long offset = 0L; offset < fileSize;) {
-      final ByteBuf buf = alloc.directBuffer(bufferSize);
-      final int bytesRead = buf.writeBytes(fileChannel, bufferSize);
+    boolean isSync(long position) {
+      if (syncSize > 0) {
+        if (position >= fileSize || position - syncPosition >= syncSize) {
+          syncPosition = position;
+          return true;
+        }
+      }
+      return false;
+    }
+
+    List<CompletableFuture<DataStreamReply>> transfer(FileStoreClient client) throws IOException {
+      if (fileSize <= 0) {
+        return Collections.emptyList();
+      }
+
+      final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+      final DataStreamOutput out = client.getStreamOutput(path, fileSize);
+      try (FileInputStream fis = new FileInputStream(file)) {
+        final FileChannel in = fis.getChannel();
+        for (long offset = 0L; offset < fileSize; ) {
+          offset += write(in, out, offset, futures);
+        }
+      } catch (Throwable e) {
+        throw new IOException("Failed to transfer " + path);
+      } finally {
+        futures.add(out.closeAsync());
+      }
+      return futures;
+    }
+
+    abstract long write(FileChannel in, DataStreamOutput out, long offset,
+        List<CompletableFuture<DataStreamReply>> futures) throws IOException;
+
+    @Override
+    public String toString() {
+      return JavaUtils.getClassSimpleName(getClass()) + "{" + path + ", size=" + fileSize + "}";
+    }
+  }
+
+  static class DirectByteBufferType extends TransferType {
+    DirectByteBufferType(String path, DataStream cli) {
+      super(path, cli);
+    }
+
+    @Override
+    long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures)
+        throws IOException {
+      final int bufferSize = getBufferSize();
+      final ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize);
+      final int bytesRead = buf.writeBytes(in, bufferSize);
       if (bytesRead < 0) {
-        throw new IllegalStateException("Failed to read " + fileSize
-            + " byte(s). The channel has reached end-of-stream at " + offset);
+        throw new IllegalStateException("Failed to read " + bufferSize + " byte(s) from " + this
+            + ". The channel has reached end-of-stream at " + offset);
       } else if (bytesRead > 0) {
-        offset += bytesRead;
-        final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer(),
-            syncSize > 0 && (offset == fileSize || offset % syncSize == 0));
+        final CompletableFuture<DataStreamReply> f = out.writeAsync(buf.nioBuffer(), isSync(offset + bytesRead));
         f.thenRun(buf::release);
         futures.add(f);
       }
+      return bytesRead;
     }
-
-    return futures;
   }
 
-  private List<CompletableFuture<DataStreamReply>> writeByMappedByteBuffer(DataStreamOutput dataStreamOutput,
-      FileChannel fileChannel) throws IOException {
-    List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-    MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, getFileSizeInBytes());
-    futures.add(dataStreamOutput.writeAsync(mappedByteBuffer));
-    return futures;
+  static class MappedByteBufferType extends TransferType {
+    MappedByteBufferType(String path, DataStream cli) {
+      super(path, cli);
+    }
+
+    @Override
+    long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures)
+        throws IOException {
+      final long packetSize = getPacketSize(offset);
+      final MappedByteBuffer mappedByteBuffer = in.map(FileChannel.MapMode.READ_ONLY, offset, packetSize);
+      final int remaining = mappedByteBuffer.remaining();
+      futures.add(out.writeAsync(mappedByteBuffer, isSync(offset + remaining)));
+      return remaining;
+    }
   }
 
-  private List<CompletableFuture<DataStreamReply>> writeByNettyFileRegion(
-      DataStreamOutput dataStreamOutput, File file) {
-    List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-    futures.add(dataStreamOutput.writeAsync(file));
-    return futures;
+  static class NettyFileRegionType extends TransferType {
+    NettyFileRegionType(String path, DataStream cli) {
+      super(path, cli);
+    }
+
+    @Override
+    long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures) {
+      final long packetSize = getPacketSize(offset);
+      futures.add(out.writeAsync(getFile(), offset, packetSize, isSync(offset + packetSize)));
+      return packetSize;
+    }
   }
 }