You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/24 16:27:22 UTC

[ozone] 04/36: HDDS-5480. [Ozone-Streaming] Client and server should support stream setup. (#2452)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 968d1f1ac801408fe6428814527fc672386c98cb
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Jul 28 20:22:53 2021 +0800

    HDDS-5480. [Ozone-Streaming] Client and server should support stream setup. (#2452)
---
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  9 +++--
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  | 15 ++++++--
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   | 18 +++++++++
 .../org/apache/hadoop/ozone/audit/DNAction.java    |  3 +-
 .../helpers/ContainerCommandRequestPBHelper.java   |  1 +
 .../common/src/main/resources/ozone-default.xml    | 20 ++++++++++
 .../org/apache/hadoop/hdds/conf/ConfigTag.java     |  3 +-
 .../container/common/impl/HddsDispatcher.java      |  3 +-
 .../transport/server/ratis/XceiverServerRatis.java | 43 +++++++++++++++++++++-
 .../ozone/container/keyvalue/KeyValueHandler.java  | 33 +++++++++++++++++
 .../keyvalue/impl/ChunkManagerDispatcher.java      |  6 +++
 .../keyvalue/impl/FilePerBlockStrategy.java        |  8 ++++
 .../keyvalue/interfaces/ChunkManager.java          |  5 +++
 .../container/common/TestDatanodeStateMachine.java |  6 ++-
 .../hdds/conf/DatanodeRatisServerConfig.java       | 35 ++++++++++++++++++
 .../src/main/proto/DatanodeClientProtocol.proto    |  4 +-
 .../ozone/container/common/TestEndPoint.java       |  4 ++
 .../intellij/runConfigurations/Datanode2.xml       |  2 +-
 .../intellij/runConfigurations/Datanode3.xml       |  2 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |  1 +
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  3 ++
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |  2 +
 .../server/TestSecureContainerServer.java          |  2 +
 23 files changed, 213 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 25826f3e23..78a0eeb7c5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -273,8 +273,10 @@ public class DatanodeDetails extends NodeImpl implements
         return port;
       }
     }
