You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/08 14:48:42 UTC
[incubator-ratis] branch master updated: RATIS-1218. Add sync option when test filestore performance (#337)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new efca97e RATIS-1218. Add sync option when test filestore performance (#337)
efca97e is described below
commit efca97e57ddf0f4b5980982140433f16fda7dfde
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Tue Dec 8 22:48:35 2020 +0800
RATIS-1218. Add sync option when test filestore performance (#337)
---
.../apache/ratis/examples/filestore/FileInfo.java | 65 +++++-----------------
.../apache/ratis/examples/filestore/FileStore.java | 8 +--
.../ratis/examples/filestore/FileStoreClient.java | 13 +++--
.../examples/filestore/FileStoreStateMachine.java | 3 +-
.../ratis/examples/filestore/cli/Client.java | 15 +++--
.../ratis/examples/filestore/cli/DataStream.java | 33 +++++++++--
.../ratis/examples/filestore/cli/LoadGen.java | 10 +++-
.../examples/filestore/FileStoreAsyncBaseTest.java | 4 +-
.../examples/filestore/FileStoreBaseTest.java | 4 +-
.../ratis/examples/filestore/FileStoreWriter.java | 8 +--
ratis-proto/src/main/proto/Examples.proto | 1 +
11 files changed, 80 insertions(+), 84 deletions(-)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
index 886a646..5d136e3 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java
@@ -20,7 +20,6 @@ package org.apache.ratis.examples.filestore;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.CollectionUtils;
-import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.Preconditions;
@@ -30,13 +29,10 @@ import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
-import java.io.OutputStream;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
@@ -78,7 +74,6 @@ abstract class FileInfo {
ByteString read(CheckedFunction<Path, Path, IOException> resolver, long offset, long length, boolean readCommitted)
throws IOException {
- flush();
if (readCommitted && offset + length > getCommittedSize()) {
throw new IOException("Failed to read Committed: offset (=" + offset
+ " + length (=" + length + ") > size = " + getCommittedSize()
@@ -124,30 +119,6 @@ abstract class FileInfo {
}
}
- static class FileOut implements Closeable {
- private final OutputStream out;
- private final WritableByteChannel channel;
-
- FileOut(Path p) throws IOException {
- this.out = FileUtils.createNewFile(p);
- this.channel = Channels.newChannel(out);
- }
-
- int write(ByteBuffer data) throws IOException {
- return channel.write(data);
- }
-
- void flush() throws IOException {
- out.flush();
- }
-
- @Override
- public void close() throws IOException {
- channel.close();
- out.close();
- }
- }
-
static class WriteInfo {
/** Future to make sure that each commit is executed after the corresponding write. */
private final CompletableFuture<Integer> writeFuture;
@@ -176,14 +147,12 @@ abstract class FileInfo {
}
static class UnderConstruction extends FileInfo {
- private FileOut out;
+ private FileStore.FileStoreDataChannel out;
/** The size written to a local file. */
private volatile long writeSize;
/** The size committed to client. */
private volatile long committedSize;
- /** The size at last flush. */
- private volatile long flushSize;
/** A queue to make sure that the writes are in order. */
private final TaskQueue writeQueue = new TaskQueue("writeQueue");
@@ -211,26 +180,27 @@ abstract class FileInfo {
}
CompletableFuture<Integer> submitCreate(
- CheckedFunction<Path, Path, IOException> resolver, ByteString data, boolean close,
+ CheckedFunction<Path, Path, IOException> resolver, ByteString data, boolean close, boolean sync,
ExecutorService executor, RaftPeerId id, long index) {
final Supplier<String> name = () -> "create(" + getRelativePath() + ", "
+ close + ") @" + id + ":" + index;
final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
if (out == null) {
- out = new FileOut(resolver.apply(getRelativePath()));
+ out = new FileStore.FileStoreDataChannel(new RandomAccessFile(resolver.apply(getRelativePath()).toFile(),
+ "rw"));
}
- return write(0L, data, close);
+ return write(0L, data, close, sync);
}, name);
return submitWrite(task, executor, id, index);
}
CompletableFuture<Integer> submitWrite(
- long offset, ByteString data, boolean close, ExecutorService executor,
+ long offset, ByteString data, boolean close, boolean sync, ExecutorService executor,
RaftPeerId id, long index) {
final Supplier<String> name = () -> "write(" + getRelativePath() + ", "
+ offset + ", " + close + ") @" + id + ":" + index;
final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG,
- () -> write(offset, data, close), name);
+ () -> write(offset, data, close, sync), name);
return submitWrite(task, executor, id, index);
}
@@ -244,7 +214,7 @@ abstract class FileInfo {
return f;
}
- private int write(long offset, ByteString data, boolean close) throws IOException {
+ private int write(long offset, ByteString data, boolean close, boolean sync) throws IOException {
// If leader finish write data with offset = 4096 and writeSize become 8192,
// and 2 follower has not written the data with offset = 4096,
// then start a leader election. So client will retry send the data with offset = 4096,
@@ -273,6 +243,10 @@ abstract class FileInfo {
}
}
+ if (sync) {
+ out.force(false);
+ }
+
if (close) {
out.close();
}
@@ -280,19 +254,6 @@ abstract class FileInfo {
}
}
- void flush() throws IOException {
- if (flushSize >= committedSize) {
- return;
- }
- synchronized (out) {
- if (flushSize >= committedSize) {
- return;
- }
- out.flush();
- flushSize = writeSize;
- }
- }
-
CompletableFuture<Integer> submitCommit(
long offset, int size, Function<UnderConstruction, ReadOnly> closeFunction,
ExecutorService executor, RaftPeerId id, long index) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index 84fab1a..e910a99 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -195,7 +195,7 @@ public class FileStore implements Closeable {
}
CompletableFuture<Integer> write(
- long index, String relative, boolean close, long offset, ByteString data) {
+ long index, String relative, boolean close, boolean sync, long offset, ByteString data) {
final int size = data != null? data.size(): 0;
LOG.trace("write {}, offset={}, size={}, close? {} @{}:{}",
relative, offset, size, close, getId(), index);
@@ -214,8 +214,8 @@ public class FileStore implements Closeable {
}
return size == 0 && !close? CompletableFuture.completedFuture(0)
- : createNew? uc.submitCreate(this::resolve, data, close, writer, getId(), index)
- : uc.submitWrite(offset, data, close, writer, getId(), index);
+ : createNew? uc.submitCreate(this::resolve, data, close, sync, writer, getId(), index)
+ : uc.submitWrite(offset, data, close, sync, writer, getId(), index);
}
@Override
@@ -266,7 +266,7 @@ public class FileStore implements Closeable {
}, writer);
}
- class FileStoreDataChannel implements StateMachine.DataChannel {
+ static class FileStoreDataChannel implements StateMachine.DataChannel {
private final RandomAccessFile randomAccessFile;
FileStoreDataChannel(RandomAccessFile file) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 51dc100..509b144 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -145,11 +145,11 @@ public class FileStoreClient implements Closeable {
return sendReadOnlyFunction.apply(read.toByteString());
}
- public long write(String path, long offset, boolean close, ByteBuffer buffer)
+ public long write(String path, long offset, boolean close, ByteBuffer buffer, boolean sync)
throws IOException {
final int chunkSize = FileStoreCommon.getChunkSize(buffer.remaining());
buffer.limit(chunkSize);
- final ByteString reply = writeImpl(this::send, path, offset, close, buffer);
+ final ByteString reply = writeImpl(this::send, path, offset, close, buffer, sync);
return WriteReplyProto.parseFrom(reply).getLength();
}
@@ -162,21 +162,22 @@ public class FileStoreClient implements Closeable {
return client.getDataStreamApi().stream(request.toByteString().asReadOnlyByteBuffer());
}
- public CompletableFuture<Long> writeAsync(String path, long offset, boolean close, ByteBuffer buffer) {
- return writeImpl(this::sendAsync, path, offset, close, buffer
+ public CompletableFuture<Long> writeAsync(String path, long offset, boolean close, ByteBuffer buffer, boolean sync) {
+ return writeImpl(this::sendAsync, path, offset, close, buffer, sync
).thenApply(reply -> JavaUtils.supplyAndWrapAsCompletionException(
() -> WriteReplyProto.parseFrom(reply).getLength()));
}
private static <OUTPUT, THROWABLE extends Throwable> OUTPUT writeImpl(
CheckedFunction<ByteString, OUTPUT, THROWABLE> sendFunction,
- String path, long offset, boolean close, ByteBuffer data)
+ String path, long offset, boolean close, ByteBuffer data, boolean sync)
throws THROWABLE {
final WriteRequestHeaderProto.Builder header = WriteRequestHeaderProto.newBuilder()
.setPath(ProtoUtils.toByteString(path))
.setOffset(offset)
.setLength(data.remaining())
- .setClose(close);
+ .setClose(close)
+ .setSync(sync);
final WriteRequestProto.Builder write = WriteRequestProto.newBuilder()
.setHeader(header)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index aa99240..0dc1561 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -127,7 +127,8 @@ public class FileStoreStateMachine extends BaseStateMachine {
final WriteRequestHeaderProto h = proto.getWriteHeader();
final CompletableFuture<Integer> f = files.write(entry.getIndex(),
- h.getPath().toStringUtf8(), h.getClose(), h.getOffset(), smLog.getStateMachineEntry().getStateMachineData());
+ h.getPath().toStringUtf8(), h.getClose(), h.getSync(), h.getOffset(),
+ smLog.getStateMachineEntry().getStateMachineData());
// sync only if closing the file
return h.getClose()? f: null;
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index fb12f45..8b45453 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -41,6 +41,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
@@ -107,19 +108,25 @@ public abstract class Client extends SubCommandBase {
operation(client);
}
- public List<String> generateFiles() throws IOException {
+
+ protected void stop(RaftClient client) throws IOException {
+ client.close();
+ System.exit(0);
+ }
+
+ protected List<String> generateFiles() throws IOException {
String entropy = RandomStringUtils.randomAlphanumeric(numFiles);
List<String> paths = new ArrayList<>();
for (int i = 0; i < numFiles; i ++) {
String path = "file-" + entropy + "-" + i;
paths.add(path);
- writeFile(path, fileSizeInBytes, bufferSizeInBytes);
+ writeFile(path, fileSizeInBytes, bufferSizeInBytes, new Random().nextInt(127) + 1);
}
return paths;
}
- public void writeFile(String path, int fileSize, int bufferSize) throws IOException {
+ protected void writeFile(String path, int fileSize, int bufferSize, int random) throws IOException {
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(path, "rw");
@@ -129,7 +136,7 @@ public abstract class Client extends SubCommandBase {
final int chunkSize = Math.min(remaining, bufferSize);
byte[] buffer = new byte[chunkSize];
for (int i = 0; i < chunkSize; i ++) {
- buffer[i]= (byte) ('A' + i % 23);
+ buffer[i]= (byte) (i % random);
}
raf.write(buffer);
offset += chunkSize;
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index dd653be..8dab01a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -49,8 +49,32 @@ public class DataStream extends Client {
@Parameter(names = {"--type"}, description = "DirectByteBuffer, MappedByteBuffer, NettyFileRegion", required = true)
private String dataStreamType = "NettyFileRegion";
+ @Parameter(names = {"--syncSize"}, description = "Sync every syncSize, syncSize % bufferSize should be zero," +
+ "-1 means on sync", required = true)
+ private int syncSize = -1;
+
+ private boolean checkParam() {
+ if (syncSize != -1 && syncSize % getBufferSizeInBytes() != 0) {
+ System.err.println("Error: syncSize % bufferSize should be zero");
+ return false;
+ }
+
+ if (!dataStreamType.equals("DirectByteBuffer") &&
+ !dataStreamType.equals("MappedByteBuffer") &&
+ !dataStreamType.equals("NettyFileRegion")) {
+ System.err.println("Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, transferTo");
+ return false;
+ }
+
+ return true;
+ }
+
@Override
protected void operation(RaftClient client) throws IOException {
+ if (!checkParam()) {
+ stop(client);
+ }
+
List<String> paths = generateFiles();
FileStoreClient fileStoreClient = new FileStoreClient(client);
System.out.println("Starting DataStream write now ");
@@ -66,8 +90,7 @@ public class DataStream extends Client {
System.out.println("Total data written: " + totalWrittenBytes + " bytes");
System.out.println("Total time taken: " + (endTime - startTime) + " millis");
- client.close();
- System.exit(0);
+ stop(client);
}
private Map<String, List<CompletableFuture<DataStreamReply>>> streamWrite(
@@ -87,8 +110,6 @@ public class DataStream extends Client {
fileMap.put(path, writeByMappedByteBuffer(dataStreamOutput, fis.getChannel()));
} else if (dataStreamType.equals("NettyFileRegion")) {
fileMap.put(path, writeByNettyFileRegion(dataStreamOutput, file));
- } else {
- System.err.println("Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, transferTo");
}
dataStreamOutput.closeAsync();
@@ -131,8 +152,8 @@ public class DataStream extends Client {
+ " byte(s). The channel has reached end-of-stream at " + offset);
} else if (bytesRead > 0) {
offset += bytesRead;
-
- final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer());
+ final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(buf.nioBuffer(),
+ syncSize > 0 && (offset == fileSize || offset % syncSize == 0));
f.thenRun(buf::release);
futures.add(f);
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
index 5a14e65..45ee600 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.examples.filestore.cli;
+import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.examples.filestore.FileStoreClient;
@@ -37,6 +38,9 @@ import java.util.concurrent.CompletableFuture;
@Parameters(commandDescription = "Load Generator for FileStore")
public class LoadGen extends Client {
+ @Parameter(names = {"--sync"}, description = "Whether sync every bufferSize", required = true)
+ private int sync = 0;
+
@Override
protected void operation(RaftClient client) throws IOException {
List<String> paths = generateFiles();
@@ -54,8 +58,7 @@ public class LoadGen extends Client {
System.out.println("Total data written: " + totalWrittenBytes + " bytes");
System.out.println("Total time taken: " + (endTime - startTime) + " millis");
- client.close();
- System.exit(0);
+ stop(client);
}
private Map<String, List<CompletableFuture<Long>>> writeByHeapByteBuffer(
@@ -76,7 +79,8 @@ public class LoadGen extends Client {
long offset = 0L;
while(fis.read(buffer, 0, bytesToRead) > 0) {
ByteBuffer b = ByteBuffer.wrap(buffer);
- futures.add(fileStoreClient.writeAsync(path, offset, offset + bytesToRead == getFileSizeInBytes(), b));
+ futures.add(fileStoreClient.writeAsync(path, offset, offset + bytesToRead == getFileSizeInBytes(), b,
+ sync == 1));
offset += bytesToRead;
bytesToRead = (int)Math.min(getFileSizeInBytes() - offset, getBufferSizeInBytes());
if (bytesToRead > 0) {
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
index d70cfa6..eb51e64 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreAsyncBaseTest.java
@@ -77,7 +77,7 @@ public abstract class FileStoreAsyncBaseTest<CLUSTER extends MiniRaftCluster>
.setAsyncExecutor(executor)
.setFileStoreClientSupplier(() -> client)
.build()
- .writeAsync()
+ .writeAsync(false)
.thenCompose(FileStoreWriter::verifyAsync)
.thenCompose(FileStoreWriter::deleteAsync)
.get();
@@ -99,7 +99,7 @@ public abstract class FileStoreAsyncBaseTest<CLUSTER extends MiniRaftCluster>
.setAsyncExecutor(executor)
.setFileStoreClientSupplier(() -> client)
.build()
- .writeAsync(),
+ .writeAsync(false),
() -> path + ":" + fileLength);
writerFutures.add(callable.call());
}
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
index cf01050..c455aeb 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java
@@ -92,7 +92,7 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
.setFileSize(fileLength)
.setFileStoreClientSupplier(newClient)
.build()) {
- w.write().verify().delete();
+ w.write(false).verify().delete();
}
}
@@ -112,7 +112,7 @@ public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
.setFileName(path)
.setFileSize(fileLength)
.setFileStoreClientSupplier(newClient)
- .build().write(),
+ .build().write(false),
() -> path + ":" + fileLength);
writerFutures.add(executor.submit(callable));
}
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
index 343b0b1..1caa59f 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
@@ -113,7 +113,7 @@ class FileStoreWriter implements Closeable {
return b;
}
- FileStoreWriter write() throws IOException {
+ FileStoreWriter write(boolean sync) throws IOException {
final Random r = new Random(seed);
final int size = fileSize.getSizeInt();
@@ -126,7 +126,7 @@ class FileStoreWriter implements Closeable {
LOG.trace("write {}, offset={}, length={}, close? {}",
fileName, offset, length, close);
- final long written = client.write(fileName, offset, close, b);
+ final long written = client.write(fileName, offset, close, b, sync);
Assert.assertEquals(length, written);
offset += written;
}
@@ -167,7 +167,7 @@ class FileStoreWriter implements Closeable {
return this;
}
- CompletableFuture<FileStoreWriter> writeAsync() {
+ CompletableFuture<FileStoreWriter> writeAsync(boolean sync) {
Objects.requireNonNull(asyncExecutor, "asyncExecutor == null");
final Random r = new Random(seed);
final int size = fileSize.getSizeInt();
@@ -188,7 +188,7 @@ class FileStoreWriter implements Closeable {
LOG.trace("writeAsync {}, offset={}, length={}, close? {}",
fileName, offset, length, close);
- client.writeAsync(fileName, offset, close, b)
+ client.writeAsync(fileName, offset, close, b, sync)
.thenAcceptAsync(written -> Assert.assertEquals(length, (long)written), asyncExecutor)
.thenRun(() -> {
final int count = callCount.decrementAndGet();
diff --git a/ratis-proto/src/main/proto/Examples.proto b/ratis-proto/src/main/proto/Examples.proto
index 42e47de..ecf750d 100644
--- a/ratis-proto/src/main/proto/Examples.proto
+++ b/ratis-proto/src/main/proto/Examples.proto
@@ -41,6 +41,7 @@ message WriteRequestHeaderProto {
bool close = 2; // close the file after write?
uint64 offset = 3;
uint64 length = 4;
+ bool sync = 5;
}
message StreamWriteRequestProto {