You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/06 15:45:18 UTC

[ozone] 13/35: HDDS-5763. Provide an Executor for each LocalStream in ContainerStateMachine (#2782)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 0f5a55a62dd90831b713c013cd7ac0d988c50426
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Nov 1 23:39:53 2021 +0800

    HDDS-5763. Provide an Executor for each LocalStream in ContainerStateMachine (#2782)
---
 .../transport/server/ratis/ContainerStateMachine.java   |  7 +++++--
 .../common/transport/server/ratis/LocalStream.java      | 10 +++++++++-
 .../transport/server/ratis/XceiverServerRatis.java      |  5 -----
 .../hadoop/hdds/conf/DatanodeRatisServerConfig.java     | 17 -----------------
 4 files changed, 14 insertions(+), 25 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index aed81ab5d8..ed5659d1d7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -527,8 +527,11 @@ public class ContainerStateMachine extends BaseStateMachine {
 
         ContainerCommandResponseProto response = runCommand(
             requestProto, context);
-        String path = response.getMessage();
-        return new LocalStream(new StreamDataChannel(Paths.get(path)));
+        final StreamDataChannel channel = new StreamDataChannel(
+            Paths.get(response.getMessage()));
+        final ExecutorService chunkExecutor = requestProto.hasWriteChunk() ?
+            getChunkExecutor(requestProto.getWriteChunk()) : null;
+        return new LocalStream(channel, chunkExecutor);
       } catch (IOException e) {
         throw new CompletionException("Failed to create data stream", e);
       }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
index baae013966..780f874398 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
@@ -23,12 +23,15 @@ import org.apache.ratis.statemachine.StateMachine;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
 
 class LocalStream implements StateMachine.DataStream {
   private final StateMachine.DataChannel dataChannel;
+  private final Executor executor;
 
-  LocalStream(StateMachine.DataChannel dataChannel) {
+  LocalStream(StateMachine.DataChannel dataChannel, Executor executor) {
     this.dataChannel = dataChannel;
+    this.executor = executor;
   }
 
   @Override
@@ -47,4 +50,9 @@ class LocalStream implements StateMachine.DataStream {
       }
     });
   }
+
+  @Override
+  public Executor getExecutor() {
+    return executor;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 2fcc07fc23..6b0ad0e41e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -237,11 +237,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             .getStreamRequestThreads();
     RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties,
         dataStreamAsyncRequestThreadPoolSize);
-    int dataStreamWriteRequestThreadPoolSize =
-        conf.getObject(DatanodeRatisServerConfig.class)
-            .getStreamWriteThreads();
-    RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties,
-        dataStreamWriteRequestThreadPoolSize);
     int dataStreamClientPoolSize =
         conf.getObject(DatanodeRatisServerConfig.class)
             .getClientPoolSize();
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
index 3132928abe..058932e769 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
@@ -141,23 +141,6 @@ public class DatanodeRatisServerConfig {
     this.streamRequestThreads = streamRequestThreads;
   }
 
-  @Config(key = "datastream.write.threads",
-      defaultValue = "20",
-      type = ConfigType.INT,
-      tags = {OZONE, DATANODE, RATIS, DATASTREAM},
-      description = "Maximum number of threads in the thread pool for " +
-          "datastream write."
-  )
-  private int streamWriteThreads;
-
-  public int getStreamWriteThreads() {
-    return streamWriteThreads;
-  }
-
-  public void setStreamWriteThreads(int streamWriteThreads) {
-    this.streamWriteThreads = streamWriteThreads;
-  }
-
   @Config(key = "datastream.client.pool.size",
       defaultValue = "10",
       type = ConfigType.INT,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org