You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/11 07:51:04 UTC
[incubator-ratis] branch master updated: RATIS-1229. FileStore
client generate files in parallel (#347)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 5fff9c5 RATIS-1229. FileStore client generate files in parallel (#347)
5fff9c5 is described below
commit 5fff9c597f44d2731605e23a758f00030700c5f8
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Fri Dec 11 15:50:57 2020 +0800
RATIS-1229. FileStore client generate files in parallel (#347)
* RATIS-1229. FileStore client generate files in parallel
* drop system cache
* fix code review
---
.../apache/ratis/examples/filestore/FileStore.java | 29 ++++++--
.../ratis/examples/filestore/FileStoreCommon.java | 12 ++-
.../examples/filestore/FileStoreStateMachine.java | 9 +--
.../ratis/examples/filestore/cli/Client.java | 87 ++++++++++++++++------
.../ratis/examples/filestore/cli/DataStream.java | 8 +-
.../ratis/examples/filestore/cli/LoadGen.java | 8 +-
.../ratis/examples/filestore/cli/Server.java | 26 ++++---
7 files changed, 127 insertions(+), 52 deletions(-)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index c0af78e..1ecff52 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis.examples.filestore;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.examples.filestore.FileInfo.ReadOnly;
import org.apache.ratis.examples.filestore.FileInfo.UnderConstruction;
import org.apache.ratis.proto.ExamplesProtos.ReadReplyProto;
@@ -101,14 +103,31 @@ public class FileStore implements Closeable {
private final List<Supplier<Path>> rootSuppliers;
private final FileMap files;
- private final ExecutorService writer = Executors.newFixedThreadPool(10);
- private final ExecutorService committer = Executors.newFixedThreadPool(3);
- private final ExecutorService reader = Executors.newFixedThreadPool(10);
- private final ExecutorService deleter = Executors.newFixedThreadPool(3);
+ private final ExecutorService writer;
+ private final ExecutorService committer;
+ private final ExecutorService reader;
+ private final ExecutorService deleter;
- public FileStore(Supplier<RaftPeerId> idSupplier, List<File> dirs) {
+ public FileStore(Supplier<RaftPeerId> idSupplier, RaftProperties properties) {
this.idSupplier = idSupplier;
this.rootSuppliers = new ArrayList<>();
+
+ int writeThreadNum = ConfUtils.getInt(properties::getInt, FileStoreCommon.STATEMACHINE_WRITE_THREAD_NUM,
+ 1, LOG::info);
+ int readThreadNum = ConfUtils.getInt(properties::getInt, FileStoreCommon.STATEMACHINE_READ_THREAD_NUM,
+ 1, LOG::info);
+ int commitThreadNum = ConfUtils.getInt(properties::getInt, FileStoreCommon.STATEMACHINE_COMMIT_THREAD_NUM,
+ 1, LOG::info);
+ int deleteThreadNum = ConfUtils.getInt(properties::getInt, FileStoreCommon.STATEMACHINE_DELETE_THREAD_NUM,
+ 1, LOG::info);
+ writer = Executors.newFixedThreadPool(writeThreadNum);
+ reader = Executors.newFixedThreadPool(readThreadNum);
+ committer = Executors.newFixedThreadPool(commitThreadNum);
+ deleter = Executors.newFixedThreadPool(deleteThreadNum);
+
+ final List<File> dirs = ConfUtils.getFiles(properties::getFiles, FileStoreCommon.STATEMACHINE_DIR_KEY,
+ null, LOG::info);
+ Objects.requireNonNull(dirs, FileStoreCommon.STATEMACHINE_DIR_KEY + " is not set.");
for (File dir : dirs) {
this.rootSuppliers.add(
JavaUtils.memoize(() -> dir.toPath().resolve(getId().toString()).normalize().toAbsolutePath()));
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
index cc769fc..152bc5a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
@@ -25,7 +25,17 @@ import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
public interface FileStoreCommon {
- String STATEMACHINE_DIR_KEY = "example.filestore.statemachine.dir";
+ String STATEMACHINE_PREFIX = "example.filestore.statemachine";
+
+ String STATEMACHINE_DIR_KEY = STATEMACHINE_PREFIX + ".dir";
+
+ String STATEMACHINE_WRITE_THREAD_NUM = STATEMACHINE_PREFIX + ".write.thread.num";
+
+ String STATEMACHINE_READ_THREAD_NUM = STATEMACHINE_PREFIX + ".read.thread.num";
+
+ String STATEMACHINE_COMMIT_THREAD_NUM = STATEMACHINE_PREFIX + ".commit.thread.num";
+
+ String STATEMACHINE_DELETE_THREAD_NUM = STATEMACHINE_PREFIX + ".delete.thread.num";
SizeInBytes MAX_CHUNK_SIZE = SizeInBytes.valueOf(64, TraditionalBinaryPrefix.MEGA);
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 65833e0..64e1efa 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.examples.filestore;
-import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.ExamplesProtos;
import org.apache.ratis.proto.ExamplesProtos.DeleteReplyProto;
@@ -42,11 +41,8 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.FileUtils;
-import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
-import java.util.List;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
public class FileStoreStateMachine extends BaseStateMachine {
@@ -55,10 +51,7 @@ public class FileStoreStateMachine extends BaseStateMachine {
private final FileStore files;
public FileStoreStateMachine(RaftProperties properties) {
- final List<File> dirs = ConfUtils.getFiles(properties::getFiles, FileStoreCommon.STATEMACHINE_DIR_KEY,
- null, LOG::info);
- Objects.requireNonNull(dirs, FileStoreCommon.STATEMACHINE_DIR_KEY + " is not set.");
- this.files = new FileStore(this::getId, dirs);
+ this.files = new FileStore(this::getId, properties);
}
@Override
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index cbf86b1..48c3f1e 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -33,16 +33,22 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
+import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
@@ -51,7 +57,7 @@ import java.util.concurrent.TimeUnit;
public abstract class Client extends SubCommandBase {
@Parameter(names = {"--size"}, description = "Size of each file in bytes", required = true)
- private int fileSizeInBytes;
+ private long fileSizeInBytes;
@Parameter(names = {"--bufferSize"}, description = "Size of buffer in bytes, should less than 4MB, " +
"i.e BUFFER_BYTE_LIMIT_DEFAULT", required = true)
@@ -60,13 +66,17 @@ public abstract class Client extends SubCommandBase {
@Parameter(names = {"--numFiles"}, description = "Number of files to be written", required = true)
private int numFiles;
+ @Parameter(names = {"--storage", "-s"}, description = "Storage dir, eg. --storage dir1 --storage dir2",
+ required = true)
+ private List<File> storageDir = new ArrayList<>();
+
private static final int MAX_THREADS_NUM = 100;
public int getNumThread() {
return numFiles < MAX_THREADS_NUM ? numFiles : MAX_THREADS_NUM;
}
- public int getFileSizeInBytes() {
+ public long getFileSizeInBytes() {
return fileSizeInBytes;
}
@@ -112,6 +122,10 @@ public abstract class Client extends SubCommandBase {
builder.setPrimaryDataStreamServer(getPrimary());
RaftClient client = builder.build();
+ for (File dir : storageDir) {
+ FileUtils.createDirectories(dir);
+ }
+
operation(client);
}
@@ -121,38 +135,69 @@ public abstract class Client extends SubCommandBase {
System.exit(0);
}
- protected List<String> generateFiles() throws IOException {
+ public String getPath(String fileName) {
+ int hash = fileName.hashCode() % storageDir.size();
+ return new File(storageDir.get(Math.abs(hash)), fileName).getAbsolutePath();
+ }
+
+ protected void dropCache() throws InterruptedException, IOException {
+ String[] cmds = {"/bin/sh","-c","echo 3 > /proc/sys/vm/drop_caches"};
+ try {
+ Process pro = Runtime.getRuntime().exec(cmds);
+ pro.waitFor();
+ } catch (Throwable t) {
+ System.err.println("Failed to run command:" + Arrays.toString(cmds) + ":" + t.getMessage());
+ }
+ }
+
+ private CompletableFuture<Long> writeFileAsync(String path, ExecutorService executor) {
+ final CompletableFuture<Long> future = new CompletableFuture<>();
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ future.complete(
+ writeFile(path, fileSizeInBytes, bufferSizeInBytes, new Random().nextInt(127) + 1));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }, executor);
+ return future;
+ }
+
+ protected List<String> generateFiles(ExecutorService executor) {
UUID uuid = UUID.randomUUID();
List<String> paths = new ArrayList<>();
+ List<CompletableFuture<Long>> futures = new ArrayList<>();
for (int i = 0; i < numFiles; i ++) {
- String path = "file-" + uuid + "-" + i;
+ String path = getPath("file-" + uuid + "-" + i);
paths.add(path);
- writeFile(path, fileSizeInBytes, bufferSizeInBytes, new Random().nextInt(127) + 1);
+ futures.add(writeFileAsync(path, executor));
+ }
+
+ for (int i = 0; i < futures.size(); i ++) {
+ long size = futures.get(i).join();
+ if (size != fileSizeInBytes) {
+ System.err.println("Error: path:" + paths.get(i) + " write:" + size +
+ " mismatch expected size:" + fileSizeInBytes);
+ }
}
return paths;
}
- protected void writeFile(String path, int fileSize, int bufferSize, int random) throws IOException {
- RandomAccessFile raf = null;
- try {
- raf = new RandomAccessFile(path, "rw");
- int offset = 0;
+ protected long writeFile(String path, long fileSize, long bufferSize, int random) throws IOException {
+ final byte[] buffer = new byte[Math.toIntExact(bufferSize)];
+ long offset = 0;
+ try(RandomAccessFile raf = new RandomAccessFile(path, "rw")) {
while (offset < fileSize) {
- final int remaining = fileSize - offset;
- final int chunkSize = Math.min(remaining, bufferSize);
- byte[] buffer = new byte[chunkSize];
- for (int i = 0; i < chunkSize; i ++) {
- buffer[i]= (byte) (i % random);
- }
- raf.write(buffer);
+ final long remaining = fileSize - offset;
+ final long chunkSize = Math.min(remaining, bufferSize);
+ ThreadLocalRandom.current().nextBytes(buffer);
+ raf.write(buffer, 0, Math.toIntExact(chunkSize));
offset += chunkSize;
}
- } finally {
- if (raf != null) {
- raf.close();
- }
}
+ return offset;
}
protected abstract void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException;
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index 76c4a8a..9afc4bd 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -117,12 +117,12 @@ public class DataStream extends Client {
stop(client);
}
- List<String> paths = generateFiles();
+ final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
+ List<String> paths = generateFiles(executor);
+ dropCache();
FileStoreClient fileStoreClient = new FileStoreClient(client);
System.out.println("Starting DataStream write now ");
- final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
-
long startTime = System.currentTimeMillis();
long totalWrittenBytes = waitStreamFinish(streamWrite(paths, fileStoreClient, executor));
@@ -230,7 +230,7 @@ public class DataStream extends Client {
}
final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
- final DataStreamOutput out = client.getStreamOutput(path, fileSize);
+ final DataStreamOutput out = client.getStreamOutput(file.getName(), fileSize);
try (FileInputStream fis = new FileInputStream(file)) {
final FileChannel in = fis.getChannel();
for (long offset = 0L; offset < fileSize; ) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
index f211f29..77cec3a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
@@ -48,12 +48,12 @@ public class LoadGen extends Client {
@Override
protected void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException {
- List<String> paths = generateFiles();
+ final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
+ List<String> paths = generateFiles(executor);
+ dropCache();
FileStoreClient fileStoreClient = new FileStoreClient(client);
System.out.println("Starting Async write now ");
- final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
-
long startTime = System.currentTimeMillis();
long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths, fileStoreClient, executor));
@@ -99,7 +99,7 @@ public class LoadGen extends Client {
try (FileInputStream fis = new FileInputStream(file)) {
final FileChannel in = fis.getChannel();
for (long offset = 0L; offset < getFileSizeInBytes(); ) {
- offset += write(in, offset, fileStoreClient, path, futures);
+ offset += write(in, offset, fileStoreClient, file.getName(), futures);
}
} catch (Throwable e) {
future.completeExceptionally(e);
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index 2ab348b..b94b6c8 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -31,7 +31,6 @@ import org.apache.ratis.metrics.JVMMetrics;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -44,10 +43,8 @@ import org.apache.ratis.util.TimeDuration;
import java.io.File;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
/**
* Class to start a ratis arithmetic example server.
@@ -62,6 +59,18 @@ public class Server extends SubCommandBase {
required = true)
private List<File> storageDir = new ArrayList<>();
+ @Parameter(names = {"--writeThreadNum"}, description = "Number of write thread")
+ private int writeThreadNum = 20;
+
+ @Parameter(names = {"--readThreadNum"}, description = "Number of read thread")
+ private int readThreadNum = 20;
+
+ @Parameter(names = {"--commitThreadNum"}, description = "Number of commit thread")
+ private int commitThreadNum = 3;
+
+ @Parameter(names = {"--deleteThreadNum"}, description = "Number of delete thread")
+ private int deleteThreadNum = 3;
+
@Override
public void run() throws Exception {
JVMMetrics.initJvmMetrics(TimeDuration.valueOf(10, TimeUnit.SECONDS));
@@ -82,8 +91,11 @@ public class Server extends SubCommandBase {
RaftServerConfigKeys.setStorageDir(properties, storageDir);
RaftServerConfigKeys.Write.setElementLimit(properties, 40960);
RaftServerConfigKeys.Write.setByteLimit(properties, SizeInBytes.valueOf("1000MB"));
- ConfUtils.setFiles(properties::setFiles, FileStoreCommon.STATEMACHINE_DIR_KEY,
- storageDir);
+ ConfUtils.setFiles(properties::setFiles, FileStoreCommon.STATEMACHINE_DIR_KEY, storageDir);
+ ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_WRITE_THREAD_NUM, writeThreadNum);
+ ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_READ_THREAD_NUM, readThreadNum);
+ ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_COMMIT_THREAD_NUM, commitThreadNum);
+ ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_DELETE_THREAD_NUM, deleteThreadNum);
StateMachine stateMachine = new FileStoreStateMachine(properties);
final RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.valueOf(ByteString.copyFromUtf8(getRaftGroupId())),
@@ -100,8 +112,4 @@ public class Server extends SubCommandBase {
TimeUnit.SECONDS.sleep(1);
}
}
-
- private Collection<RaftPeer> getOtherRaftPeers(Collection<RaftPeer> peers) {
- return peers.stream().filter(r -> !r.getId().toString().equals(id)).collect(Collectors.toList());
- }
}