You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2021/11/15 04:46:43 UTC
[ozone] 09/13: HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipeline mode (#2682)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 5156051cfe167431335705a6a55dae1d377c647f
Author: micah zhao <mi...@tencent.com>
AuthorDate: Thu Sep 30 11:21:47 2021 +0800
HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipeline mode (#2682)
---
.../hdds/scm/storage/BlockDataStreamOutput.java | 40 ++++++++++++++++++++--
1 file changed, 37 insertions(+), 3 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index c69af90..41e2c48 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,7 +146,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
this.xceiverClient =
(XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
// Alternatively, stream setup can be delayed till the first chunk write.
- this.out = setupStream();
+ this.out = setupStream(pipeline);
this.token = token;
flushPeriod = (int) (config.getStreamBufferFlushSize() / config
@@ -166,7 +168,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
config.getBytesPerChecksum());
}
- private DataStreamOutput setupStream() throws IOException {
+ private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
// Execute a dummy WriteChunk request to get the path of the target file,
// but does NOT write any data to it.
ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest =
@@ -184,7 +186,39 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
ContainerCommandRequestMessage.toMessage(builder.build(), null);
return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
- .stream(message.getContent().asReadOnlyByteBuffer());
+ .stream(message.getContent().asReadOnlyByteBuffer(),
+ getRoutingTable(pipeline));
+ }
+
+ /**
+  * Builds the Ratis {@link RoutingTable} passed to the data stream API for
+  * the given pipeline.
+  *
+  * The node reported by {@code pipeline.getFirstNode()} is used as the
+  * primary. Every other node is then chained after it, in the order
+  * returned by {@code pipeline.getNodes()}: primary, then the next node,
+  * then the one after that, and so on.
+  *
+  * @param pipeline the pipeline whose datanodes are to be chained
+  * @return the routing table, or {@code null} when the first node of the
+  *         pipeline cannot be obtained (the failure is logged)
+  */
+ public RoutingTable getRoutingTable(Pipeline pipeline) {
+   RaftPeerId primaryId = null;
+   final List<RaftPeerId> raftPeers = new ArrayList<>();
+
+   // One try around the whole loop: getFirstNode() is the only call here
+   // that declares IOException, and any failure aborts the method anyway.
+   try {
+     for (DatanodeDetails dn : pipeline.getNodes()) {
+       final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString());
+       // Compare by value, not by reference: the primary must still be
+       // recognised if the pipeline hands back an equal but distinct
+       // DatanodeDetails instance.
+       if (dn.equals(pipeline.getFirstNode())) {
+         primaryId = raftPeerId;
+       }
+       raftPeers.add(raftPeerId);
+     }
+   } catch (IOException e) {
+     // Message text is unchanged; the exception itself is appended as the
+     // trailing argument so the logger also records the stack trace.
+     LOG.error("Can not get FirstNode from the pipeline: {} with " +
+         "exception: {}", pipeline.toString(), e.getLocalizedMessage(), e);
+     return null;
+   }
+
+   // Link the peers into a chain that starts at the primary.
+   final RoutingTable.Builder builder = RoutingTable.newBuilder();
+   RaftPeerId previousId = primaryId;
+   for (RaftPeerId peerId : raftPeers) {
+     if (peerId.equals(primaryId)) {
+       // The primary is the head of the chain, never a successor.
+       continue;
+     }
+     builder.addSuccessor(previousId, peerId);
+     previousId = peerId;
+   }
+
+   return builder.build();
}
public BlockID getBlockID() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org