Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/11/22 11:55:55 UTC

[GitHub] [ozone] guohao-rosicky opened a new pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

guohao-rosicky opened a new pull request #2860:
URL: https://github.com/apache/ozone/pull/2860


   1. On the client side, implement SmallFileDataStreamOutput to handle upload requests smaller than ChunkSize.
   2. SmallFileDataStreamOutput uses ContainerProtos.PutSmallFileRequestProto to write data and metadata.
   3. On the Datanode, implement a new SmallFileStreamDataChannel to process small-file requests.
   
   JIRA: https://issues.apache.org/jira/browse/HDDS-4474
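
As a rough illustration of item 1, the client-side choice between the small-file path and the regular streaming path could look like the sketch below. The class `StreamPathSelector`, its `Path` enum, and the rule that an unknown (negative) size falls back to the regular path are assumptions made for this example; they are not taken from the PR.

```java
/** Hypothetical helper: picks a write path from the declared data size. */
final class StreamPathSelector {

  /** The two write paths discussed in this PR (names are illustrative). */
  enum Path { SMALL_FILE, KEY_STREAM }

  private final long chunkSize;

  StreamPathSelector(long chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    this.chunkSize = chunkSize;
  }

  /**
   * Returns SMALL_FILE only when the size is known (non-negative) and
   * strictly smaller than one chunk; otherwise falls back to KEY_STREAM.
   */
  Path select(long dataSize) {
    return dataSize >= 0 && dataSize < chunkSize
        ? Path.SMALL_FILE : Path.KEY_STREAM;
  }
}
```

With a chunk size of 4 MB (an assumed value), `select(1024)` would pick the small-file path, while a size equal to the chunk size or an unknown size would use the regular streaming path.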


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] szetszwo commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1076122863


   @captainzmc , @guohao-rosicky , sure, I am reviewing this.




[GitHub] [ozone] guohao-rosicky edited a comment on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky edited a comment on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1019719778


   > @guohao-rosicky , the change grew from 13kB to 65kB, so it has become hard to review. The SmallFileDataStreamOutput class is really long.
   > 
   > How about we move the refactoring to a separate JIRA?
   
   @szetszwo, how about splitting it into two parts?
   
   1. DataChannel abstraction on the server
   2. Write small file (HDDS-4474)




[GitHub] [ozone] szetszwo commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1019836692


   @guohao-rosicky , sure, please do it.  Thanks.






[GitHub] [ozone] guohao-rosicky commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1017448017


   @szetszwo @captainzmc please take a look.




[GitHub] [ozone] captainzmc commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
captainzmc commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1075909102


   The CI run was successful. @szetszwo Could you help take another look?




[GitHub] [ozone] captainzmc commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
captainzmc commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-978946802


   Thanks @guohao-rosicky for the contribution.
   Why don't we use the async write API to write small files? It should need fewer RPC calls for small files.




[GitHub] [ozone] captainzmc edited a comment on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
captainzmc edited a comment on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-979653032


   > However, the data have to go through the leader so that the network path won't be optimal and the leader may become a hotspot.
   
   Thanks @szetszwo for the explanation; I agree. Let's use Streaming instead of async write.






[GitHub] [ozone] guohao-rosicky commented on a change in pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r755845118



##########
File path: hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
##########
@@ -451,7 +451,7 @@ RPC */
 message PutSmallFileRequestProto {
   required PutBlockRequestProto block = 1;
   required ChunkInfo chunkInfo = 2;
-  required bytes data = 3;
+  optional bytes data = 3;

Review comment:
       The data field of PutSmallFileRequestProto is not set (it is empty) in setupStream:
   
   ```java
   // No setData(..) call here: the data field is left unset for StreamInit.
   ContainerProtos.PutSmallFileRequestProto putSmallFileRequest =
       ContainerProtos.PutSmallFileRequestProto.newBuilder()
           .setChunkInfo(chunk)
           .setBlock(createBlockRequest)
           .build();
   
   // The stream is set up against the first node of the pipeline.
   String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
   ContainerProtos.ContainerCommandRequestProto.Builder builder =
       ContainerProtos.ContainerCommandRequestProto.newBuilder()
           .setCmdType(ContainerProtos.Type.StreamInit)
           .setContainerID(blockID.get().getContainerID())
           .setDatanodeUuid(id)
           .setPutSmallFile(putSmallFileRequest);
   ```






[GitHub] [ozone] guohao-rosicky commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-976165716


   @szetszwo @captainzmc, please take a look.
   
   PutSmallFile combines writeChunk and putBlock, and it is transmitted to the Datanode through a stream write.
   
   There is no applyTransaction that goes through ContainerStateMachine.
   
   Please check my PR; I don't know how to get the BCSID.




[GitHub] [ozone] szetszwo commented on a change in pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r758318422



##########
File path: hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
##########
@@ -451,7 +451,7 @@ RPC */
 message PutSmallFileRequestProto {
   required PutBlockRequestProto block = 1;
   required ChunkInfo chunkInfo = 2;
-  required bytes data = 3;
+  optional bytes data = 3;

Review comment:
       Yes, changing required to optional is backward compatible since the old code always provides the value and the new code can handle it.
   
   However, it is not forward compatible since the old code may not be able to handle the case that the value is missing.
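
The compatibility argument can be made concrete with a small stand-in. The sketch below does not use the protobuf library; `SmallFileMessage`, `oldReader` and `newReader` are hypothetical names that only mimic a message whose `data` field may be unset, an old handler written when the field was required, and a new handler that checks for presence first. In real protobuf, old code would typically reject a message that lacks a required field while parsing; the `NullPointerException` here merely stands in for that failure.

```java
import java.util.Optional;

/** Stand-in for a message whose data field may or may not be set. */
final class SmallFileMessage {
  private final byte[] data; // null models an unset optional field

  SmallFileMessage(byte[] data) {
    this.data = data;
  }

  boolean hasData() {
    return data != null;
  }

  byte[] getData() {
    return data;
  }
}

final class CompatibilityDemo {
  private CompatibilityDemo() {
  }

  /** Old code: written when data was required, so it never checks. */
  static int oldReader(SmallFileMessage m) {
    return m.getData().length; // throws NullPointerException if unset
  }

  /** New code: treats data as optional and handles both cases. */
  static Optional<Integer> newReader(SmallFileMessage m) {
    return m.hasData() ? Optional.of(m.getData().length) : Optional.empty();
  }
}
```

A message produced by an old writer always carries data, so both readers accept it (backward compatible). A message from a new writer may omit data, which only the new reader handles (not forward compatible).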






[GitHub] [ozone] szetszwo commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-979644256


   @captainzmc , this change is an optimization of createStreamKey(..) and createStreamFile(..) in RpcClient for the case where the given size is smaller than the chunk size.
   
   Using the Async API would need fewer RPC calls, and we should implement it.  However, the data has to go through the leader, so the network path won't be optimal and the leader may become a hotspot.
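
To make the hotspot argument concrete, here is a toy traffic model, not a measurement. It assumes that an async write sends the payload to the leader, which then replicates it to every follower, and that a streaming write forwards the payload along a chain in which each node has at most one successor. `ReplicationTraffic` and its method names are invented for this sketch.

```java
/** Toy model of the bytes one datanode sends for a single write. */
final class ReplicationTraffic {
  private ReplicationTraffic() {
  }

  /** Async write (assumed): the leader forwards the payload to each follower. */
  static long leaderBytesOutAsync(long payload, int replicas) {
    return payload * (replicas - 1);
  }

  /** Streaming chain (assumed): each node forwards to at most one successor. */
  static long maxNodeBytesOutStreaming(long payload, int replicas) {
    return replicas > 1 ? payload : 0;
  }
}
```

Under these assumptions, with three replicas the leader sends twice the payload for every async write, while no single node in a streaming chain sends more than one copy.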




[GitHub] [ozone] guohao-rosicky commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1077444324


   > > KeyDataStreamOutput can optionally not specify the data size, SmallFileStreamOutput can be retained, ...
   > 
   > That's why KeyDataStreamOutput is more powerful than SmallFileStreamOutput since it works even if the data size is unknown.
   
   OK, how can we best carry out the follow-up work? Does the code in this PR help us achieve that? Can I split it up?




[GitHub] [ozone] szetszwo commented on a change in pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r755766534



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalDataChannel.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.IOException;
+
+/**
+ * For write state machine data.
+ */
+public interface LocalDataChannel extends StateMachine.DataChannel {

Review comment:
       Let's call it StreamDataChannel.

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/SmallFileStreamDataChannel.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.LocalDataChannel;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
+import org.apache.ratis.proto.RaftProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class is used to get the DataChannel for streaming.
+ */
+class SmallFileStreamDataChannel implements LocalDataChannel {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SmallFileStreamDataChannel.class);
+
+  private final RandomAccessFile randomAccessFile;
+  private final File file;
+
+  private final Container kvContainer;
+  private final ContainerData containerData;
+  private final ContainerMetrics metrics;

Review comment:
       Move randomAccessFile, file, containerData and metrics and the shared code to a new base class.
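
One possible shape for such a base class is sketched below. It is self-contained and deliberately leaves out the Ozone types (`ContainerData`, `ContainerMetrics`) and the Ratis `DataChannel` interface; `BaseStreamDataChannel` and `SimpleFileChannel` are hypothetical names, a plain `WritableByteChannel` is used as a stand-in, and a byte counter takes the place of the metrics object.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

/** Holds the file handle and the write/close logic shared by subclasses. */
abstract class BaseStreamDataChannel implements WritableByteChannel {
  private final File file;
  private final RandomAccessFile randomAccessFile;
  private final FileChannel channel;
  private long bytesWritten; // stand-in for a metrics object

  BaseStreamDataChannel(File file) throws IOException {
    this.file = file;
    this.randomAccessFile = new RandomAccessFile(file, "rw");
    this.channel = randomAccessFile.getChannel();
  }

  File getFile() {
    return file;
  }

  long getBytesWritten() {
    return bytesWritten;
  }

  @Override
  public int write(ByteBuffer src) throws IOException {
    final int n = channel.write(src);
    bytesWritten += n;
    return n;
  }

  @Override
  public boolean isOpen() {
    return channel.isOpen();
  }

  @Override
  public void close() throws IOException {
    // Closing the RandomAccessFile also closes its channel.
    randomAccessFile.close();
  }
}

/** A subclass only adds what is specific to its kind of stream. */
final class SimpleFileChannel extends BaseStreamDataChannel {
  SimpleFileChannel(File file) throws IOException {
    super(file);
  }
}
```

The point of the design is that each concrete channel keeps only its own request-specific state, while file handling lives in one place.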

##########
File path: hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
##########
@@ -451,7 +451,7 @@ RPC */
 message PutSmallFileRequestProto {
   required PutBlockRequestProto block = 1;
   required ChunkInfo chunkInfo = 2;
-  required bytes data = 3;
+  optional bytes data = 3;

Review comment:
       Why change data to optional?






[GitHub] [ozone] szetszwo commented on a change in pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r833177203



##########
File path: hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
##########
@@ -566,6 +567,13 @@ public static String getFixedLengthString(String string, int length) {
     return String.format("%1$" + length + "s", string);
   }
 
+  public static byte[] getFixedLengthBytes(int length) {
+    byte[] bytes = new byte[length];
+    Random random = new Random();
+    random.nextBytes(bytes);
+    return bytes;
+  }
+

Review comment:
       Use ThreadLocalRandom and support non-random data as below:
   ```
     public static byte[] generateData(int length, boolean random) {
       final byte[] data = new byte[length];
       if (random) {
         ThreadLocalRandom.current().nextBytes(data);
       } else {
         for (int i = 0; i < length; i++) {
           data[i] = (byte) i;
         }
       }
       return data;
     }
   ```

##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamRoutingTable.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class is used to get the RoutingTable for streaming.
+ */
+public final class StreamRoutingTable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(StreamRoutingTable.class);
+
+  private StreamRoutingTable() {
+  }
+
+  public static RoutingTable getRoutingTable(Pipeline pipeline) {

Review comment:
       Move it to RatisHelper.

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
##########
@@ -549,18 +551,44 @@ private ContainerCommandResponseProto runCommand(
 
   @Override
   public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
-    return CompletableFuture.supplyAsync(() -> {
-      if (stream == null) {
-        return JavaUtils.completeExceptionally(
-            new IllegalStateException("DataStream is null"));
-      }
-      if (stream.getDataChannel().isOpen()) {
-        return JavaUtils.completeExceptionally(
-            new IllegalStateException(
-                "DataStream: " + stream + " is not closed properly"));
-      } else {
-        return CompletableFuture.completedFuture(null);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException(
+          "DataStream is null"));
+    }
+    final DataChannel dataChannel = stream.getDataChannel();
+    if (dataChannel.isOpen()) {
+      return JavaUtils.completeExceptionally(new IllegalStateException(
+          "DataStream: " + stream + " is not closed properly"));
+    }
+
+    final CompletableFuture<ContainerCommandResponseProto> f;
+    if (dataChannel instanceof SmallFileStreamDataChannel) {
+      f = link(entry, (SmallFileStreamDataChannel) dataChannel);
+    } else if (dataChannel instanceof KeyValueStreamDataChannel) {
+      return CompletableFuture.completedFuture(null);
+    } else {
+      return JavaUtils.completeExceptionally(new IllegalStateException(
+          "Unexpected DataChannel " + dataChannel.getClass()));
+    }
+    return f.whenComplete((res, e) -> {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("PutBlock {} Term: {} Index: {}",
+            res.getResult(), entry.getTerm(), entry.getIndex());
       }
+    });
+  }
+
+  private CompletableFuture<ContainerCommandResponseProto> link(
+      LogEntryProto entry, SmallFileStreamDataChannel smallFileChannel) {
+    return CompletableFuture.supplyAsync(() -> {
+      final DispatcherContext context = new DispatcherContext.Builder()
+          .setTerm(entry.getTerm())
+          .setLogIndex(entry.getIndex())
+          .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
+          .setContainer2BCSIDMap(container2BCSIDMap)
+          .build();
+
+      return runCommand(smallFileChannel.getPutBlockRequest(), context);
     }, executor);

Review comment:
       Pass ContainerCommandRequestProto instead and rename it to runCommandAsync(..)
   ```
     private CompletableFuture<ContainerCommandResponseProto> runCommandAsync(
         ContainerCommandRequestProto requestProto, LogEntryProto entry) {
       return CompletableFuture.supplyAsync(() -> {
         final DispatcherContext context = new DispatcherContext.Builder()
             .setTerm(entry.getTerm())
             .setLogIndex(entry.getIndex())
             .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
             .setContainer2BCSIDMap(container2BCSIDMap)
             .build();
   
         return runCommand(requestProto, context);
       }, executor);
     }
   ```

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/SmallFileDataStreamOutput.java
##########
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.StreamBuffer;
+import org.apache.hadoop.hdds.scm.storage.StreamRoutingTable;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * SmallFileDataStreamOutput, only used to write requests smaller than ChunkSize
+ * <p>
+ * TODO : currently not support multi-thread access.
+ */
+public class SmallFileDataStreamOutput implements ByteBufferStreamOutput {

Review comment:
       It seems that the code in this class is copied from BlockDataStreamOutputEntryPool and KeyDataStreamOutput.  We should reuse that code rather than copy it.  Otherwise, it will be very hard to maintain.






[GitHub] [ozone] guohao-rosicky commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1077432945


   > @guohao-rosicky , have you seen this comment [#2860 (review)](https://github.com/apache/ozone/pull/2860#pullrequestreview-918649078) ?
   
   @szetszwo 
   KeyDataStreamOutput can optionally omit the data size; SmallFileStreamOutput can be retained. What do you think?




[GitHub] [ozone] guohao-rosicky commented on a change in pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r800366033



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/SmallFileStreamDataChannel.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class is used to get the DataChannel for streaming.
+ */
+class SmallFileStreamDataChannel extends StreamDataChannelBase {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SmallFileStreamDataChannel.class);
+
+  private final Container kvContainer;
+  private final BlockManager blockManager;
+  private BlockData blockData;
+
+  private final int realLen;
+  private int writeLen = 0;
+  private final List<ByteBuffer> metadata = new ArrayList<>();
+  private int metadataLen = 0;
+  private boolean forceClose = false;
+
+  SmallFileStreamDataChannel(File file, Container container,
+                             BlockManager blockManager, long dataLen,
+                             ContainerMetrics metrics)
+      throws StorageContainerException {
+    super(file, container.getContainerData(), metrics);
+    this.blockManager = blockManager;
+    this.kvContainer = container;
+    this.realLen = (int) dataLen;
+  }
+
+  @Override
+  ContainerProtos.Type getType() {
+    return ContainerProtos.Type.PutSmallFile;
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    int srcLen = src.capacity();
+
+    if (srcLen == 0) {
+      forceClose = true;
+      return 0;
+    } else if (writeLen + srcLen > realLen) {
+
+      if (metadataLen > 0) {
+        metadataLen += srcLen;
+        metadata.add(src);
+      } else {
+        metadataLen += (writeLen + srcLen - realLen);
+
+        int dataLen = srcLen - metadataLen;
+        byte[] data = new byte[dataLen];
+        src.get(data, 0, dataLen);
+        super.write(ByteBuffer.wrap(data));
+
+        byte[] meta = new byte[metadataLen];
+        src.get(meta, dataLen, metadataLen);
+        metadata.add(ByteBuffer.wrap(meta));
+      }
+    } else {
+      super.write(src);
+    }
+    writeLen += srcLen;
+    return srcLen;
+  }
+
+  private ByteString asByteString() {
+    ByteBuffer buffer = ByteBuffer.allocate(metadataLen);
+    for (ByteBuffer b : metadata) {
+      buffer.put(b);
+    }
+    buffer.flip();
+    return ByteString.copyFrom(buffer);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (forceClose) {
+      super.close();
+      return;
+    }
+
+    if (writeLen <= realLen || metadataLen <= 0) {
+      String msg = "Put small file write length mismatch realLen: " +
+          realLen + " writeLen: " + writeLen + " metadataLen: " + metadataLen;
+      throw new StorageContainerException(msg,
+          ContainerProtos.Result.PUT_SMALL_FILE_ERROR);
+    }
+
+    ContainerProtos.ContainerCommandRequestProto request =
+        ContainerCommandRequestMessage.toProto(asByteString(), null);
+
+    if (!request.hasPutSmallFile()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Malformed Put Small File request. trace ID: {}",
+            request.getTraceID());
+      }
+      throw new StorageContainerException("Malformed Put Small File request.",
+          ContainerProtos.Result.PUT_SMALL_FILE_ERROR);
+    }
+
+    ContainerProtos.PutSmallFileRequestProto putSmallFileReq =
+        request.getPutSmallFile();
+
+    ContainerProtos.ChunkInfo chunkInfoProto = putSmallFileReq.getChunkInfo();
+    ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+    Preconditions.checkNotNull(chunkInfo);
+
+    blockData = BlockData.getFromProtoBuf(
+        putSmallFileReq.getBlock().getBlockData());
+    Preconditions.checkNotNull(blockData);
+
+    List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
+    chunks.add(chunkInfoProto);
+    blockData.setChunks(chunks);
+
+    super.close();
+  }
+
+  @Override
+  public void link(RaftProtos.LogEntryProto entry) throws IOException {

Review comment:
       @szetszwo, this is used to update the BCSID in blockData.






[GitHub] [ozone] guohao-rosicky commented on a change in pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r800366861



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/SmallFileStreamDataChannel.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class is used to get the DataChannel for streaming.
+ */
+class SmallFileStreamDataChannel extends StreamDataChannelBase {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SmallFileStreamDataChannel.class);
+
+  private final Container kvContainer;
+  private final BlockManager blockManager;
+  private BlockData blockData;
+
+  private final int realLen;
+  private int writeLen = 0;
+  private final List<ByteBuffer> metadata = new ArrayList<>();
+  private int metadataLen = 0;
+  private boolean forceClose = false;
+
+  SmallFileStreamDataChannel(File file, Container container,
+                             BlockManager blockManager, long dataLen,
+                             ContainerMetrics metrics)
+      throws StorageContainerException {
+    super(file, container.getContainerData(), metrics);
+    this.blockManager = blockManager;
+    this.kvContainer = container;
+    this.realLen = (int) dataLen;
+  }
+
+  @Override
+  ContainerProtos.Type getType() {
+    return ContainerProtos.Type.PutSmallFile;
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {

Review comment:
       @szetszwo, BlockData is also transmitted through the stream and temporarily stored in the channel.
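
For illustration only: a minimal sketch of how a channel could separate the file data from the metadata that follows it in the same stream, assuming the data length is known up front. The names `TrailerSplit`, `Parts` and `split` are hypothetical and are not part of this PR. The sketch relies on `ByteBuffer.remaining()`, which counts only the unread bytes (unlike `capacity()`), and on `duplicate()`/`slice()`, so no bytes are copied and the source buffer's position is left untouched.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper for illustration; not part of the PR. */
public class TrailerSplit {

  /** The two zero-copy views produced by split(). */
  public static final class Parts {
    public final ByteBuffer data;
    public final ByteBuffer metadata;

    Parts(ByteBuffer data, ByteBuffer metadata) {
      this.data = data;
      this.metadata = metadata;
    }
  }

  /**
   * Splits the readable bytes of src into the first dataLen bytes (data)
   * and everything after them (metadata). src itself is not modified.
   */
  public static Parts split(ByteBuffer src, int dataLen) {
    if (dataLen < 0 || dataLen > src.remaining()) {
      throw new IllegalArgumentException(
          "dataLen " + dataLen + " is outside [0, " + src.remaining() + "]");
    }
    // View over the first dataLen readable bytes.
    ByteBuffer data = src.duplicate();
    data.limit(data.position() + dataLen);

    // View over whatever follows the data.
    ByteBuffer metadata = src.duplicate();
    metadata.position(metadata.position() + dataLen);

    return new Parts(data.slice(), metadata.slice());
  }
}
```

Because both parts are views over the same backing storage, a caller that needs to keep the metadata after the source buffer is released or reused would have to copy it first.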






[GitHub] [ozone] guohao-rosicky edited a comment on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky edited a comment on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1077444324


   > > KeyDataStreamOutput can optionally not specify the data size, SmallFileStreamOutput can be retained, ...
   > 
   > That's why KeyDataStreamOutput is more powerful than SmallFileStreamOutput since it works even if the data size is unknown.
   
   OK, how can we best handle the follow-up work? Does the code in this PR help us achieve this? Can I split it?




[GitHub] [ozone] captainzmc commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
captainzmc commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-979653032


   > However, the data have to go through the leader so that the network path won't be optimal and the leader may become a hotspot.
   
   Thanks @szetszwo for your explanation; I agree with you. Let's continue using Streaming instead of async write.
   
   




[GitHub] [ozone] bshashikant commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
bshashikant commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-984736618


   @guohao-rosicky , can you please rebase?




[GitHub] [ozone] szetszwo commented on a change in pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r801249806



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/SmallFileStreamDataChannel.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class is used to get the DataChannel for streaming.
+ */
+class SmallFileStreamDataChannel extends StreamDataChannelBase {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SmallFileStreamDataChannel.class);
+
+  private final Container kvContainer;
+  private final BlockManager blockManager;

Review comment:
       @guohao-rosicky , BlockManager is not used in ContainerStateMachine at all.  We should not use it here.  Otherwise, it is a violation of the abstraction.






[GitHub] [ozone] szetszwo commented on a change in pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r801247916



##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/SmallFileStreamDataChannel.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This class is used to get the DataChannel for streaming.
+ */
+class SmallFileStreamDataChannel extends StreamDataChannelBase {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SmallFileStreamDataChannel.class);
+
+  private final Container kvContainer;
+  private final BlockManager blockManager;
+  private BlockData blockData;
+
+  private final int realLen;
+  private int writeLen = 0;
+  private final List<ByteBuffer> metadata = new ArrayList<>();
+  private int metadataLen = 0;
+  private boolean forceClose = false;
+
+  SmallFileStreamDataChannel(File file, Container container,
+                             BlockManager blockManager, long dataLen,
+                             ContainerMetrics metrics)
+      throws StorageContainerException {
+    super(file, container.getContainerData(), metrics);
+    this.blockManager = blockManager;
+    this.kvContainer = container;
+    this.realLen = (int) dataLen;
+  }
+
+  @Override
+  ContainerProtos.Type getType() {
+    return ContainerProtos.Type.PutSmallFile;
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    int srcLen = src.capacity();
+
+    if (srcLen == 0) {
+      forceClose = true;
+      return 0;
+    } else if (writeLen + srcLen > realLen) {
+
+      if (metadataLen > 0) {
+        metadataLen += srcLen;
+        metadata.add(src);
+      } else {
+        metadataLen += (writeLen + srcLen - realLen);
+
+        int dataLen = srcLen - metadataLen;
+        byte[] data = new byte[dataLen];
+        src.get(data, 0, dataLen);
+        super.write(ByteBuffer.wrap(data));
+
+        byte[] meta = new byte[metadataLen];
+        src.get(meta, dataLen, metadataLen);
+        metadata.add(ByteBuffer.wrap(meta));
+      }
+    } else {
+      super.write(src);
+    }
+    writeLen += srcLen;
+    return srcLen;
+  }
+
+  private ByteString asByteString() {
+    ByteBuffer buffer = ByteBuffer.allocate(metadataLen);
+    for (ByteBuffer b : metadata) {
+      buffer.put(b);
+    }
+    buffer.flip();
+    return ByteString.copyFrom(buffer);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (forceClose) {
+      super.close();
+      return;
+    }
+
+    if (writeLen <= realLen || metadataLen <= 0) {
+      String msg = "Put small file write length mismatch realLen: " +
+          realLen + " writeLen: " + writeLen + " metadataLen: " + metadataLen;
+      throw new StorageContainerException(msg,
+          ContainerProtos.Result.PUT_SMALL_FILE_ERROR);
+    }
+
+    ContainerProtos.ContainerCommandRequestProto request =
+        ContainerCommandRequestMessage.toProto(asByteString(), null);
+
+    if (!request.hasPutSmallFile()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Malformed Put Small File request. trace ID: {}",
+            request.getTraceID());
+      }
+      throw new StorageContainerException("Malformed Put Small File request.",
+          ContainerProtos.Result.PUT_SMALL_FILE_ERROR);
+    }
+
+    ContainerProtos.PutSmallFileRequestProto putSmallFileReq =
+        request.getPutSmallFile();
+
+    ContainerProtos.ChunkInfo chunkInfoProto = putSmallFileReq.getChunkInfo();
+    ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+    Preconditions.checkNotNull(chunkInfo);
+
+    blockData = BlockData.getFromProtoBuf(
+        putSmallFileReq.getBlock().getBlockData());
+    Preconditions.checkNotNull(blockData);
+
+    List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
+    chunks.add(chunkInfoProto);
+    blockData.setChunks(chunks);
+
+    super.close();
+  }
+
+  @Override
+  public void link(RaftProtos.LogEntryProto entry) throws IOException {

Review comment:
       We should update it in the same way as ContainerStateMachine.applyTransaction(..) does.  It first builds a context and then submits it for execution; see https://github.com/apache/ozone/blob/e8af5823c6bcddd067062138a55229b0565ee59b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java#L837-L871
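
As a rough illustration of the "build a context, then submit it for execution" pattern mentioned above: the following is a generic sketch, not the actual ContainerStateMachine code. `ApplyWithContext`, `Context`, `runCommand` and `apply` are hypothetical names, and the request is reduced to a plain String for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch; not the ContainerStateMachine implementation. */
public class ApplyWithContext {

  /** Immutable per-entry context: the term and index of the log entry. */
  public static final class Context {
    private final long term;
    private final long logIndex;

    Context(long term, long logIndex) {
      this.term = term;
      this.logIndex = logIndex;
    }

    long getTerm() {
      return term;
    }

    long getLogIndex() {
      return logIndex;
    }
  }

  /** Stand-in for the real command handler; it only echoes its inputs. */
  static String runCommand(String request, Context context) {
    return request + "@" + context.getTerm() + ":" + context.getLogIndex();
  }

  /** Builds the context first, then submits the command for execution. */
  public static CompletableFuture<String> apply(
      String request, long term, long logIndex, Executor executor) {
    // Step 1: capture everything the command needs in one context object.
    Context context = new Context(term, logIndex);
    // Step 2: hand the command and its context to the executor.
    return CompletableFuture.supplyAsync(
        () -> runCommand(request, context), executor);
  }
}
```

Keeping the log index in the context, rather than in a field of the channel, means the handler receives it through the same path as every other applied transaction.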






[GitHub] [ozone] szetszwo commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-977894698


   @guohao-rosicky, the change looks good, but the new TestSmallFileDataStreamOutput failed.  Please take a look.  Thanks.




[GitHub] [ozone] szetszwo commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1019292484


   @guohao-rosicky , the change has grown from 13kB to 65kB, which makes it hard to review.  The SmallFileDataStreamOutput class is really long.
   
   How about we move the refactoring to a separate JIRA?




[GitHub] [ozone] guohao-rosicky commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1074634249


   @szetszwo please take a look. Thanks.




[GitHub] [ozone] guohao-rosicky edited a comment on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky edited a comment on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1077444324


   > > KeyDataStreamOutput can optionally not specify the data size, SmallFileStreamOutput can be retained, ...
   > 
   > That's why KeyDataStreamOutput is more powerful than SmallFileStreamOutput since it works even if the data size is unknown.
   
   OK, how can we best handle the follow-up work? Does the code in this PR help us achieve this? I can split it.




[GitHub] [ozone] szetszwo commented on a change in pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r755872824



##########
File path: hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
##########
@@ -451,7 +451,7 @@ RPC */
 message PutSmallFileRequestProto {
   required PutBlockRequestProto block = 1;
   required ChunkInfo chunkInfo = 2;
-  required bytes data = 3;
+  optional bytes data = 3;

Review comment:
       I see.  Thanks.






[GitHub] [ozone] bshashikant commented on a change in pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
bshashikant commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r758036277



##########
File path: hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
##########
@@ -451,7 +451,7 @@ RPC */
 message PutSmallFileRequestProto {
   required PutBlockRequestProto block = 1;
   required ChunkInfo chunkInfo = 2;
-  required bytes data = 3;
+  optional bytes data = 3;

Review comment:
       Is this backward compatible?






[GitHub] [ozone] szetszwo commented on a change in pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#discussion_r758346764



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/SmallFileDataStreamOutput.java
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * SmallFileDataStreamOutput, only used to write requests smaller than ChunkSize
+ * <p>
+ * TODO : currently not support multi-thread access.
+ */
+public class SmallFileDataStreamOutput implements ByteBufferStreamOutput {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SmallFileDataStreamOutput.class);
+
+  private final AtomicReference<BlockID> blockID;
+
+  private final XceiverClientFactory xceiverClientFactory;
+  private XceiverClientRatis xceiverClient;
+  private final OzoneClientConfig config;
+
+  private final OzoneManagerProtocol omClient;
+
+  private final OpenKeySession openKeySession;
+  private OmKeyLocationInfo keyLocationInfo;
+  private final OmKeyArgs keyArgs;
+
+  private DataStreamOutput dataStreamOutput;
+
+  private boolean unsafeByteBufferConversion;
+
+  private final ByteBuffer currentBuffer;
+  private long versionID;
+  private final Token<OzoneBlockTokenIdentifier> token;
+
+  // error handler
+  private final ExcludeList excludeList;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+
+  public SmallFileDataStreamOutput(
+      OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager,
+      OzoneManagerProtocol omClient,
+      OzoneClientConfig config,
+      boolean unsafeByteBufferConversion
+  ) throws IOException {
+    this.xceiverClientFactory = xceiverClientManager;
+    this.omClient = omClient;
+    this.config = config;
+    this.openKeySession = handler;
+
+    this.keyLocationInfo = handler.getKeyInfo().getLatestVersionLocations()
+        .getLocationList(handler.getOpenVersion()).get(0);
+    this.blockID = new AtomicReference<>(keyLocationInfo.getBlockID());
+    this.versionID = keyLocationInfo.getCreateVersion();
+
+    this.unsafeByteBufferConversion = unsafeByteBufferConversion;
+
+    OmKeyInfo info = handler.getKeyInfo();
+    this.currentBuffer = ByteBuffer.allocate((int) info.getDataSize());
+
+    this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
+        .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
+        .setReplicationConfig(info.getReplicationConfig())
+        .setDataSize(info.getDataSize())
+        .setIsMultipartKey(false).build();
+
+    this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
+        config.getMaxRetryCount(), config.getRetryInterval());
+    this.retryCount = 0;
+
+    this.excludeList = new ExcludeList();
+
+    this.token = null;
+  }
+
+
+  @VisibleForTesting
+  public BlockID getBlockID() {
+    return blockID.get();
+  }
+
+  @VisibleForTesting
+  public OmKeyLocationInfo getKeyLocationInfo() {
+    return keyLocationInfo;
+  }
+
+  private OmKeyLocationInfo allocateNewBlock() throws IOException {
+    if (!excludeList.isEmpty()) {
+      LOG.info("Allocating block with {}", excludeList);
+    }
+    OmKeyLocationInfo omKeyLocationInfo =
+        omClient.allocateBlock(keyArgs, openKeySession.getId(), excludeList);
+
+    this.keyLocationInfo = omKeyLocationInfo;
+    this.blockID.set(keyLocationInfo.getBlockID());
+    this.versionID = keyLocationInfo.getCreateVersion();
+
+    return omKeyLocationInfo;
+  }
+
+  @Override
+  public void write(ByteBuffer bb) throws IOException {
+    if (bb == null) {
+      throw new NullPointerException();
+    }
+
+    byte b = bb.array()[0];
+    currentBuffer.put(b);
+  }
+
+  @Override
+  public void write(ByteBuffer bb, int off, int len) throws IOException {
+    if (bb == null) {
+      throw new NullPointerException();
+    }
+
+    byte[] b = bb.array();

Review comment:
       We cannot call array() here, since the ByteBuffer may not be backed by an accessible array (for example, a direct or read-only buffer).
   
   Also, going through an array forces a buffer copy inside the code.  Avoiding that copy is the reason we use ByteBuffer rather than byte[].
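
A minimal sketch of the array()-free approach, under assumptions: `BufferCopySketch` and `append` are hypothetical names, not code from this PR, and `off` is treated as an absolute index into the source buffer. It bulk-copies through a `duplicate()` view, so it also works for direct and read-only buffers and leaves the caller's position and limit alone. It still copies once into the staging buffer; avoiding that copy entirely would mean passing the caller's ByteBuffer straight to the stream.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper, not part of the PR: appends a region of src to dst without array(). */
final class BufferCopySketch {
  private BufferCopySketch() { }

  /**
   * Copies len bytes starting at absolute index off of src into dst.
   * Works for heap, direct and read-only sources; src's position and limit stay untouched.
   */
  static void append(ByteBuffer dst, ByteBuffer src, int off, int len) {
    if (off < 0 || len < 0 || len > src.capacity() - off) {
      throw new IndexOutOfBoundsException("off=" + off + ", len=" + len);
    }
    // duplicate() shares the content but has its own position and limit.
    ByteBuffer view = src.duplicate();
    view.limit(off + len);
    view.position(off);
    // Bulk transfer between buffers; no backing array is required.
    dst.put(view);
  }
}
```

If dst has fewer than len bytes of space left, `put` throws BufferOverflowException, so a caller would still need to size or check the staging buffer.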

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/SmallFileDataStreamOutput.java
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.security.token.Token;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * SmallFileDataStreamOutput, only used to write requests smaller than ChunkSize
+ * <p>
+ * TODO : currently not support multi-thread access.
+ */
+public class SmallFileDataStreamOutput implements ByteBufferStreamOutput {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SmallFileDataStreamOutput.class);
+
+  private final AtomicReference<BlockID> blockID;
+
+  private final XceiverClientFactory xceiverClientFactory;
+  private XceiverClientRatis xceiverClient;
+  private final OzoneClientConfig config;
+
+  private final OzoneManagerProtocol omClient;
+
+  private final OpenKeySession openKeySession;
+  private OmKeyLocationInfo keyLocationInfo;
+  private final OmKeyArgs keyArgs;
+
+  private DataStreamOutput dataStreamOutput;
+
+  private boolean unsafeByteBufferConversion;
+
+  private final ByteBuffer currentBuffer;
+  private long versionID;
+  private final Token<OzoneBlockTokenIdentifier> token;
+
+  // error handler
+  private final ExcludeList excludeList;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+
+  public SmallFileDataStreamOutput(
+      OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager,
+      OzoneManagerProtocol omClient,
+      OzoneClientConfig config,
+      boolean unsafeByteBufferConversion
+  ) throws IOException {
+    this.xceiverClientFactory = xceiverClientManager;
+    this.omClient = omClient;
+    this.config = config;
+    this.openKeySession = handler;
+
+    this.keyLocationInfo = handler.getKeyInfo().getLatestVersionLocations()
+        .getLocationList(handler.getOpenVersion()).get(0);
+    this.blockID = new AtomicReference<>(keyLocationInfo.getBlockID());
+    this.versionID = keyLocationInfo.getCreateVersion();
+
+    this.unsafeByteBufferConversion = unsafeByteBufferConversion;
+
+    OmKeyInfo info = handler.getKeyInfo();
+    this.currentBuffer = ByteBuffer.allocate((int) info.getDataSize());
+
+    this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
+        .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
+        .setReplicationConfig(info.getReplicationConfig())
+        .setDataSize(info.getDataSize())
+        .setIsMultipartKey(false).build();
+
+    this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
+        config.getMaxRetryCount(), config.getRetryInterval());
+    this.retryCount = 0;
+
+    this.excludeList = new ExcludeList();
+
+    this.token = null;
+  }
+
+
+  @VisibleForTesting
+  public BlockID getBlockID() {
+    return blockID.get();
+  }
+
+  @VisibleForTesting
+  public OmKeyLocationInfo getKeyLocationInfo() {
+    return keyLocationInfo;
+  }
+
+  private OmKeyLocationInfo allocateNewBlock() throws IOException {
+    if (!excludeList.isEmpty()) {
+      LOG.info("Allocating block with {}", excludeList);
+    }
+    OmKeyLocationInfo omKeyLocationInfo =
+        omClient.allocateBlock(keyArgs, openKeySession.getId(), excludeList);
+
+    this.keyLocationInfo = omKeyLocationInfo;
+    this.blockID.set(keyLocationInfo.getBlockID());
+    this.versionID = keyLocationInfo.getCreateVersion();
+
+    return omKeyLocationInfo;
+  }
+
+  @Override
+  public void write(ByteBuffer bb) throws IOException {
+    if (bb == null) {
+      throw new NullPointerException();
+    }
+
+    byte b = bb.array()[0];
+    currentBuffer.put(b);
+  }
+
+  @Override
+  public void write(ByteBuffer bb, int off, int len) throws IOException {
+    if (bb == null) {
+      throw new NullPointerException();
+    }
+
+    byte[] b = bb.array();
+
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return;
+    }
+
+    if (len > 0) {
+      currentBuffer.put(b, off, len);
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    while (true) {
+      try {
+        checkOpen();
+        DataStreamOutput out = maybeInitStream();
+        int i = handleWrite(out);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("write small file success, size: {}", i);
+        }
+      } catch (IOException ee) {
+        handleException(ee);
+        continue;
+      } finally {
+        if (dataStreamOutput != null) {
+          try {
+            dataStreamOutput.close();
+          } catch (Exception e) {
+            LOG.warn("close DataStreamOutput error:", e);
+            throw new IOException("close DataStreamOutput error:", e);
+          }
+        }
+      }
+      return;
+    }
+  }
+
+  private int handleWrite(DataStreamOutput out) throws IOException {
+    if (out != null) {
+      int size = currentBuffer.position();
+      byte[] b = new byte[size];
+      System.arraycopy(currentBuffer.array(), 0, b, 0, size);
+
+      ContainerProtos.ContainerCommandRequestProto putSmallFileRequest =
+          getPutSmallFileRequest(b);
+      putSmallFileToContainer(putSmallFileRequest, out);
+
+      keyArgs.setDataSize(size);
+      keyLocationInfo.setLength(size);
+
+      Map<String, String> metadata = keyArgs.getMetadata();
+      keyArgs.setMetadata(metadata);
+
+      keyArgs.setLocationInfoList(Collections.singletonList(keyLocationInfo));
+      omClient.commitKey(keyArgs, openKeySession.getId());
+      return b.length;
+    } else if (currentBuffer != null && currentBuffer.position() == 0) {
+      keyArgs.setDataSize(0);
+      keyLocationInfo.setLength(0);
+
+      Map<String, String> metadata = keyArgs.getMetadata();
+      keyArgs.setMetadata(metadata);
+
+      keyArgs.setLocationInfoList(Collections.emptyList());
+
+      omClient.commitKey(keyArgs, openKeySession.getId());
+      return 0;
+    }
+    return 0;
+  }
+
+  private void setExceptionAndThrow(IOException ioe) throws IOException {
+    throw ioe;
+  }
+
+  /**
+   * It performs following actions :
+   * a. Updates the committed length at datanode for the current stream in
+   * datanode.
+   * b. Reads the data from the underlying buffer and writes it the next stream.
+   *
+   * @param exception actual exception that occurred
+   * @throws IOException Throws IOException if Write fails
+   */
+  private void handleException(IOException exception) throws IOException {
+    Throwable t = HddsClientUtils.checkForException(exception);
+    Preconditions.checkNotNull(t);
+    boolean retryFailure = checkForRetryFailure(t);
+    boolean containerExclusionException = false;
+    if (!retryFailure) {
+      containerExclusionException = checkIfContainerToExclude(t);
+    }
+
+    long totalSuccessfulFlushedData = 0L;
+    long bufferedDataLen = currentBuffer.position();
+
+    if (containerExclusionException) {
+      LOG.debug(
+          "Encountered exception {}. The last committed block length is {}, "
+              + "uncommitted data length is {} retry count {}", exception,
+          totalSuccessfulFlushedData, bufferedDataLen, retryCount);
+      excludeList
+          .addConatinerId(ContainerID.valueOf(blockID.get().getContainerID()));
+    } else if (xceiverClient != null) {
+      LOG.warn(
+          "Encountered exception {} on the pipeline {}. "
+              + "The last committed block length is {}, "
+              + "uncommitted data length is {} retry count {}", exception,
+          xceiverClient.getPipeline(), totalSuccessfulFlushedData,
+          bufferedDataLen, retryCount);
+      excludeList.addPipeline(xceiverClient.getPipeline().getId());
+    }
+    allocateNewBlock();
+
+    // just clean up the current stream.
+    cleanup(retryFailure);
+
+    if (bufferedDataLen > 0) {
+      // If the data is still cached in the underlying stream, we need to
+      // allocate new block and write this data in the datanode.
+      handleRetry(exception);
+      // reset the retryCount after handling the exception
+      // retryCount = 0;
+    }
+  }
+
+  private void handleRetry(IOException exception) throws IOException {
+    RetryPolicy retryPolicy = retryPolicyMap
+        .get(HddsClientUtils.checkForException(exception).getClass());
+    if (retryPolicy == null) {
+      retryPolicy = retryPolicyMap.get(Exception.class);
+    }
+    RetryPolicy.RetryAction action = null;
+    try {
+      action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
+    } catch (Exception e) {
+      setExceptionAndThrow(new IOException(e));
+    }
+    if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+      String msg = "";
+      if (action.reason != null) {
+        msg = "Retry request failed. " + action.reason;
+        LOG.error(msg, exception);
+      }
+      setExceptionAndThrow(new IOException(msg, exception));
+    }
+
+    // Throw the exception if the thread is interrupted
+    if (Thread.currentThread().isInterrupted()) {
+      LOG.warn("Interrupted while trying for retry");
+      setExceptionAndThrow(exception);
+    }
+    Preconditions.checkArgument(
+        action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
+    if (action.delayMillis > 0) {
+      try {
+        Thread.sleep(action.delayMillis);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        IOException ioe = (IOException) new InterruptedIOException(
+            "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
+            .initCause(e);
+        setExceptionAndThrow(ioe);
+      }
+    }
+    retryCount++;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Retrying Write request. Already tried {} time(s); " +
+          "retry policy is {} ", retryCount, retryPolicy);
+    }
+    //handleWrite();
+  }
+
+  /**
+   * Checks if the provided exception signifies retry failure in ratis client.
+   * In case of retry failure, ratis client throws RaftRetryFailureException
+   * and all succeeding operations are failed with AlreadyClosedException.
+   */
+  private boolean checkForRetryFailure(Throwable t) {
+    return t instanceof RaftRetryFailureException
+        || t instanceof AlreadyClosedException;
+  }
+
+  // Every container specific exception from datatnode will be seen as
+  // StorageContainerException
+  private boolean checkIfContainerToExclude(Throwable t) {
+    return t instanceof StorageContainerException;
+  }
+
+  private void cleanup(boolean invalidateClient) throws IOException {
+    if (xceiverClientFactory != null) {
+      xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
+    }
+    if (dataStreamOutput != null) {
+      try {
+        dataStreamOutput.close();
+      } catch (Exception e) {
+        throw new IOException("close dataStreamOutput error", e);
+      }
+    }
+    dataStreamOutput = null;
+    xceiverClient = null;
+  }
+
+  private ContainerProtos.ContainerCommandRequestProto getPutSmallFileRequest(
+      byte[] b) throws IOException {
+
+    ContainerProtos.BlockData containerBlockData =
+        ContainerProtos.BlockData.newBuilder()
+            .setBlockID(blockID.get().getDatanodeBlockIDProtobuf())
+            .build();
+    ContainerProtos.PutBlockRequestProto.Builder createBlockRequest =
+        ContainerProtos.PutBlockRequestProto.newBuilder()
+            .setBlockData(containerBlockData);
+
+    int size = b.length;
+
+    ByteString bytes = ByteString.copyFrom(b);
+
+    Checksum checksum =
+        new Checksum(config.getChecksumType(), config.getBytesPerChecksum());
+    final ChecksumData checksumData = checksum.computeChecksum(b);
+
+    ContainerProtos.ChunkInfo chunk =
+        ContainerProtos.ChunkInfo.newBuilder()
+            .setChunkName(blockID.get().getLocalID() + "_chunk_0")
+            .setOffset(0)
+            .setLen(size)
+            .setChecksumData(checksumData.getProtoBufMessage())
+            .build();
+
+    ContainerProtos.PutSmallFileRequestProto putSmallFileRequest =
+        ContainerProtos.PutSmallFileRequestProto.newBuilder()
+            .setChunkInfo(chunk)
+            .setBlock(createBlockRequest)
+            .setData(bytes)

Review comment:
       Do not set the data inside the proto.  We should send only the header in the proto and then send the raw data to the stream.
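
As a rough illustration of the header-then-data layout (not the Ratis or Ozone API; `HeaderThenDataSketch`, `send`, and the 4-byte length prefix are assumptions made up for this sketch), the metadata goes out first as a small message and the payload follows as raw bytes on the same channel:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

/** Hypothetical illustration, not Ozone or Ratis code: metadata header first, raw payload after. */
final class HeaderThenDataSketch {
  private HeaderThenDataSketch() { }

  /** Writes a 4-byte header length, the header bytes, then the payload as-is. */
  static long send(WritableByteChannel out, ByteBuffer header, ByteBuffer data)
      throws IOException {
    ByteBuffer lengthPrefix = ByteBuffer.allocate(Integer.BYTES);
    lengthPrefix.putInt(header.remaining());
    lengthPrefix.flip();

    long written = 0;
    // The payload is never embedded in the header message; duplicates keep
    // the caller's positions unchanged.
    ByteBuffer[] parts = {lengthPrefix, header.duplicate(), data.duplicate()};
    for (ByteBuffer part : parts) {
      while (part.hasRemaining()) {
        written += out.write(part);
      }
    }
    return written;
  }
}
```

With this split the serialized header stays small regardless of file size, and the payload bytes do not have to pass through protobuf serialization.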

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/SmallFileStreamDataChannel.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+
+/**
+ * This class is used to get the DataChannel for streaming.
+ */
+class SmallFileStreamDataChannel extends StreamDataChannelBase {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SmallFileStreamDataChannel.class);
+
+  private final Container kvContainer;
+  private final BlockManager blockManager;
+  private BlockData blockData;
+
+  private int len = 0;
+  private final List<ByteBuffer> bufferList = new ArrayList<>();
+
+  SmallFileStreamDataChannel(File file, Container container,
+                             BlockManager blockManager,
+                             ContainerMetrics metrics)
+      throws StorageContainerException {
+    super(file, container.getContainerData(), metrics);
+    this.blockManager = blockManager;
+    this.kvContainer = container;
+  }
+
+  @Override
+  ContainerProtos.Type getType() {
+    return ContainerProtos.Type.PutSmallFile;
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    int srcLen = src.capacity();
+    len += srcLen;
+    bufferList.add(src);
+    return srcLen;
+  }
+
+  private ByteString asByteString() {
+    ByteBuffer buffer = ByteBuffer.allocate(len);
+    for (ByteBuffer byteBuffer : bufferList) {
+      buffer.put(byteBuffer);
+    }
+    buffer.flip();
+    return ByteString.copyFrom(buffer);
+  }

Review comment:
       This method copies the buffer twice (once into the temporary ByteBuffer and again in ByteString.copyFrom), so it is not zero-copy.
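
One possible way to avoid both copies, sketched with plain java.nio rather than the datanode's actual write path: keep the queued buffers as they are and hand them to a gathering write. `GatheringWriteSketch` and `writeAll` are hypothetical names. The sketch counts `remaining()` bytes, and the JDK may still stage heap buffers internally, so this removes the copies in `asByteString()` rather than guaranteeing zero-copy end to end.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

/** Hypothetical illustration, not part of the PR: flush queued buffers without merging them. */
final class GatheringWriteSketch {
  private GatheringWriteSketch() { }

  /** Writes all remaining bytes of every buffer to file, in list order. */
  static long writeAll(Path file, List<ByteBuffer> buffers) throws IOException {
    ByteBuffer[] srcs = buffers.toArray(new ByteBuffer[0]);
    long total = 0;
    for (ByteBuffer b : srcs) {
      total += b.remaining();
    }
    try (FileChannel ch = FileChannel.open(file,
        StandardOpenOption.CREATE,
        StandardOpenOption.WRITE,
        StandardOpenOption.TRUNCATE_EXISTING)) {
      long written = 0;
      // A gathering write may be partial, so loop until everything is out.
      while (written < total) {
        written += ch.write(srcs);
      }
      return written;
    }
  }
}
```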






[GitHub] [ozone] guohao-rosicky commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-994595459


   > @guohao-rosicky , can you please rebase?
   
   done




[GitHub] [ozone] szetszwo edited a comment on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo edited a comment on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1077441923


   > KeyDataStreamOutput can optionally not specify the data size, SmallFileStreamOutput can be retained, ...
   
   That's why KeyDataStreamOutput is more powerful than SmallFileStreamOutput since it works even if the data size is unknown.




[GitHub] [ozone] szetszwo commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1077441923


   > KeyDataStreamOutput can optionally not specify the data size, SmallFileStreamOutput can be retained, ...
   
   That's why KeyDataStreamOutput is more powerful than SmallFileStreamOutput since it works if the data size is unknown.




[GitHub] [ozone] szetszwo commented on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1077467852


   > ..., how can we do the following work better? Does this PR code help us to achieve this function? I can split it.
   
   Yes, I actually suggested splitting it and moving the common code to #3195 in this comment: https://github.com/apache/ozone/pull/3195#issuecomment-1076399308
   




[GitHub] [ozone] guohao-rosicky edited a comment on pull request #2860: HDDS-4474. [Ozone-Streaming] Use WriteSmallFile to write small file.

Posted by GitBox <gi...@apache.org>.
guohao-rosicky edited a comment on pull request #2860:
URL: https://github.com/apache/ozone/pull/2860#issuecomment-1077444324


   > > KeyDataStreamOutput can optionally not specify the data size, SmallFileStreamOutput can be retained, ...
   > 
   > That's why KeyDataStreamOutput is more powerful than SmallFileStreamOutput since it works even if the data size is unknown.
   
   @szetszwo Ok, how can we do the following work better? Does this PR code help us to achieve this function? I can split it.

