You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/05/24 16:27:27 UTC

[ozone] 09/36: HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipline mode (#2682)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit a96e868eb8adb9a59887ddd52398f9ac4374f269
Author: micah zhao <mi...@tencent.com>
AuthorDate: Thu Sep 30 11:21:47 2021 +0800

    HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipline mode (#2682)
---
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 40 ++++++++++++++++++++--
 1 file changed, 37 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index c69af90a91..41e2c48bbb 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -144,7 +146,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     this.xceiverClient =
         (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
     // Alternatively, stream setup can be delayed till the first chunk write.
-    this.out = setupStream();
+    this.out = setupStream(pipeline);
     this.token = token;
 
     flushPeriod = (int) (config.getStreamBufferFlushSize() / config
@@ -166,7 +168,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
         config.getBytesPerChecksum());
   }
 
-  private DataStreamOutput setupStream() throws IOException {
+  private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
     // Execute a dummy WriteChunk request to get the path of the target file,
     // but does NOT write any data to it.
     ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest =
@@ -184,7 +186,39 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
         ContainerCommandRequestMessage.toMessage(builder.build(), null);
 
     return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
-        .stream(message.getContent().asReadOnlyByteBuffer());
+    .stream(message.getContent().asReadOnlyByteBuffer(),
+        getRoutingTable(pipeline));
+  }
+
+  public RoutingTable getRoutingTable(Pipeline pipeline) {
+    RaftPeerId primaryId = null;
+    List<RaftPeerId> raftPeers = new ArrayList<>();
+
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString());
+      try {
+        if (dn == pipeline.getFirstNode()) {
+          primaryId = raftPeerId;
+        }
+      } catch (IOException e) {
+        LOG.error("Can not get FirstNode from the pipeline: {} with " +
+            "exception: {}", pipeline.toString(), e.getLocalizedMessage());
+        return null;
+      }
+      raftPeers.add(raftPeerId);
+    }
+
+    RoutingTable.Builder builder = RoutingTable.newBuilder();
+    RaftPeerId previousId = primaryId;
+    for (RaftPeerId peerId : raftPeers) {
+      if (peerId.equals(primaryId)) {
+        continue;
+      }
+      builder.addSuccessor(previousId, peerId);
+      previousId = peerId;
+    }
+
+    return builder.build();
   }
 
   public BlockID getBlockID() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org