You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/06 15:45:35 UTC

[ozone] 30/35: HDDS-6388. [Ozone-Streaming] Streaming write support both pipeline model and star model (#3145)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 0d9b3e0e2720bbf9a76fa8bfd15c440b78a97dfd
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Mar 2 23:26:48 2022 +0800

    HDDS-6388. [Ozone-Streaming] Streaming write support both pipeline model and star model (#3145)
---
 .../org/apache/hadoop/hdds/scm/OzoneClientConfig.java     | 15 +++++++++++++++
 .../hadoop/hdds/scm/storage/BlockDataStreamOutput.java    | 13 ++++++++++---
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index ffc4a3ef5a..fd3ab4751f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -89,6 +89,13 @@ public class OzoneClientConfig {
       tags = ConfigTag.CLIENT)
   private long streamWindowSize = 64 * 1024 * 1024;
 
+  @Config(key = "datastream.pipeline.mode",
+      defaultValue = "true",
+      description = "Streaming write support both pipeline mode(datanode1->" +
+          "datanode2->datanode3) and star mode(datanode1->datanode2, " +
+          "datanode1->datanode3). By default we use pipeline mode.",
+      tags = ConfigTag.CLIENT)
+  private boolean datastreamPipelineMode = true;
 
   @Config(key = "stream.buffer.increment",
       defaultValue = "0B",
@@ -355,4 +362,12 @@ public class OzoneClientConfig {
   public int getEcReconstructStripeReadPoolLimit() {
     return ecReconstructStripeReadPoolLimit;
   }
+
+  public boolean isDatastreamPipelineMode() {
+    return datastreamPipelineMode;
+  }
+
+  public void setDatastreamPipelineMode(boolean datastreamPipelineMode) {
+    this.datastreamPipelineMode = datastreamPipelineMode;
+  }
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 8b3e32cf41..3df5eb0e12 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -137,6 +137,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   private XceiverClientMetrics metrics;
   // buffers for which putBlock is yet to be executed
   private List<StreamBuffer> buffersForPutBlock;
+  private boolean isDatastreamPipelineMode;
   /**
    * Creates a new BlockDataStreamOutput.
    *
@@ -154,6 +155,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   ) throws IOException {
     this.xceiverClientFactory = xceiverClientManager;
     this.config = config;
+    this.isDatastreamPipelineMode = config.isDatastreamPipelineMode();
     this.blockID = new AtomicReference<>(blockID);
     KeyValue keyValue =
         KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
@@ -203,9 +205,14 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     ContainerCommandRequestMessage message =
         ContainerCommandRequestMessage.toMessage(builder.build(), null);
 
-    return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
-    .stream(message.getContent().asReadOnlyByteBuffer(),
-        getRoutingTable(pipeline));
+    if (isDatastreamPipelineMode) {
+      return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
+          .stream(message.getContent().asReadOnlyByteBuffer(),
+              getRoutingTable(pipeline));
+    } else {
+      return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
+          .stream(message.getContent().asReadOnlyByteBuffer());
+    }
   }
 
   public RoutingTable getRoutingTable(Pipeline pipeline) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org