-    // if no separate admin/server port, return single Ratis one for compat
-    if (name == Name.RATIS_ADMIN || name == Name.RATIS_SERVER) {
+    // if no separate admin/server/datastream port, return single Ratis one for
+    // compat
+    if (name == Name.RATIS_ADMIN || name == Name.RATIS_SERVER ||
+        name == Name.RATIS_DATASTREAM) {
       return getPort(Name.RATIS);
     }
     return null;
@@ -784,7 +786,8 @@ public class DatanodeDetails extends NodeImpl implements
      * Ports that are supported in DataNode.
      */
     public enum Name {
-      STANDALONE, RATIS, REST, REPLICATION, RATIS_ADMIN, RATIS_SERVER;
+      STANDALONE, RATIS, REST, REPLICATION, RATIS_ADMIN, RATIS_SERVER,
+      RATIS_DATASTREAM;
 
       public static final Set<Name> ALL_PORTS = ImmutableSet.copyOf(
           Name.values());
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index 8a6193d329..e93eb951ee 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -44,6 +44,7 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.proto.RaftProtos;
@@ -119,7 +120,9 @@ public final class RatisHelper {
         .setId(toRaftPeerId(dn))
         .setAddress(toRaftPeerAddress(dn, Port.Name.RATIS_SERVER))
         .setAdminAddress(toRaftPeerAddress(dn, Port.Name.RATIS_ADMIN))
-        .setClientAddress(toRaftPeerAddress(dn, Port.Name.RATIS));
+        .setClientAddress(toRaftPeerAddress(dn, Port.Name.RATIS))
+        .setDataStreamAddress(
+            toRaftPeerAddress(dn, Port.Name.RATIS_DATASTREAM));
   }
 
   private static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
@@ -173,6 +176,7 @@ public final class RatisHelper {
       ConfigurationSource ozoneConfiguration) throws IOException {
     return newRaftClient(rpcType,
         toRaftPeerId(pipeline.getLeaderNode()),
+        toRaftPeer(pipeline.getFirstNode()),
         newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
             pipeline.getNodes()), retryPolicy, tlsConfig, ozoneConfiguration);
   }
@@ -192,7 +196,7 @@ public final class RatisHelper {
   public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
       RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
       ConfigurationSource configuration) {
-    return newRaftClient(rpcType, leader.getId(),
+    return newRaftClient(rpcType, leader.getId(), leader,
         newRaftGroup(Collections.singletonList(leader)), retryPolicy,
         tlsConfig, configuration);
   }
@@ -200,14 +204,14 @@ public final class RatisHelper {
   public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
       RetryPolicy retryPolicy,
       ConfigurationSource ozoneConfiguration) {
-    return newRaftClient(rpcType, leader.getId(),
+    return newRaftClient(rpcType, leader.getId(), leader,
         newRaftGroup(Collections.singletonList(leader)), retryPolicy, null,
         ozoneConfiguration);
   }
 
   @SuppressWarnings("checkstyle:ParameterNumber")
   private static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
-      RaftGroup group, RetryPolicy retryPolicy,
+      RaftPeer primary, RaftGroup group, RetryPolicy retryPolicy,
       GrpcTlsConfig tlsConfig, ConfigurationSource ozoneConfiguration) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("newRaftClient: {}, leader={}, group={}",
@@ -221,6 +225,7 @@ public final class RatisHelper {
     return RaftClient.newBuilder()
         .setRaftGroup(group)
         .setLeaderId(leader)
+        .setPrimaryDataStreamServer(primary)
         .setProperties(properties)
         .setParameters(setClientTlsConf(rpcType, tlsConfig))
         .setRetryPolicy(retryPolicy)
@@ -278,6 +283,8 @@ public final class RatisHelper {
   public static RaftProperties setRpcType(RaftProperties properties,
       RpcType rpcType) {
     RaftConfigKeys.Rpc.setType(properties, rpcType);
+    RaftConfigKeys.DataStream.setType(properties,
+        SupportedDataStreamType.NETTY);
     return properties;
   }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index c9cecccc5f..7ee874a60b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -57,6 +57,12 @@ public final class OzoneConfigKeys {
   public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT =
       false;
 
+  public static final String DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT =
+      "dfs.container.ratis.datastream.random.port";
+  public static final boolean
+      DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT_DEFAULT =
+      false;
+
   public static final String DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY =
       "dfs.container.chunk.write.sync";
   public static final boolean DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT = false;
@@ -79,6 +85,18 @@ public final class OzoneConfigKeys {
       "dfs.container.ratis.server.port";
   public static final int DFS_CONTAINER_RATIS_SERVER_PORT_DEFAULT = 9856;
 
+  /**
+   * Ratis Port where containers listen to datastream requests.
+   */
+  public static final String DFS_CONTAINER_RATIS_DATASTREAM_ENABLE
+      = "dfs.container.ratis.datastream.enable";
+  public static final boolean DFS_CONTAINER_RATIS_DATASTREAM_ENABLE_DEFAULT
+      = true;
+  public static final String DFS_CONTAINER_RATIS_DATASTREAM_PORT
+      = "dfs.container.ratis.datastream.port";
+  public static final int DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT
+      = 9855;
+
   /**
    * When set to true, allocate a random free port for ozone container, so that
    * a mini cluster is able to launch multiple containers on a node.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index 1c87f2bdeb..73aff9ac83 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -38,7 +38,8 @@ public enum DNAction implements AuditAction {
   PUT_SMALL_FILE,
   GET_SMALL_FILE,
   CLOSE_CONTAINER,
-  GET_COMMITTED_BLOCK_LENGTH;
+  GET_COMMITTED_BLOCK_LENGTH,
+  STREAM_INIT;
 
   @Override
   public String getAction() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
index a13f164eec..4d7f0f37c4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
@@ -187,6 +187,7 @@ public final class ContainerCommandRequestPBHelper {
     case GetSmallFile     : return DNAction.GET_SMALL_FILE;
     case CloseContainer   : return DNAction.CLOSE_CONTAINER;
     case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
+    case StreamInit       : return DNAction.STREAM_INIT;
     default :
       LOG.debug("Invalid command type - {}", cmdType);
       return null;
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a10b4315d8..e4d753d1b9 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -53,6 +53,26 @@
     <tag>OZONE, CONTAINER, MANAGEMENT</tag>
     <description>The ipc port number of container.</description>
   </property>
+  <property>
+    <name>dfs.container.ratis.datastream.enable</name>
+    <value>true</value>
+    <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+    <description>If enable datastream ipc of container.</description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.datastream.port</name>
+    <value>9855</value>
+    <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+    <description>The datastream port number of container.</description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.datastream.random.port</name>
+    <value>false</value>
+    <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+    <description>Allocates a random free port for ozone container datastream.
+      This is used only while running unit tests.
+    </description>
+  </property>
   <property>
     <name>dfs.container.ipc.random.port</name>
     <value>false</value>
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
index 8cf584d75f..3728a0b1f5 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
@@ -46,5 +46,6 @@ public enum ConfigTag {
   DELETION,
   HA,
   BALANCER,
-  UPGRADE
+  UPGRADE,
+  DATASTREAM
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 83ba104439..2e0f1de496 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -199,7 +199,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
     boolean isWriteStage =
         (cmdType == Type.WriteChunk && dispatcherContext != null
             && dispatcherContext.getStage()
-            == DispatcherContext.WriteChunkStage.WRITE_DATA);
+            == DispatcherContext.WriteChunkStage.WRITE_DATA)
+            || (cmdType == Type.StreamInit);
     boolean isWriteCommitStage =
         (cmdType == Type.WriteChunk && dispatcherContext != null
             && dispatcherContext.getStage()
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index c8d715cc60..d69b64cce1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -98,6 +98,7 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
@@ -129,6 +130,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private int serverPort;
   private int adminPort;
   private int clientPort;
+  private int dataStreamPort;
   private final RaftServer server;
   private final List<ThreadPoolExecutor> chunkExecutors;
   private final ContainerDispatcher dispatcher;
@@ -148,6 +150,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   // Timeout used while calling submitRequest directly.
   private long requestTimeout;
   private boolean shouldDeleteRatisLogDirectory;
+  private boolean streamEnable;
 
   private XceiverServerRatis(DatanodeDetails dd,
       ContainerDispatcher dispatcher, ContainerController containerController,
@@ -157,6 +160,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     Objects.requireNonNull(dd, "id == null");
     datanodeDetails = dd;
     assignPorts();
+    this.streamEnable = conf.getBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLE,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLE_DEFAULT);
     RaftProperties serverProperties = newRaftProperties();
     this.context = context;
     this.dispatcher = dispatcher;
@@ -213,6 +219,32 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         chunkExecutors, this, conf);
   }
 
+  private void setUpRatisStream(RaftProperties properties) {
+    // set the datastream config
+    if (conf.getBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT,
+        OzoneConfigKeys.
+            DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT_DEFAULT)) {
+      dataStreamPort = 0;
+    } else {
+      dataStreamPort = conf.getInt(
+          OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_PORT,
+          OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT);
+    }
+    NettyConfigKeys.DataStream.setPort(properties, dataStreamPort);
+    int dataStreamAsyncRequestThreadPoolSize =
+        conf.getObject(DatanodeRatisServerConfig.class)
+            .getStreamRequestThreads();
+    RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties,
+        dataStreamAsyncRequestThreadPoolSize);
+    int dataStreamWriteRequestThreadPoolSize =
+        conf.getObject(DatanodeRatisServerConfig.class)
+            .getStreamWriteThreads();
+    RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties,
+        dataStreamWriteRequestThreadPoolSize);
+  }
+
+  @SuppressWarnings("checkstyle:methodlength")
   private RaftProperties newRaftProperties() {
     final RaftProperties properties = new RaftProperties();
 
@@ -231,6 +263,10 @@ public final class XceiverServerRatis implements XceiverServerSpi {
 
     // set the configs enable and set the stateMachineData sync timeout
     RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true);
+    if (streamEnable) {
+      setUpRatisStream(properties);
+    }
+
     timeUnit = OzoneConfigKeys.
         DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT.getUnit();
     duration = conf.getTimeDuration(
@@ -491,7 +527,12 @@ public final class XceiverServerRatis implements XceiverServerSpi {
           Port.Name.RATIS_ADMIN);
       serverPort = getRealPort(serverRpc.getInetSocketAddress(),
           Port.Name.RATIS_SERVER);
-
+      if (streamEnable) {
+        DataStreamServerRpc dataStreamServerRpc =
+            server.getDataStreamServerRpc();
+        dataStreamPort = getRealPort(dataStreamServerRpc.getInetSocketAddress(),
+            Port.Name.RATIS_DATASTREAM);
+      }
       isStarted = true;
     }
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index d0a865fde3..44e5f38a90 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -99,6 +99,7 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse;
+import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponseBuilder;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
@@ -224,6 +225,8 @@ public class KeyValueHandler extends Handler {
       return handler.handleDeleteChunk(request, kvContainer);
     case WriteChunk:
       return handler.handleWriteChunk(request, kvContainer, dispatcherContext);
+    case StreamInit:
+      return handler.handleStreamInit(request, kvContainer, dispatcherContext);
     case ListChunk:
       return handler.handleUnsupportedOp(request);
     case CompactChunk:
@@ -250,6 +253,36 @@ public class KeyValueHandler extends Handler {
     return this.blockManager;
   }
 
+  ContainerCommandResponseProto handleStreamInit(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
+    if (!request.hasWriteChunk()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Malformed Write Chunk request. trace ID: {}",
+            request.getTraceID());
+      }
+      return malformedRequest(request);
+    }
+
+    String path = null;
+    try {
+      checkContainerOpen(kvContainer);
+
+      WriteChunkRequestProto writeChunk = request.getWriteChunk();
+      BlockID blockID = BlockID.getFromProtobuf(writeChunk.getBlockID());
+
+      path = chunkManager
+          .streamInit(kvContainer, blockID);
+
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    }
+
+    return getSuccessResponseBuilder(request)
+        .setMessage(path)
+        .build();
+  }
+
   /**
    * Handles Create Container Request. If successful, adds the container to
    * ContainerSet and sends an ICR to the SCM.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
index 763647313b..3e2ab46470 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
@@ -73,6 +73,12 @@ public class ChunkManagerDispatcher implements ChunkManager {
         .writeChunk(container, blockID, info, data, dispatcherContext);
   }
 
+  public String streamInit(Container container, BlockID blockID)
+      throws StorageContainerException {
+    return selectHandler(container)
+        .streamInit(container, blockID);
+  }
+
   @Override
   public void finishWriteChunks(KeyValueContainer kvContainer,
       BlockData blockData) throws IOException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 51cd5708d3..9efc6bc351 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -89,6 +89,14 @@ public class FilePerBlockStrategy implements ChunkManager {
         container.getContainerData().getLayoutVersion() == FILE_PER_BLOCK);
   }
 
+  @Override
+  public String streamInit(Container container, BlockID blockID)
+      throws StorageContainerException {
+    checkLayoutVersion(container);
+    File chunkFile = getChunkFile(container, blockID, null);
+    return chunkFile.getAbsolutePath();
+  }
+
   @Override
   public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
       ChunkBuffer data, DispatcherContext dispatcherContext)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index 15ff9d6b9d..ba06eebd69 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -104,6 +104,11 @@ public interface ChunkManager {
     // no-op
   }
 
+  default String streamInit(Container container, BlockID blockID)
+      throws StorageContainerException {
+    return null;
+  }
+
   static long getBufferCapacityForChunkRead(ChunkInfo chunkInfo,
       long defaultReadBufferCapacity) {
     long bufferCapacity = 0;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index 1337f28ad9..bb1145bb2b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -81,6 +81,8 @@ public class TestDatanodeStateMachine {
         TimeUnit.MILLISECONDS);
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
+    conf.setBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
     serverAddresses = new ArrayList<>();
     scmServers = new ArrayList<>();
     mockServers = new ArrayList<>();
@@ -215,7 +217,6 @@ public class TestDatanodeStateMachine {
         OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     datanodeDetails.setPort(port);
     ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
-
     try (DatanodeStateMachine stateMachine =
              new DatanodeStateMachine(datanodeDetails, conf, null, null,
                  null)) {
@@ -424,6 +425,8 @@ public class TestDatanodeStateMachine {
         DatanodeDetails.Port.Name.RATIS, 0);
     DatanodeDetails.Port restPort = DatanodeDetails.newPort(
         DatanodeDetails.Port.Name.REST, 0);
+    DatanodeDetails.Port streamPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.RATIS_DATASTREAM, 0);
     return DatanodeDetails.newBuilder()
         .setUuid(UUID.randomUUID())
         .setHostName("localhost")
@@ -431,6 +434,7 @@ public class TestDatanodeStateMachine {
         .addPort(containerPort)
         .addPort(ratisPort)
         .addPort(restPort)
+        .addPort(streamPort)
         .build();
   }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
index 25ed4776b7..205d92e955 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import java.time.Duration;
 
 import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.DATASTREAM;
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.RATIS;
@@ -123,6 +124,40 @@ public class DatanodeRatisServerConfig {
     this.leaderNumPendingRequests = leaderNumPendingRequests;
   }
 
+  @Config(key = "datastream.request.threads",
+      defaultValue = "20",
+      type = ConfigType.INT,
+      tags = {OZONE, DATANODE, RATIS, DATASTREAM},
+      description = "Maximum number of threads in the thread pool for " +
+          "datastream request."
+  )
+  private int streamRequestThreads;
+
+  public int getStreamRequestThreads() {
+    return streamRequestThreads;
+  }
+
+  public void setStreamRequestThreads(int streamRequestThreads) {
+    this.streamRequestThreads = streamRequestThreads;
+  }
+
+  @Config(key = "datastream.write.threads",
+      defaultValue = "20",
+      type = ConfigType.INT,
+      tags = {OZONE, DATANODE, RATIS, DATASTREAM},
+      description = "Maximum number of threads in the thread pool for " +
+          "datastream write."
+  )
+  private int streamWriteThreads;
+
+  public int getStreamWriteThreads() {
+    return streamWriteThreads;
+  }
+
+  public void setStreamWriteThreads(int streamWriteThreads) {
+    this.streamWriteThreads = streamWriteThreads;
+  }
+
   @Config(key = "delete.ratis.log.directory",
           defaultValue = "true",
           type = ConfigType.BOOLEAN,
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 3f6e9996d2..62e8b9a55e 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -100,6 +100,8 @@ enum Type {
   GetSmallFile = 16;
   CloseContainer = 17;
   GetCommittedBlockLength = 18;
+
+  StreamInit = 19;
 }
 
 
@@ -396,7 +398,7 @@ enum ChecksumType {
 
 message  WriteChunkRequestProto  {
   required DatanodeBlockID blockID = 1;
-  required ChunkInfo chunkData = 2;
+  optional ChunkInfo chunkData = 2;
   optional bytes data = 3;
 }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 0230109fa4..56a04de02c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -144,6 +144,8 @@ public class TestEndPoint {
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         serverAddress, 1000)) {
       DatanodeDetails datanodeDetails = randomDatanodeDetails();
+      conf.setBoolean(
+          OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
       OzoneContainer ozoneContainer = new OzoneContainer(
           datanodeDetails, conf, getContext(datanodeDetails), null);
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
@@ -168,6 +170,8 @@ public class TestEndPoint {
         true);
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
         true);
+    conf.setBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
     conf.setFromObject(new ReplicationConfig().setPort(0));
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         serverAddress, 1000)) {
diff --git a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
index 3d3302030d..040b515b9f 100644
--- a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
+++ b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
@@ -18,7 +18,7 @@
   <configuration default="false" name="Datanode2" type="Application" factoryName="Application">
     <option name="MAIN_CLASS_NAME" value="org.apache.hadoop.ozone.HddsDatanodeService" />
     <module name="ozone-datanode" />
-    <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode2 --set hdds.datanode.dir=/tmp/datanode2/storage --set hdds.datanode.http-address=127.0.0.1:10021 --set dfs.container.ratis.ipc=10022 --set dfs.container.ipc=10023 --set dfs.container.ratis.server.port=10024 --set dfs.container.ratis.admin.port=10025 --set hdds.datanode.replication.port=10026" />
+    <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode2 --set hdds.datanode.dir=/tmp/datanode2/storage --set hdds.datanode.http-address=127.0.0.1:10021 --set dfs.container.ratis.ipc=10022 --set dfs.container.ipc=10023 --set dfs.container.ratis.server.port=10024 --set dfs.container.ratis.admin.port=10025 --set hdds.datanode.replication.port=10026 --set dfs.container.ratis.datastream.port=10027" />
     <option name="VM_PARAMETERS" value="-Dlog4j.configuration=file:hadoop-ozone/dev-support/intellij/log4j.properties" />
     <extension name="coverage">
       <pattern>
diff --git a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml
index 10b6682a0e..6a3116e0fd 100644
--- a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml
+++ b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml
@@ -18,7 +18,7 @@
   <configuration default="false" name="Datanode3" type="Application" factoryName="Application">
     <option name="MAIN_CLASS_NAME" value="org.apache.hadoop.ozone.HddsDatanodeService" />
     <module name="ozone-datanode" />
-    <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode3 --set hdds.datanode.dir=/tmp/datanode3/storage --set hdds.datanode.http-address=127.0.0.1:10031 --set dfs.container.ratis.ipc=10032 --set dfs.container.ipc=10033 --set dfs.container.ratis.server.port=10034 --set dfs.container.ratis.admin.port=10035 --set hdds.datanode.replication.port=10036" />
+    <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode3 --set hdds.datanode.dir=/tmp/datanode3/storage --set hdds.datanode.http-address=127.0.0.1:10031 --set dfs.container.ratis.ipc=10032 --set dfs.container.ipc=10033 --set dfs.container.ratis.server.port=10034 --set dfs.container.ratis.admin.port=10035 --set hdds.datanode.replication.port=10036 --set dfs.container.ratis.datastream.port=10037" />
     <option name="VM_PARAMETERS" value="-Dlog4j.configuration=file:hadoop-ozone/dev-support/intellij/log4j.properties" />
     <extension name="coverage">
       <pattern>
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 556a7487c5..b1d9a74209 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -318,6 +318,7 @@ public interface MiniOzoneCluster {
     protected Optional<String> omId = Optional.empty();
     
     protected Boolean randomContainerPort = true;
+    protected Boolean randomContainerStreamPort = true;
     protected Optional<String> datanodeReservedSpace = Optional.empty();
     protected Optional<Integer> chunkSize = Optional.empty();
     protected OptionalInt streamBufferSize = OptionalInt.empty();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 6920dd4df9..a549767481 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -88,6 +88,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_INIT_D
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ADMIN_PORT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_PORT;
@@ -918,6 +919,8 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
           randomContainerPort);
       conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
           randomContainerPort);
+      conf.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT,
+          randomContainerStreamPort);
 
       conf.setFromObject(new ReplicationConfig().setPort(0));
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index 2f4df638e0..c06647bbe5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -215,6 +215,8 @@ public class TestMiniOzoneCluster {
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
         true);
+    ozoneConf.setBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
     List<DatanodeStateMachine> stateMachines = new ArrayList<>();
     try {
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index cd7c995544..cb2db30c10 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -216,6 +216,8 @@ public class TestSecureContainerServer {
       DatanodeDetails dn, OzoneConfiguration conf) throws IOException {
     conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
         dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
+    conf.setBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
     final String dir = TEST_DIR + dn.getUuid();
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
     final ContainerDispatcher dispatcher = createDispatcher(dn,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org