You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/09 03:52:49 UTC
[incubator-ratis] branch master updated: RATIS-1220. FileStore
stream to send small packets for MappedByteBuffer and NettyFileRegion.
(#338)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 79223f8 RATIS-1220. FileStore stream to send small packets for MappedByteBuffer and NettyFileRegion. (#338)
79223f8 is described below
commit 79223f89054109d2c499d81a149e60c15fc90453
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Dec 9 11:52:39 2020 +0800
RATIS-1220. FileStore stream to send small packets for MappedByteBuffer and NettyFileRegion. (#338)
* RATIS-1220. FileStore stream to send small packets for MappedByteBuffer and NettyFileRegion.
* Fix a bug.
* Fix checkstyle
* Use closeAsync() instead of try-with-resources.
---
.../apache/ratis/examples/filestore/FileInfo.java | 8 +-
.../apache/ratis/examples/filestore/FileStore.java | 9 +-
.../ratis/examples/filestore/FileStoreClient.java | 2 +-
.../ratis/examples/filestore/cli/DataStream.java | 206 ++++++++++++++++-----
4 files changed, 167 insertions(+), 58 deletions(-)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index 5d136e3..6aa0b05 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
@@ -68,10 +67,6 @@ abstract class FileInfo {
"File " + getRelativePath() + " size is unknown.");
}
- void flush() throws IOException {
- // no-op
- }
-
ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length, boolean readCommitted)
throws IOException {
if (readCommitted && offset + length > getCommittedSize()) {
@@ -186,8 +181,7 @@ abstract class FileInfo {
+ close + ") @" + id + ":" + index;
final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
if (out == null) {
- out = new FileStore.FileStoreDataChannel(new RandomAccessFile(resolver.apply(getRelativePath()).toFile(),
- "rw"));
+ out = new FileStore.FileStoreDataChannel(resolver.apply(getRelativePath()));
}
return write(0L, data, close, sync);
}, name);
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index e910a99..8896462 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -259,7 +259,7 @@ public class FileStore implements Closeable {
return CompletableFuture.supplyAsync(() -> {
try {
final Path full = resolve(normalize(p));
- return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw"));
+ return new FileStoreDataChannel(full);
} catch (IOException e) {
throw new CompletionException("Failed to create " + p, e);
}
@@ -267,14 +267,17 @@ public class FileStore implements Closeable {
}
static class FileStoreDataChannel implements StateMachine.DataChannel {
+ private final Path path;
private final RandomAccessFile randomAccessFile;
- FileStoreDataChannel(RandomAccessFile file) {
- randomAccessFile = file;
+ FileStoreDataChannel(Path path) throws FileNotFoundException {
+ this.path = path;
+ this.randomAccessFile = new RandomAccessFile(path.toFile(), "rw");
}
@Override
public void force(boolean metadata) throws IOException {
+ LOG.debug("force({}) at {}", metadata, path);
randomAccessFile.getChannel().force(metadata);
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 509b144..d6e6851 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -153,7 +153,7 @@ public class FileStoreClient implements Closeable {
return WriteReplyProto.parseFrom(reply).getLength();
}
- public DataStreamOutput getStreamOutput(String path, int dataSize) {
+ public DataStreamOutput getStreamOutput(String path, long dataSize) {
final StreamWriteRequestProto header = StreamWriteRequestProto.newBuilder()
.setPath(ProtoUtils.toByteString(path))
.setLength(dataSize)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index 8dab01a..a5b4a72 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -24,8 +24,8 @@ import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.examples.filestore.FileStoreClient;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import java.io.File;
@@ -34,35 +34,74 @@ import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
/**
* Subcommand to generate load in filestore data stream state machine.
*/
@Parameters(commandDescription = "Load Generator for FileStore DataStream")
public class DataStream extends Client {
+ enum Type {
+ DirectByteBuffer(DirectByteBufferType::new),
+ MappedByteBuffer(MappedByteBufferType::new),
+ NettyFileRegion(NettyFileRegionType::new);
- @Parameter(names = {"--type"}, description = "DirectByteBuffer, MappedByteBuffer, NettyFileRegion", required = true)
- private String dataStreamType = "NettyFileRegion";
+ private final BiFunction<String, DataStream, TransferType> constructor;
+
+ Type(BiFunction<String, DataStream, TransferType> constructor) {
+ this.constructor = constructor;
+ }
+
+ BiFunction<String, DataStream, TransferType> getConstructor() {
+ return constructor;
+ }
+
+ static Type valueOfIgnoreCase(String s) {
+ for (Type type : values()) {
+ if (type.name().equalsIgnoreCase(s)) {
+ return type;
+ }
+ }
+ return null;
+ }
+ }
+
+ // To be used as a Java annotation attribute value
+ private static final String DESCRIPTION = "[DirectByteBuffer, MappedByteBuffer, NettyFileRegion]";
+
+ {
+ // Assert if the description is correct.
+ final String expected = Arrays.asList(Type.values()).toString();
+ Preconditions.assertTrue(expected.equals(DESCRIPTION),
+ () -> "Unexpected description: " + DESCRIPTION + " does not equal to the expected string " + expected);
+ }
+
+ @Parameter(names = {"--type"}, description = DESCRIPTION, required = true)
+ private String dataStreamType = Type.NettyFileRegion.name();
@Parameter(names = {"--syncSize"}, description = "Sync every syncSize, syncSize % bufferSize should be zero," +
"-1 means on sync", required = true)
private int syncSize = -1;
+ int getSyncSize() {
+ return syncSize;
+ }
+
private boolean checkParam() {
if (syncSize != -1 && syncSize % getBufferSizeInBytes() != 0) {
System.err.println("Error: syncSize % bufferSize should be zero");
return false;
}
- if (!dataStreamType.equals("DirectByteBuffer") &&
- !dataStreamType.equals("MappedByteBuffer") &&
- !dataStreamType.equals("NettyFileRegion")) {
- System.err.println("Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, transferTo");
+ if (Type.valueOfIgnoreCase(dataStreamType) == null) {
+ System.err.println("Error: dataStreamType should be one of " + DESCRIPTION);
return false;
}
@@ -101,18 +140,11 @@ public class DataStream extends Client {
final long fileLength = file.length();
Preconditions.assertTrue(fileLength == getFileSizeInBytes(), "Unexpected file size: expected size is "
+ getFileSizeInBytes() + " but actual size is " + fileLength);
- FileInputStream fis = new FileInputStream(file);
- final DataStreamOutput dataStreamOutput = fileStoreClient.getStreamOutput(path, (int) file.length());
-
- if (dataStreamType.equals("DirectByteBuffer")) {
- fileMap.put(path, writeByDirectByteBuffer(dataStreamOutput, fis.getChannel()));
- } else if (dataStreamType.equals("MappedByteBuffer")) {
- fileMap.put(path, writeByMappedByteBuffer(dataStreamOutput, fis.getChannel()));
- } else if (dataStreamType.equals("NettyFileRegion")) {
- fileMap.put(path, writeByNettyFileRegion(dataStreamOutput, file));
- }
- dataStreamOutput.closeAsync();
+ final Type type = Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType))
+ .orElseThrow(IllegalStateException::new);
+ final TransferType writer = type.getConstructor().apply(path, this);
+ fileMap.put(path, writer.transfer(fileStoreClient));
}
return fileMap;
}
@@ -134,46 +166,126 @@ public class DataStream extends Client {
return totalBytes;
}
- private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
- FileChannel fileChannel) throws IOException {
- final int fileSize = getFileSizeInBytes();
- final int bufferSize = getBufferSizeInBytes();
- if (fileSize <= 0) {
- return Collections.emptyList();
+ abstract static class TransferType {
+ private final String path;
+ private final File file;
+ private final long fileSize;
+ private final int bufferSize;
+ private final long syncSize;
+ private long syncPosition = 0;
+
+ TransferType(String path, DataStream cli) {
+ this.path = path;
+ this.file = new File(path);
+ this.fileSize = cli.getFileSizeInBytes();
+ this.bufferSize = cli.getBufferSizeInBytes();
+ this.syncSize = cli.getSyncSize();
+
+ final long actualSize = file.length();
+ Preconditions.assertTrue(actualSize == fileSize, () -> "Unexpected file size: expected size is "
+ + fileSize + " but actual size is " + actualSize + ", path=" + path);
+ }
+
+ File getFile() {
+ return file;
+ }
+
+ int getBufferSize() {
+ return bufferSize;
+ }
+
+ long getPacketSize(long offset) {
+ return Math.min(bufferSize, fileSize - offset);
}
- List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
- final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
- for(long offset = 0L; offset < fileSize;) {
- final ByteBuf buf = alloc.directBuffer(bufferSize);
- final int bytesRead = buf.writeBytes(fileChannel, bufferSize);
+ boolean isSync(long position) {
+ if (syncSize > 0) {
+ if (position >= fileSize || position - syncPosition >= syncSize) {
+ syncPosition = position;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ List<CompletableFuture<DataStreamReply>> transfer(FileStoreClient client) throws IOException {
+ if (fileSize <= 0) {
+ return Collections.emptyList();
+ }
+
+ final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+ final DataStreamOutput out = client.getStreamOutput(path, fileSize);
+ try (FileInputStream fis = new FileInputStream(file)) {
+ final FileChannel in = fis.getChannel();
+ for (long offset = 0L; offset < fileSize; ) {
+ offset += write(in, out, offset, futures);
+ }
+ } catch (Throwable e) {
+ throw new IOException("Failed to transfer " + path);
+ } finally {
+ futures.add(out.closeAsync());
+ }
+ return futures;
+ }
+
+ abstract long write(FileChannel in, DataStreamOutput out, long offset,
+ List<CompletableFuture<DataStreamReply>> futures) throws IOException;
+
+ @Override
+ public String toString() {
+ return JavaUtils.getClassSimpleName(getClass()) + "{" + path + ", size=" + fileSize + "}";
+ }
+ }
+
+ static class DirectByteBufferType extends TransferType {
+ DirectByteBufferType(String path, DataStream cli) {
+ super(path, cli);
+ }
+
+ @Override
+ long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures)
+ throws IOException {
+ final int bufferSize = getBufferSize();
+ final ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize);
+ final int bytesRead = buf.writeBytes(in, bufferSize);
if (bytesRead < 0) {
- throw new IllegalStateException("Failed to read " + fileSize
- + " byte(s). The channel has reached end-of-stream at " + offset);
+ throw new IllegalStateException("Failed to read " + bufferSize + " byte(s) from " + this
+ + ". The channel has reached end-of-stream at " + offset);
} else if (bytesRead > 0) {
- offset += bytesRead;
- final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer(),
- syncSize > 0 && (offset == fileSize || offset % syncSize == 0));
+ final CompletableFuture<DataStreamReply> f = out.writeAsync(buf.nioBuffer(), isSync(offset + bytesRead));
f.thenRun(buf::release);
futures.add(f);
}
+ return bytesRead;
}
-
- return futures;
}
- private List<CompletableFuture<DataStreamReply>> writeByMappedByteBuffer(DataStreamOutput dataStreamOutput,
- FileChannel fileChannel) throws IOException {
- List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
- MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, getFileSizeInBytes());
- futures.add(dataStreamOutput.writeAsync(mappedByteBuffer));
- return futures;
+ static class MappedByteBufferType extends TransferType {
+ MappedByteBufferType(String path, DataStream cli) {
+ super(path, cli);
+ }
+
+ @Override
+ long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures)
+ throws IOException {
+ final long packetSize = getPacketSize(offset);
+ final MappedByteBuffer mappedByteBuffer = in.map(FileChannel.MapMode.READ_ONLY, offset, packetSize);
+ final int remaining = mappedByteBuffer.remaining();
+ futures.add(out.writeAsync(mappedByteBuffer, isSync(offset + remaining)));
+ return remaining;
+ }
}
- private List<CompletableFuture<DataStreamReply>> writeByNettyFileRegion(
- DataStreamOutput dataStreamOutput, File file) {
- List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
- futures.add(dataStreamOutput.writeAsync(file));
- return futures;
+ static class NettyFileRegionType extends TransferType {
+ NettyFileRegionType(String path, DataStream cli) {
+ super(path, cli);
+ }
+
+ @Override
+ long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures) {
+ final long packetSize = getPacketSize(offset);
+ futures.add(out.writeAsync(getFile(), offset, packetSize, isSync(offset + packetSize)));
+ return packetSize;
+ }
}
}