You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/06/09 03:05:30 UTC
[ozone] 13/36: HDDS-5763. Provide an Executor for each LocalStream in ContainerStateMachine (#2782)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 12cd5d167c2f835107bacefd76c779c062c13fb7
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Nov 1 23:39:53 2021 +0800
HDDS-5763. Provide an Executor for each LocalStream in ContainerStateMachine (#2782)
---
.../transport/server/ratis/ContainerStateMachine.java | 7 +++++--
.../common/transport/server/ratis/LocalStream.java | 10 +++++++++-
.../transport/server/ratis/XceiverServerRatis.java | 5 -----
.../hadoop/hdds/conf/DatanodeRatisServerConfig.java | 17 -----------------
4 files changed, 14 insertions(+), 25 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 121a6d6bdd..83255e0450 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -527,8 +527,11 @@ public class ContainerStateMachine extends BaseStateMachine {
ContainerCommandResponseProto response = runCommand(
requestProto, context);
- String path = response.getMessage();
- return new LocalStream(new StreamDataChannel(Paths.get(path)));
+ final StreamDataChannel channel = new StreamDataChannel(
+ Paths.get(response.getMessage()));
+ final ExecutorService chunkExecutor = requestProto.hasWriteChunk() ?
+ getChunkExecutor(requestProto.getWriteChunk()) : null;
+ return new LocalStream(channel, chunkExecutor);
} catch (IOException e) {
throw new CompletionException("Failed to create data stream", e);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
index baae013966..780f874398 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
@@ -23,12 +23,15 @@ import org.apache.ratis.statemachine.StateMachine;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
class LocalStream implements StateMachine.DataStream {
private final StateMachine.DataChannel dataChannel;
+ private final Executor executor;
- LocalStream(StateMachine.DataChannel dataChannel) {
+ LocalStream(StateMachine.DataChannel dataChannel, Executor executor) {
this.dataChannel = dataChannel;
+ this.executor = executor;
}
@Override
@@ -47,4 +50,9 @@ class LocalStream implements StateMachine.DataStream {
}
});
}
+
+ @Override
+ public Executor getExecutor() {
+ return executor;
+ }
}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 2fcc07fc23..6b0ad0e41e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -237,11 +237,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
.getStreamRequestThreads();
RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties,
dataStreamAsyncRequestThreadPoolSize);
- int dataStreamWriteRequestThreadPoolSize =
- conf.getObject(DatanodeRatisServerConfig.class)
- .getStreamWriteThreads();
- RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties,
- dataStreamWriteRequestThreadPoolSize);
int dataStreamClientPoolSize =
conf.getObject(DatanodeRatisServerConfig.class)
.getClientPoolSize();
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
index 3132928abe..058932e769 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
@@ -141,23 +141,6 @@ public class DatanodeRatisServerConfig {
this.streamRequestThreads = streamRequestThreads;
}
- @Config(key = "datastream.write.threads",
- defaultValue = "20",
- type = ConfigType.INT,
- tags = {OZONE, DATANODE, RATIS, DATASTREAM},
- description = "Maximum number of threads in the thread pool for " +
- "datastream write."
- )
- private int streamWriteThreads;
-
- public int getStreamWriteThreads() {
- return streamWriteThreads;
- }
-
- public void setStreamWriteThreads(int streamWriteThreads) {
- this.streamWriteThreads = streamWriteThreads;
- }
-
@Config(key = "datastream.client.pool.size",
defaultValue = "10",
type = ConfigType.INT,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org