You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/14 07:02:41 UTC
[incubator-ratis] branch master updated: RATIS-1238. Seperate
request and write executor (#353)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 89c886f RATIS-1238. Seperate request and write executor (#353)
89c886f is described below
commit 89c886f2976c5af8a43c3509433b6842ddf2360b
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Mon Dec 14 15:00:24 2020 +0800
RATIS-1238. Seperate request and write executor (#353)
---
.../ratis/examples/common/SubCommandBase.java | 3 ---
.../ratis/examples/filestore/cli/Client.java | 8 +++----
.../ratis/examples/filestore/cli/Server.java | 2 ++
.../ratis/netty/server/DataStreamManagement.java | 19 +++++++++-------
.../apache/ratis/server/RaftServerConfigKeys.java | 26 +++++++++++++++++-----
5 files changed, 36 insertions(+), 22 deletions(-)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
index 15f1c37..f168e79 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
@@ -54,9 +54,6 @@ public abstract class SubCommandBase {
return parsePeers(peers)[0];
}
- public boolean isPrimary(String id) {
- return getPrimary().getId().toString().equals(id);
- }
public abstract void run() throws Exception;
public String getRaftGroupId() {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index 48c3f1e..814236b 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -43,7 +43,6 @@ import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -140,7 +139,7 @@ public abstract class Client extends SubCommandBase {
return new File(storageDir.get(Math.abs(hash)), fileName).getAbsolutePath();
}
- protected void dropCache() throws InterruptedException, IOException {
+ protected void dropCache() {
String[] cmds = {"/bin/sh","-c","echo 3 > /proc/sys/vm/drop_caches"};
try {
Process pro = Runtime.getRuntime().exec(cmds);
@@ -154,8 +153,7 @@ public abstract class Client extends SubCommandBase {
final CompletableFuture<Long> future = new CompletableFuture<>();
CompletableFuture.supplyAsync(() -> {
try {
- future.complete(
- writeFile(path, fileSizeInBytes, bufferSizeInBytes, new Random().nextInt(127) + 1));
+ future.complete(writeFile(path, fileSizeInBytes, bufferSizeInBytes));
} catch (IOException e) {
future.completeExceptionally(e);
}
@@ -185,7 +183,7 @@ public abstract class Client extends SubCommandBase {
return paths;
}
- protected long writeFile(String path, long fileSize, long bufferSize, int random) throws IOException {
+ protected long writeFile(String path, long fileSize, long bufferSize) throws IOException {
final byte[] buffer = new byte[Math.toIntExact(bufferSize)];
long offset = 0;
try(RandomAccessFile raf = new RandomAccessFile(path, "rw")) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index b94b6c8..c8f4f23 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -92,6 +92,8 @@ public class Server extends SubCommandBase {
RaftServerConfigKeys.Write.setElementLimit(properties, 40960);
RaftServerConfigKeys.Write.setByteLimit(properties, SizeInBytes.valueOf("1000MB"));
ConfUtils.setFiles(properties::setFiles, FileStoreCommon.STATEMACHINE_DIR_KEY, storageDir);
+ RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties, writeThreadNum);
+ RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties, writeThreadNum);
ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_WRITE_THREAD_NUM, writeThreadNum);
ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_READ_THREAD_NUM, readThreadNum);
ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_COMMIT_THREAD_NUM, commitThreadNum);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 73aa303..e5ede5e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -202,15 +202,18 @@ public class DataStreamManagement {
private final String name;
private final StreamMap streams = new StreamMap();
- private final Executor executor;
+ private final Executor requestExecutor;
+ private final Executor writeExecutor;
DataStreamManagement(RaftServer server) {
this.server = server;
this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
final RaftProperties properties = server.getProperties();
- this.executor = Executors.newFixedThreadPool(
- RaftServerConfigKeys.DataStream.asyncThreadPoolSize(properties));
+ this.requestExecutor = Executors.newFixedThreadPool(
+ RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties));
+ this.writeExecutor = Executors.newFixedThreadPool(
+ RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties));
}
private CompletableFuture<DataStream> computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
@@ -316,7 +319,7 @@ public class DataStreamManagement {
.getRaftClient()
.async());
return asyncRpcApi.sendForward(info.request)
- .thenAcceptAsync(reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply)), executor);
+ .thenAcceptAsync(reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply)), requestExecutor);
} catch (IOException e) {
throw new CompletionException(e);
}
@@ -378,16 +381,16 @@ public class DataStreamManagement {
localWrite = CompletableFuture.completedFuture(0L);
remoteWrites = Collections.emptyList();
} else if (request.getType() == Type.STREAM_DATA || request.getType() == Type.STREAM_DATA_SYNC) {
- localWrite = info.getLocal().write(buf, request.getType() == Type.STREAM_DATA_SYNC, executor);
+ localWrite = info.getLocal().write(buf, request.getType() == Type.STREAM_DATA_SYNC, writeExecutor);
remoteWrites = info.applyToRemotes(out -> out.write(request));
} else if (request.getType() == Type.STREAM_CLOSE) {
- localWrite = info.getLocal().close(executor);
+ localWrite = info.getLocal().close(writeExecutor);
remoteWrites = info.isPrimary()? info.applyToRemotes(RemoteStream::close): Collections.emptyList();
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
- composeAsync(info.getPrevious(), executor, n -> JavaUtils.allOf(remoteWrites)
+ composeAsync(info.getPrevious(), requestExecutor, n -> JavaUtils.allOf(remoteWrites)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
if (request.getType() == Type.STREAM_HEADER
|| request.getType() == Type.STREAM_DATA || request.getType() == Type.STREAM_DATA_SYNC) {
@@ -403,7 +406,7 @@ public class DataStreamManagement {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
return null;
- }, executor)).whenComplete((v, exception) -> {
+ }, requestExecutor)).whenComplete((v, exception) -> {
try {
if (exception != null) {
replyDataStreamException(server, exception, info.getRequest(), request, ctx);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index c2b38e7..f2f0596 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -420,16 +420,30 @@ public interface RaftServerConfigKeys {
interface DataStream {
String PREFIX = RaftServerConfigKeys.PREFIX + ".data-stream";
- String ASYNC_THREAD_POOL_SIZE_KEY = PREFIX + ".async.thread.pool.size";
- int ASYNC_THREAD_POOL_SIZE_DEFAULT = 16;
+ String ASYNC_REQUEST_THREAD_POOL_SIZE_KEY = PREFIX + ".async.request.thread.pool.size";
+ int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 16;
- static int asyncThreadPoolSize(RaftProperties properties) {
- return getInt(properties::getInt, ASYNC_THREAD_POOL_SIZE_KEY, ASYNC_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(),
+ static int asyncRequestThreadPoolSize(RaftProperties properties) {
+ return getInt(properties::getInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY,
+ ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(),
requireMin(0), requireMax(65536));
}
- static void setAsyncThreadPoolSize(RaftProperties properties, int port) {
- setInt(properties::setInt, ASYNC_THREAD_POOL_SIZE_KEY, port);
+ static void setAsyncRequestThreadPoolSize(RaftProperties properties, int port) {
+ setInt(properties::setInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY, port);
+ }
+
+ String ASYNC_WRITE_THREAD_POOL_SIZE_KEY = PREFIX + ".async.write.thread.pool.size";
+ int ASYNC_WRITE_THREAD_POOL_SIZE_DEFAULT = 16;
+
+ static int asyncWriteThreadPoolSize(RaftProperties properties) {
+ return getInt(properties::getInt, ASYNC_WRITE_THREAD_POOL_SIZE_KEY,
+ ASYNC_WRITE_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(),
+ requireMin(0), requireMax(65536));
+ }
+
+ static void setAsyncWriteThreadPoolSize(RaftProperties properties, int port) {
+ setInt(properties::setInt, ASYNC_WRITE_THREAD_POOL_SIZE_KEY, port);
}
}