You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2021/11/15 04:46:34 UTC

[ozone] branch HDDS-4454 updated (871f2cd -> 1d9f870)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a change to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git.


    omit 871f2cd  HDDS-5763. Provide an Executor for each LocalStream in ContainerStateMachine (#2782)
    omit 976b141  HDDS-5895. [Ozone-Streaming] Make raft.server.data-stream.client.pool.size configurable (#2766)
    omit 4480394  HDDS-5674.[Ozone-Streaming] Handle client retries on exception (#2701)
    omit 26dd5cd  HDDS-5849. [Ozone-Streaming]Write exceptions occur after checksum is enabled (#2729)
    omit 51664a3  HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipline mode (#2682)
    omit 64dab05  HDDS-5742. Avoid unnecessary Bytebuffer conversions (#2673)
    omit c6950c7  HDDS-5705. [Ozone-Streaming] Change ByteBufStreamOutput to ByteBufferStreamOutput (#2603)
    omit f6f8715  HDDS-5599.  [Ozone-Streaming]drop BufferPool and ChunkBuffer to avoid buffer copying (#2557)
    omit 8f9924e  HDDS-5488. [Ozone-Streaming] Add a new BlockOutputStream/KeyOutputStream to support streaming api (#2495)
    omit b9368a5  HDDS-5480. [Ozone-Streaming] Client and server should support stream setup. (#2452)
    omit 6211d74  HDDS-5481. Fix stream() and link() method in ContainerStateMachine. (#2451)
    omit 7787247  HDDS-5452. Add link method to ContainerStateMachine for Ratis streaming (#2422)
    omit 471cadb  HDDS-5366.  [Ozone-Streaming] Implement stream method to ContainerStateMachine. (#2358).  Contributed by mingchao zhao
     add a5d6707  HDDS-5901. Delete on volume/bucket throws fatal error on ofs (#2776)
     add 26824ca  HDDS-5884. OM Validate S3 Auth for write requests. (#2778)
     add d018d00  HDDS-5805. remove containerStateManager V1 code (#2727)
     add f153236  HDDS-5863. Error message having null fields on volume creation (#2752)
     add 5c76856  HDDS-5885. OM Auth Validate for read requests and handle non ratis enabled code. (#2779)
     add 5f0a588  HDDS-5908. MPU getKey can fail, if completeMPU result is still in cache. (#2787)
     add e427b71  HDDS-5883 Change S3G client to set S3 Auth per req (#2775)
     add 73a081a  HDDS-5864. Retry when DN connection issue during getBlock/ReadChunk call during Ozone key Read (#2746)
     add 83f8b04  HDDS-5736. Update navbar from Hadoop to Ozone on doc page (#2792)
     add 47cb26a  HDDS-3983 Ozone RocksDB Iterator wrapper should not expose key() and value() API. (#2402)
     add 565972c  Support more detailed error log when handleFlush in Ozone client (#2795)
     add 1cf43ac  HDDS-5931. Broken link in dist/README (#2799)
     add 9cea16d  HDDS-5933. Ozone Filesystem shaded jars include unnecessary dependencies (#2798)
     add eb7136d  HDDS-3369. Cleanup old write-path of volume in OM. (#2780)
     add c088cd1  HDDS-5925. Fix SecuringOzoneHTTP doc (#2796)
     add ac86851  HDDS-5943. Shutdown ResultHandlerExecutorService for StorageVolumeChecker. (#2806)
     add aa7e083  HDDS-5944. Fix typo in OzoneManagerServiceProviderImpl.java (#2807)
     add 6ebb316  HDDS-5945. bump rocksdb version to 6.25.3 (#2809)
     add 4c2181f  HDDS-5935. Bump Spring to 5.2.18 (#2801)
     add f3b482a  HDDS-5922. ignore deletetransaction when container is not found (#2793)
     add c63479c  HDDS-5839. Make sure buckets created from OFS are in FILE_SYSTEM_OPTIMIZED layout (#2730)
     add 96e2d7f  HDDS-5947. Remove unused mina-core and sshd-core dependencies (#2811)
     add 252fee4  HDDS-5910 Add additional verification for S3 Auth. (#2817)
     add 9d9e780  HDDS-5956. Speed up TestOzoneRpcClientAbstract#testZReadKeyWithUnhealthyContainerReplica (#2820)
     add 0eedab9  HDDS-5929. Make FSO and OBS bucket layouts independent of normalization config flag (#2819)
     add ba3ac30  HDDS-5960. Change option name to bucketlayout instead of type (#2825)
     add 4558e40  HDDS-5970. Remove OMKeyRequest#getBucketLayout overridden method in subclasses (#2830)
     add 9384bd7  HDDS-5820. Intermittent failure in TestPipelinePlacementPolicy#testPickLowestLoadAnchor (#2757)
     add d0e9140  HDDS-5958. Refine container scrub log message. (#2823)
     add 45c1899  HDDS-5937. Inaccurate bucket info returned from bucket list command in Shell (#2832)
     add c2db377  HDDS-5872. Do not failover on some RpcExceptions (#2772)
     add 71f47d4  HDDS-5981. Refactor usage of bucket type. (#2836)
     add 026d878  HDDS-5982. Remove isBucketFSOptimized flag. (#2838)
     new f508854  HDDS-5366.  [Ozone-Streaming] Implement stream method to ContainerStateMachine. (#2358).  Contributed by mingchao zhao
     new e166e91  HDDS-5452. Add link method to ContainerStateMachine for Ratis streaming (#2422)
     new 0cc389a  HDDS-5481. Fix stream() and link() method in ContainerStateMachine. (#2451)
     new 51d1013  HDDS-5480. [Ozone-Streaming] Client and server should support stream setup. (#2452)
     new 6c36e82  HDDS-5488. [Ozone-Streaming] Add a new BlockOutputStream/KeyOutputStream to support streaming api (#2495)
     new 51f88cb  HDDS-5599.  [Ozone-Streaming]drop BufferPool and ChunkBuffer to avoid buffer copying (#2557)
     new d1cb3f9  HDDS-5705. [Ozone-Streaming] Change ByteBufStreamOutput to ByteBufferStreamOutput (#2603)
     new bdd2726  HDDS-5742. Avoid unnecessary Bytebuffer conversions (#2673)
     new 5156051  HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipline mode (#2682)
     new ce6f566  HDDS-5849. [Ozone-Streaming]Write exceptions occur after checksum is enabled (#2729)
     new 0936be3  HDDS-5674.[Ozone-Streaming] Handle client retries on exception (#2701)
     new 9037764  HDDS-5895. [Ozone-Streaming] Make raft.server.data-stream.client.pool.size configurable (#2766)
     new 1d9f870  HDDS-5763. Provide an Executor for each LocalStream in ContainerStateMachine (#2782)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (871f2cd)
            \
             N -- N -- N   refs/heads/HDDS-4454 (1d9f870)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |  52 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |   8 +
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  |   2 +-
 .../storage/DummyBlockInputStreamWithRetry.java    |  14 +-
 .../hdds/scm/storage/TestBlockInputStream.java     |  32 +-
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |  51 ++
 .../java/org/apache/hadoop/hdds/NodeDetails.java   |   0
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java  |  31 +-
 .../hdds/scm/storage/ContainerProtocolCalls.java   |   5 -
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   6 +
 .../common/src/main/resources/ozone-default.xml    |  15 +
 .../common/volume/StorageVolumeChecker.java        |   1 +
 .../container/keyvalue/KeyValueContainerCheck.java |   2 +-
 .../common/volume/TestStorageVolumeChecker.java    |   4 +
 hadoop-hdds/docs/content/feature/Topology.md       |   2 +-
 .../docs/content/security/SecuringOzoneHTTP.md     |  20 +-
 hadoop-hdds/docs/content/start/FromSource.md       |   2 +-
 .../themes/ozonedoc/layouts/partials/navbar.html   |   6 +-
 .../themes/ozonedoc/layouts/partials/sidebar.html  |   4 +-
 .../hadoop/hdds/utils/db/RDBStoreIterator.java     |  19 +-
 .../apache/hadoop/hdds/utils/db/TableIterator.java |  12 -
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |  18 -
 .../hadoop/hdds/utils/db/TestRDBStoreIterator.java |  19 +-
 .../scm/block/DeletedBlockLogStateManagerImpl.java |  10 -
 .../hdds/scm/container/ContainerManager.java       |   2 +-
 .../hdds/scm/container/ContainerManagerImpl.java   |   4 +-
 .../hdds/scm/container/ContainerStateManager.java  | 570 ++++-----------------
 .../scm/container/ContainerStateManagerImpl.java   |  15 +-
 .../scm/container/ContainerStateManagerV2.java     | 189 -------
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  24 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |  18 +-
 .../scm/container/TestContainerReportHandler.java  | 267 +++++-----
 .../scm/container/TestContainerStateManager.java   |  91 +++-
 .../TestIncrementalContainerReportHandler.java     | 126 +++--
 .../hdds/scm/container/TestReplicationManager.java | 370 +++++++------
 .../scm/container/TestUnknownContainerReport.java  |  47 +-
 .../hdds/scm/ha/TestReplicationAnnotation.java     |   8 +-
 .../org/apache/hadoop/ozone/client/BucketArgs.java |   8 +-
 .../ozone/client/protocol/ClientProtocol.java      |  19 +
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  50 +-
 hadoop-ozone/common/pom.xml                        |   4 -
 .../main/java/org/apache/hadoop/ozone/OFSPath.java |   3 +-
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |  25 -
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   2 +-
 .../ozone/om/ha/OMFailoverProxyProvider.java       |  11 +-
 .../hadoop/ozone/om/helpers/BucketLayout.java      |  17 +
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |   3 +-
 .../hadoop/ozone/om/helpers/OzoneFSUtils.java      |   7 -
 .../OmDeleteKeys.java => protocol/S3Auth.java}     |  43 +-
 ...nsport.java => OzoneManagerClientProtocol.java} |  33 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  59 ++-
 .../hadoop/ozone/om/helpers/TestOmKeyInfo.java     |  41 +-
 hadoop-ozone/dist/README.md                        |   6 +-
 .../dist/src/main/compose/ozone-mr/common-config   |   1 +
 .../src/main/compose/ozonesecure/docker-config     |   1 +
 .../main/smoketest/security/ozone-secure-fs.robot  |   2 +-
 .../ozone/TestDirectoryDeletingServiceWithFSO.java |  13 +-
 .../fs/ozone/TestOzoneFSWithObjectStoreCreate.java |   3 +
 .../hadoop/fs/ozone/TestOzoneFileInterfaces.java   |   3 +-
 .../fs/ozone/TestOzoneFileInterfacesWithFSO.java   |  25 +-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |   5 +-
 .../fs/ozone/TestOzoneFileSystemMetrics.java       |   3 +
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java |  12 +-
 .../fs/ozone/TestRootedOzoneFileSystemWithFSO.java |   7 -
 .../TestContainerStateManagerIntegration.java      |   2 +-
 .../rpc/TestOzoneClientMultipartUploadWithFSO.java |   5 -
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  10 +-
 .../ozone/client/rpc/TestSecureOzoneRpcClient.java | 100 +++-
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |  16 +-
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      |   5 +-
 .../hadoop/ozone/om/TestObjectStoreWithFSO.java    |   7 +-
 .../ozone/recon/TestReconWithOzoneManagerFSO.java  |   6 +-
 .../hadoop/ozone/shell/TestOzoneShellHA.java       |   2 +-
 .../src/main/proto/OmClientProtocol.proto          |   2 +-
 .../hadoop/ozone/om/codec/OmKeyInfoCodec.java      |   5 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  55 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   4 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  71 ++-
 .../hadoop/ozone/om/TrashOzoneFileSystem.java      |   5 +-
 .../org/apache/hadoop/ozone/om/VolumeManager.java  |  40 +-
 .../apache/hadoop/ozone/om/VolumeManagerImpl.java  | 300 -----------
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |   6 +-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     | 158 +++---
 .../hadoop/ozone/om/request/OMClientRequest.java   |  22 +-
 .../om/request/file/OMDirectoryCreateRequest.java  |   9 +-
 .../file/OMDirectoryCreateRequestWithFSO.java      |   6 +-
 .../ozone/om/request/file/OMFileCreateRequest.java |   5 +
 .../request/file/OMFileCreateRequestWithFSO.java   |   9 +-
 .../ozone/om/request/file/OMFileRequest.java       |  38 +-
 .../om/request/key/OMAllocateBlockRequest.java     |  15 +-
 .../request/key/OMAllocateBlockRequestWithFSO.java |  10 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  12 +-
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |  13 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |  19 +-
 .../om/request/key/OMKeyCreateRequestWithFSO.java  |  10 +-
 .../ozone/om/request/key/OMKeyDeleteRequest.java   |  12 +-
 .../om/request/key/OMKeyDeleteRequestWithFSO.java  |  10 +-
 .../ozone/om/request/key/OMKeyRenameRequest.java   |   5 +
 .../om/request/key/OMKeyRenameRequestWithFSO.java  |  10 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  10 +-
 .../om/request/key/OMPathsPurgeRequestWithFSO.java |  10 +-
 .../S3InitiateMultipartUploadRequest.java          |  15 +-
 .../S3InitiateMultipartUploadRequestWithFSO.java   |  10 +-
 .../multipart/S3MultipartUploadAbortRequest.java   |  18 +-
 .../S3MultipartUploadAbortRequestWithFSO.java      |  10 +-
 .../S3MultipartUploadCommitPartRequest.java        |  19 +-
 .../S3MultipartUploadCommitPartRequestWithFSO.java |  10 +-
 .../S3MultipartUploadCompleteRequest.java          |  23 +-
 .../S3MultipartUploadCompleteRequestWithFSO.java   |  10 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |  48 +-
 .../hadoop/ozone/security/S3SecurityUtil.java      |  82 +++
 .../hadoop/ozone/om/TestOmMetadataManager.java     |   4 +-
 .../ozone/om/request/TestOMRequestUtils.java       |  22 +-
 .../file/TestOMDirectoryCreateRequestWithFSO.java  |  63 ++-
 .../file/TestOMFileCreateRequestWithFSO.java       |  17 +-
 .../om/request/key/TestOMAllocateBlockRequest.java |   7 +-
 .../key/TestOMAllocateBlockRequestWithFSO.java     |   7 +-
 .../om/request/key/TestOMKeyAclRequestWithFSO.java |  11 -
 .../om/request/key/TestOMKeyCommitRequest.java     |  59 ++-
 .../request/key/TestOMKeyCommitRequestWithFSO.java |  17 +-
 .../om/request/key/TestOMKeyCreateRequest.java     |  38 +-
 .../request/key/TestOMKeyCreateRequestWithFSO.java |  11 +-
 .../om/request/key/TestOMKeyDeleteRequest.java     |  17 +-
 .../request/key/TestOMKeyDeleteRequestWithFSO.java |   3 +-
 .../TestS3InitiateMultipartUploadRequest.java      |  27 +-
 ...estS3InitiateMultipartUploadRequestWithFSO.java |   6 +-
 .../s3/multipart/TestS3MultipartRequest.java       |  33 +-
 .../TestS3MultipartUploadAbortRequest.java         |   3 +-
 .../TestS3MultipartUploadAbortRequestWithFSO.java  |   7 +-
 .../TestS3MultipartUploadCommitPartRequest.java    |  18 +-
 ...tS3MultipartUploadCommitPartRequestWithFSO.java |  13 +-
 .../TestS3MultipartUploadCompleteRequest.java      |  22 +-
 ...estS3MultipartUploadCompleteRequestWithFSO.java |  28 +-
 .../file/TestOMFileCreateResponseWithFSO.java      |  13 -
 .../key/TestOMAllocateBlockResponseWithFSO.java    |  13 -
 .../key/TestOMKeyCommitResponseWithFSO.java        |  13 -
 .../key/TestOMKeyCreateResponseWithFSO.java        |  13 -
 .../key/TestOMKeyDeleteResponseWithFSO.java        |  14 -
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |   3 +-
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |  17 +-
 .../fs/ozone/BasicRootedOzoneFileSystem.java       |   4 +-
 hadoop-ozone/pom.xml                               |   2 +-
 .../spi/impl/OzoneManagerServiceProviderImpl.java  |   4 +-
 .../ozone/recon/api/TestNSSummaryEndpoint.java     |   3 -
 .../ozone/recon/tasks/TestNSSummaryTask.java       |   2 -
 .../hadoop/ozone/s3/OzoneClientProducer.java       |  71 +--
 .../hadoop/ozone/s3/endpoint/BucketEndpoint.java   |   5 +
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     |  29 +-
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |   1 +
 .../hadoop/ozone/s3/endpoint/RootEndpoint.java     |   5 +
 .../hadoop/ozone/s3/TestOzoneClientProducer.java   |   4 +-
 .../apache/hadoop/ozone/debug/PrefixParser.java    |   2 -
 .../ozone/genesis/BenchMarkOMKeyAllocation.java    | 137 -----
 .../org/apache/hadoop/ozone/genesis/Genesis.java   |   2 +-
 .../ozone/shell/bucket/CreateBucketHandler.java    |   4 +-
 pom.xml                                            |  12 +-
 156 files changed, 2076 insertions(+), 2471 deletions(-)
 rename hadoop-hdds/{framework => common}/src/main/java/org/apache/hadoop/hdds/NodeDetails.java (100%)
 delete mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
 copy hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/{helpers/OmDeleteKeys.java => protocol/S3Auth.java} (56%)
 copy hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/{OmTransport.java => OzoneManagerClientProtocol.java} (56%)
 create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/S3SecurityUtil.java
 delete mode 100644 hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOMKeyAllocation.java

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 02/13: HDDS-5452. Add link method to ContainerStateMachine for Ratis streaming (#2422)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit e166e91d3f96ef7a2fe53af6a1b679d8e2fba2ba
Author: Kaijie Chen <ch...@kaijie.org>
AuthorDate: Sun Jul 18 12:18:10 2021 +0800

    HDDS-5452. Add link method to ContainerStateMachine for Ratis streaming (#2422)
---
 .../transport/server/ratis/ContainerStateMachine.java   | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 1b8def4..5f4bac0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -88,6 +88,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferExce
 import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.ratis.util.TaskQueue;
 import org.apache.ratis.util.function.CheckedSupplier;
+import org.apache.ratis.util.JavaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -523,6 +524,22 @@ public class ContainerStateMachine extends BaseStateMachine {
     }, executor);
   }
 
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    return CompletableFuture.supplyAsync(() -> {
+      if (stream == null) {
+        return JavaUtils.completeExceptionally(
+            new IllegalStateException("DataStream is null"));
+      }
+      if (stream.getDataChannel().isOpen()) {
+        return JavaUtils.completeExceptionally(
+            new IllegalStateException(
+                "DataStream: " + stream + " is not closed properly"));
+      } else {
+        return CompletableFuture.completedFuture(null);
+      }
+    }, executor);
+  }
+
   private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
     int i = (int)(req.getBlockID().getLocalID() % chunkExecutors.size());
     return chunkExecutors.get(i);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 09/13: HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipline mode (#2682)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 5156051cfe167431335705a6a55dae1d377c647f
Author: micah zhao <mi...@tencent.com>
AuthorDate: Thu Sep 30 11:21:47 2021 +0800

    HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipline mode (#2682)
---
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 40 ++++++++++++++++++++--
 1 file changed, 37 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index c69af90..41e2c48 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -144,7 +146,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     this.xceiverClient =
         (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
     // Alternatively, stream setup can be delayed till the first chunk write.
-    this.out = setupStream();
+    this.out = setupStream(pipeline);
     this.token = token;
 
     flushPeriod = (int) (config.getStreamBufferFlushSize() / config
@@ -166,7 +168,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
         config.getBytesPerChecksum());
   }
 
-  private DataStreamOutput setupStream() throws IOException {
+  private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
     // Execute a dummy WriteChunk request to get the path of the target file,
     // but does NOT write any data to it.
     ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest =
@@ -184,7 +186,39 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
         ContainerCommandRequestMessage.toMessage(builder.build(), null);
 
     return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
-        .stream(message.getContent().asReadOnlyByteBuffer());
+    .stream(message.getContent().asReadOnlyByteBuffer(),
+        getRoutingTable(pipeline));
+  }
+
+  public RoutingTable getRoutingTable(Pipeline pipeline) {
+    RaftPeerId primaryId = null;
+    List<RaftPeerId> raftPeers = new ArrayList<>();
+
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString());
+      try {
+        if (dn == pipeline.getFirstNode()) {
+          primaryId = raftPeerId;
+        }
+      } catch (IOException e) {
+        LOG.error("Can not get FirstNode from the pipeline: {} with " +
+            "exception: {}", pipeline.toString(), e.getLocalizedMessage());
+        return null;
+      }
+      raftPeers.add(raftPeerId);
+    }
+
+    RoutingTable.Builder builder = RoutingTable.newBuilder();
+    RaftPeerId previousId = primaryId;
+    for (RaftPeerId peerId : raftPeers) {
+      if (peerId.equals(primaryId)) {
+        continue;
+      }
+      builder.addSuccessor(previousId, peerId);
+      previousId = peerId;
+    }
+
+    return builder.build();
   }
 
   public BlockID getBlockID() {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 01/13: HDDS-5366. [Ozone-Streaming] Implement stream method to ContainerStateMachine. (#2358). Contributed by mingchao zhao

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit f508854a819bbbdf21a1568ba4066bab4319afde
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Jun 23 23:20:27 2021 +0800

    HDDS-5366.  [Ozone-Streaming] Implement stream method to ContainerStateMachine. (#2358).  Contributed by mingchao zhao
---
 .../server/ratis/ContainerStateMachine.java        | 25 ++++++++++
 .../common/transport/server/ratis/LocalStream.java | 50 +++++++++++++++++++
 .../transport/server/ratis/StreamDataChannel.java  | 57 ++++++++++++++++++++++
 3 files changed, 132 insertions(+)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 3b35c97..1b8def4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -23,10 +23,12 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -498,6 +500,29 @@ public class ContainerStateMachine extends BaseStateMachine {
     return raftFuture;
   }
 
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        ContainerCommandRequestProto requestProto =
+            getContainerCommandRequestProto(gid,
+                request.getMessage().getContent());
+        DispatcherContext context =
+            new DispatcherContext.Builder()
+                .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
+                .setContainer2BCSIDMap(container2BCSIDMap)
+                .build();
+
+        ContainerCommandResponseProto response = runCommand(
+            requestProto, context);
+        String path = response.getMessage();
+        return new LocalStream(new StreamDataChannel(Paths.get(path)));
+      } catch (IOException e) {
+        throw new CompletionException("Failed to create data stream", e);
+      }
+    }, executor);
+  }
+
   private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
     int i = (int)(req.getBlockID().getLocalID() % chunkExecutors.size());
     return chunkExecutors.get(i);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
new file mode 100644
index 0000000..baae013
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+class LocalStream implements StateMachine.DataStream {
+  private final StateMachine.DataChannel dataChannel;
+
+  LocalStream(StateMachine.DataChannel dataChannel) {
+    this.dataChannel = dataChannel;
+  }
+
+  @Override
+  public StateMachine.DataChannel getDataChannel() {
+    return dataChannel;
+  }
+
+  @Override
+  public CompletableFuture<?> cleanUp() {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        dataChannel.close();
+        return true;
+      } catch (IOException e) {
+        throw new CompletionException("Failed to close data channel", e);
+      }
+    });
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
new file mode 100644
index 0000000..3df66e2
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/StreamDataChannel.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+
+class StreamDataChannel implements StateMachine.DataChannel {
+  private final Path path;
+  private final RandomAccessFile randomAccessFile;
+
+  StreamDataChannel(Path path) throws FileNotFoundException {
+    this.path = path;
+    this.randomAccessFile = new RandomAccessFile(path.toFile(), "rw");
+  }
+
+  @Override
+  public void force(boolean metadata) throws IOException {
+    randomAccessFile.getChannel().force(metadata);
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    return randomAccessFile.getChannel().write(src);
+  }
+
+  @Override
+  public boolean isOpen() {
+    return randomAccessFile.getChannel().isOpen();
+  }
+
+  @Override
+  public void close() throws IOException {
+    randomAccessFile.close();
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 11/13: HDDS-5674.[Ozone-Streaming] Handle client retries on exception (#2701)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 0936be30064e01b9d343c53f88695aabea907d3e
Author: Sadanand Shenoy <sa...@gmail.com>
AuthorDate: Thu Oct 21 12:57:30 2021 +0530

    HDDS-5674.[Ozone-Streaming] Handle client retries on exception (#2701)
---
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 62 ++++++++++++---
 .../hadoop/hdds/scm/storage/StreamBuffer.java      | 46 +++++++++++
 .../hdds/scm/storage/StreamCommitWatcher.java      | 93 ++++++++++++++++++----
 .../client/io/BlockDataStreamOutputEntry.java      | 33 +++++++-
 .../client/io/BlockDataStreamOutputEntryPool.java  | 14 +++-
 .../ozone/client/io/KeyDataStreamOutput.java       | 12 ++-
 .../client/rpc/TestBlockDataStreamOutput.java      | 30 +++++++
 .../apache/hadoop/ozone/container/TestHelper.java  | 20 +++++
 8 files changed, 279 insertions(+), 31 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 2ae0ba7..aada48e 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -92,6 +92,11 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
 
   private int chunkIndex;
   private final AtomicLong chunkOffset = new AtomicLong();
+
+  // Similar to 'BufferPool' but this list maintains only references
+  // to the ByteBuffers.
+  private List<StreamBuffer> bufferList;
+
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
   // request will fail upfront.
@@ -133,7 +138,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
       OzoneClientConfig config,
-      Token<? extends TokenIdentifier> token
+      Token<? extends TokenIdentifier> token,
+      List<StreamBuffer> bufferList
   ) throws IOException {
     this.xceiverClientFactory = xceiverClientManager;
     this.config = config;
@@ -148,7 +154,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     // Alternatively, stream setup can be delayed till the first chunk write.
     this.out = setupStream(pipeline);
     this.token = token;
-
+    this.bufferList = bufferList;
     flushPeriod = (int) (config.getStreamBufferFlushSize() / config
         .getStreamBufferSize());
 
@@ -159,7 +165,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitWatcher = new StreamCommitWatcher(xceiverClient);
+    commitWatcher = new StreamCommitWatcher(xceiverClient, bufferList);
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
     failedServers = new ArrayList<>(0);
@@ -251,8 +257,11 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     if (len == 0) {
       return;
     }
-    writeChunkToContainer(
-            (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len));
+
+    final StreamBuffer buf = new StreamBuffer(b, off, len);
+    bufferList.add(buf);
+
+    writeChunkToContainer(buf.duplicate());
 
     writtenDataLength += len;
   }
@@ -261,6 +270,10 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     totalDataFlushedLength = writtenDataLength;
   }
 
+  @VisibleForTesting
+  public long getTotalDataFlushedLength() {
+    return totalDataFlushedLength;
+  }
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
@@ -268,8 +281,27 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
    * @throws IOException if error occurred
    */
 
-  // TODO: We need add new retry policy without depend on bufferPool.
   public void writeOnRetry(long len) throws IOException {
+    if (len == 0) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Retrying write length {} for blockID {}", len, blockID);
+    }
+    int count = 0;
+    while (len > 0) {
+      final StreamBuffer buf = bufferList.get(count);
+      final long writeLen = Math.min(buf.length(), len);
+      final ByteBuffer duplicated = buf.duplicate();
+      if (writeLen != buf.length()) {
+        duplicated.limit(Math.toIntExact(len));
+      }
+      writeChunkToContainer(duplicated);
+      len -= writeLen;
+      count++;
+      writtenDataLength += writeLen;
+    }
+
 
   }
 
@@ -314,6 +346,14 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
       boolean force) throws IOException {
     checkOpen();
     long flushPos = totalDataFlushedLength;
+    final List<StreamBuffer> byteBufferList;
+    if (!force) {
+      Preconditions.checkNotNull(bufferList);
+      byteBufferList = bufferList;
+      Preconditions.checkNotNull(byteBufferList);
+    } else {
+      byteBufferList = null;
+    }
     flush();
     if (close) {
       dataStreamCloseReply = out.closeAsync();
@@ -344,12 +384,12 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
           if (LOG.isDebugEnabled()) {
             LOG.debug(
                 "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                    + commitWatcher.getCommitInfoSetSize() + " flushLength "
+                    + commitWatcher.getCommitInfoMapSize() + " flushLength "
                     + flushPos + " blockID " + blockID);
           }
           // for standalone protocol, logIndex will always be 0.
-          commitWatcher.updateCommitInfoSet(
-              asyncReply.getLogIndex());
+          commitWatcher
+              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -589,4 +629,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     setIoException(ex);
     throw getIoException();
   }
+
+  public long getTotalAckDataLength() {
+    return commitWatcher.getTotalAckDataLength();
+  }
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
new file mode 100644
index 0000000..f36019e
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Used for streaming write.
+ */
+public class StreamBuffer {
+  private final ByteBuffer buffer;
+
+  public StreamBuffer(ByteBuffer buffer) {
+    this.buffer = buffer.asReadOnlyBuffer();
+  }
+
+  public StreamBuffer(ByteBuffer buffer, int offset, int length) {
+    this((ByteBuffer) buffer.asReadOnlyBuffer().position(offset)
+        .limit(offset + length));
+  }
+
+  public ByteBuffer duplicate() {
+    return buffer.duplicate();
+  }
+
+  public int length() {
+    return buffer.limit() - buffer.position();
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index c187ffe..3a59d07 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -24,6 +24,7 @@
  */
 package org.apache.hadoop.hdds.scm.storage;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -31,13 +32,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Set;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 /**
  * This class executes watchForCommit on ratis pipeline and releases
@@ -48,7 +52,12 @@ public class StreamCommitWatcher {
   private static final Logger LOG =
       LoggerFactory.getLogger(StreamCommitWatcher.class);
 
-  private Set<Long> commitIndexSet;
+  private Map<Long, List<StreamBuffer>> commitIndexMap;
+  private List<StreamBuffer> bufferList;
+
+  // total data which has been successfully flushed and acknowledged
+  // by all servers
+  private long totalAckDataLength;
 
   // future Map to hold up all putBlock futures
   private ConcurrentHashMap<Long,
@@ -57,18 +66,22 @@ public class StreamCommitWatcher {
 
   private XceiverClientSpi xceiverClient;
 
-  public StreamCommitWatcher(XceiverClientSpi xceiverClient) {
+  public StreamCommitWatcher(XceiverClientSpi xceiverClient,
+      List<StreamBuffer> bufferList) {
     this.xceiverClient = xceiverClient;
-    commitIndexSet = new ConcurrentSkipListSet();
+    commitIndexMap = new ConcurrentSkipListMap<>();
     futureMap = new ConcurrentHashMap<>();
+    this.bufferList = bufferList;
+    totalAckDataLength = 0;
   }
 
-  public void updateCommitInfoSet(long index) {
-    commitIndexSet.add(index);
+  public void updateCommitInfoMap(long index, List<StreamBuffer> buffers) {
+    commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
+        .addAll(buffers);
   }
 
-  int getCommitInfoSetSize() {
-    return commitIndexSet.size();
+  int getCommitInfoMapSize() {
+    return commitIndexMap.size();
   }
 
   /**
@@ -78,12 +91,12 @@ public class StreamCommitWatcher {
    * @throws IOException in case watchForCommit fails
    */
   public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
-    if (!commitIndexSet.isEmpty()) {
+    if (!commitIndexMap.isEmpty()) {
       // wait for the  first commit index in the commitIndex2flushedDataMap
       // to get committed to all or majority of nodes in case timeout
       // happens.
       long index =
-          commitIndexSet.stream().mapToLong(v -> v).min()
+          commitIndexMap.keySet().stream().mapToLong(v -> v).min()
               .getAsLong();
       if (LOG.isDebugEnabled()) {
         LOG.debug("waiting for first index {} to catch up", index);
@@ -102,12 +115,12 @@ public class StreamCommitWatcher {
    */
   public XceiverClientReply streamWatchOnLastIndex()
       throws IOException {
-    if (!commitIndexSet.isEmpty()) {
+    if (!commitIndexMap.isEmpty()) {
       // wait for the  commit index in the commitIndex2flushedDataMap
       // to get committed to all or majority of nodes in case timeout
       // happens.
       long index =
-          commitIndexSet.stream().mapToLong(v -> v).max()
+          commitIndexMap.keySet().stream().mapToLong(v -> v).max()
               .getAsLong();
       if (LOG.isDebugEnabled()) {
         LOG.debug("waiting for last flush Index {} to catch up", index);
@@ -127,9 +140,16 @@ public class StreamCommitWatcher {
    */
   public XceiverClientReply streamWatchForCommit(long commitIndex)
       throws IOException {
+    final long index;
     try {
       XceiverClientReply reply =
           xceiverClient.watchForCommit(commitIndex);
+      if (reply == null) {
+        index = 0;
+      } else {
+        index = reply.getLogIndex();
+      }
+      adjustBuffers(index);
       return reply;
     } catch (InterruptedException e) {
       // Re-interrupt the thread while catching InterruptedException
@@ -140,11 +160,52 @@ public class StreamCommitWatcher {
     }
   }
 
+  void releaseBuffersOnException() {
+    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+  }
+
+  private void adjustBuffers(long commitIndex) {
+    List<Long> keyList = commitIndexMap.keySet().stream()
+        .filter(p -> p <= commitIndex).collect(Collectors.toList());
+    if (!keyList.isEmpty()) {
+      releaseBuffers(keyList);
+    }
+  }
+
+  private long releaseBuffers(List<Long> indexes) {
+    Preconditions.checkArgument(!commitIndexMap.isEmpty());
+    for (long index : indexes) {
+      Preconditions.checkState(commitIndexMap.containsKey(index));
+      final List<StreamBuffer> buffers = commitIndexMap.remove(index);
+      final long length =
+          buffers.stream().mapToLong(StreamBuffer::length).sum();
+      totalAckDataLength += length;
+      // clear the future object from the future Map
+      final CompletableFuture<ContainerCommandResponseProto> remove =
+          futureMap.remove(totalAckDataLength);
+      if (remove == null) {
+        LOG.error("Couldn't find required future for " + totalAckDataLength);
+        for (Long key : futureMap.keySet()) {
+          LOG.error("Existing acknowledged data: " + key);
+        }
+      }
+      for (StreamBuffer byteBuffer : buffers) {
+        bufferList.remove(byteBuffer);
+      }
+    }
+    return totalAckDataLength;
+  }
+
+  public long getTotalAckDataLength() {
+    return totalAckDataLength;
+  }
+
   private IOException getIOExceptionForWatchForCommit(long commitIndex,
                                                        Exception e) {
     LOG.warn("watchForCommit failed for index {}", commitIndex, e);
     IOException ioException = new IOException(
         "Unexpected Storage Container Exception: " + e.toString(), e);
+    releaseBuffersOnException();
     return ioException;
   }
 
@@ -155,12 +216,12 @@ public class StreamCommitWatcher {
   }
 
   public void cleanup() {
-    if (commitIndexSet != null) {
-      commitIndexSet.clear();
+    if (commitIndexMap != null) {
+      commitIndexMap.clear();
     }
     if (futureMap != null) {
       futureMap.clear();
     }
-    commitIndexSet = null;
+    commitIndexMap = null;
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index f0c3a43..2cd5630 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
 import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.StreamBuffer;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
@@ -32,6 +33,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 
 /**
  * Helper class used inside {@link BlockDataStreamOutput}.
@@ -50,6 +52,7 @@ public final class BlockDataStreamOutputEntry
   // the current position of this stream 0 <= currentPosition < length
   private long currentPosition;
   private final Token<OzoneBlockTokenIdentifier> token;
+  private List<StreamBuffer> bufferList;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   private BlockDataStreamOutputEntry(
@@ -58,7 +61,8 @@ public final class BlockDataStreamOutputEntry
       Pipeline pipeline,
       long length,
       Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config
+      OzoneClientConfig config,
+      List<StreamBuffer> bufferList
   ) {
     this.config = config;
     this.byteBufferStreamOutput = null;
@@ -69,6 +73,7 @@ public final class BlockDataStreamOutputEntry
     this.token = token;
     this.length = length;
     this.currentPosition = 0;
+    this.bufferList = bufferList;
   }
 
   long getLength() {
@@ -92,8 +97,8 @@ public final class BlockDataStreamOutputEntry
   private void checkStream() throws IOException {
     if (this.byteBufferStreamOutput == null) {
       this.byteBufferStreamOutput =
-          new BlockDataStreamOutput(blockID, xceiverClientManager,
-              pipeline, config, token);
+          new BlockDataStreamOutput(blockID, xceiverClientManager, pipeline,
+              config, token, bufferList);
     }
   }
 
@@ -151,6 +156,20 @@ public final class BlockDataStreamOutputEntry
     }
   }
 
+  long getTotalAckDataLength() {
+    if (byteBufferStreamOutput != null) {
+      BlockDataStreamOutput out =
+          (BlockDataStreamOutput) this.byteBufferStreamOutput;
+      blockID = out.getBlockID();
+      return out.getTotalAckDataLength();
+    } else {
+      // For a pre allocated block for which no write has been initiated,
+      // the OutputStream will be null here.
+      // In such cases, the default blockCommitSequenceId will be 0
+      return 0;
+    }
+  }
+
   void cleanup(boolean invalidateClient) throws IOException {
     checkStream();
     BlockDataStreamOutput out =
@@ -180,6 +199,7 @@ public final class BlockDataStreamOutputEntry
     private long length;
     private Token<OzoneBlockTokenIdentifier> token;
     private OzoneClientConfig config;
+    private List<StreamBuffer> bufferList;
 
     public Builder setBlockID(BlockID bID) {
       this.blockID = bID;
@@ -219,13 +239,18 @@ public final class BlockDataStreamOutputEntry
       return this;
     }
 
+    public Builder setBufferList(List<StreamBuffer> bList) {
+      this.bufferList = bList;
+      return this;
+    }
+
     public BlockDataStreamOutputEntry build() {
       return new BlockDataStreamOutputEntry(blockID,
           key,
           xceiverClientManager,
           pipeline,
           length,
-          token, config);
+          token, config, bufferList);
     }
   }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 4bc55de..e49b0b7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.StreamBuffer;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -59,6 +60,7 @@ public class BlockDataStreamOutputEntryPool {
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private final long openID;
   private final ExcludeList excludeList;
+  private List<StreamBuffer> bufferList;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   public BlockDataStreamOutputEntryPool(
@@ -83,6 +85,7 @@ public class BlockDataStreamOutputEntryPool {
     this.requestID = requestId;
     this.openID = openID;
     this.excludeList = new ExcludeList();
+    this.bufferList = new ArrayList<>();
   }
 
   /**
@@ -142,7 +145,8 @@ public class BlockDataStreamOutputEntryPool {
             .setPipeline(subKeyInfo.getPipeline())
             .setConfig(config)
             .setLength(subKeyInfo.getLength())
-            .setToken(subKeyInfo.getToken());
+            .setToken(subKeyInfo.getToken())
+            .setBufferList(bufferList);
     streamEntries.add(builder.build());
   }
 
@@ -301,4 +305,12 @@ public class BlockDataStreamOutputEntryPool {
   boolean isEmpty() {
     return streamEntries.isEmpty();
   }
+
+  long computeBufferData() {
+    long totalDataLen =0;
+    for (StreamBuffer b : bufferList){
+      totalDataLen += b.length();
+    }
+    return totalDataLen;
+  }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index 9bba89d..2540e42 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -278,11 +278,14 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
     }
     Pipeline pipeline = streamEntry.getPipeline();
     PipelineID pipelineId = pipeline.getId();
-
+    long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
+    //set the correct length for the current stream
+    streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
     ExcludeList excludeList = blockDataStreamOutputEntryPool.getExcludeList();
+    long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData();
     if (!failedServers.isEmpty()) {
       excludeList.addDatanodes(failedServers);
     }
@@ -316,6 +319,13 @@ public class KeyDataStreamOutput implements ByteBufferStreamOutput {
       blockDataStreamOutputEntryPool
           .discardPreallocatedBlocks(-1, pipelineId);
     }
+    if (bufferedDataLen > 0) {
+      // If the data is still cached in the underlying stream, we need to
+      // allocate new block and write this data in the datanode.
+      handleRetry(exception, bufferedDataLen);
+      // reset the retryCount after handling the exception
+      retryCount = 0;
+    }
   }
 
   private void markStreamClosed() {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index d3b2d22..05a1019 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -21,15 +21,19 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.TestHelper;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -127,21 +131,25 @@ public class TestBlockDataStreamOutput {
   @Test
   public void testHalfChunkWrite() throws Exception {
     testWrite(chunkSize / 2);
+    testWriteWithFailure(chunkSize/2);
   }
 
   @Test
   public void testSingleChunkWrite() throws Exception {
     testWrite(chunkSize);
+    testWriteWithFailure(chunkSize);
   }
 
   @Test
   public void testMultiChunkWrite() throws Exception {
     testWrite(chunkSize + 50);
+    testWriteWithFailure(chunkSize + 50);
   }
 
   @Test
   public void testMultiBlockWrite() throws Exception {
     testWrite(blockSize + 50);
+    testWriteWithFailure(blockSize + 50);
   }
 
   private void testWrite(int dataLength) throws Exception {
@@ -156,6 +164,28 @@ public class TestBlockDataStreamOutput {
     key.close();
     validateData(keyName, data);
   }
+
+  private void testWriteWithFailure(int dataLength) throws Exception {
+    String keyName = getKeyName();
+    OzoneDataStreamOutput key = createKey(
+        keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    ByteBuffer b = ByteBuffer.wrap(data);
+    key.write(b);
+    KeyDataStreamOutput keyDataStreamOutput =
+        (KeyDataStreamOutput) key.getByteBufStreamOutput();
+    ByteBufferStreamOutput stream =
+        keyDataStreamOutput.getStreamEntries().get(0).getByteBufStreamOutput();
+    Assert.assertTrue(stream instanceof BlockDataStreamOutput);
+    TestHelper.waitForContainerClose(key, cluster);
+    key.write(b);
+    key.close();
+    String dataString = new String(data, UTF_8);
+    validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+  }
+
   private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
       long size) throws Exception {
     return TestHelper.createStreamKey(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 0e48dd9..82fff08 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -40,7 +40,9 @@ import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.io.BlockDataStreamOutputEntry;
 import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@@ -189,6 +191,24 @@ public final class TestHelper {
     waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
   }
 
+
+  public static void waitForContainerClose(OzoneDataStreamOutput outputStream,
+      MiniOzoneCluster cluster) throws Exception {
+    KeyDataStreamOutput keyOutputStream =
+        (KeyDataStreamOutput) outputStream.getByteBufStreamOutput();
+    List<BlockDataStreamOutputEntry> streamEntryList =
+        keyOutputStream.getStreamEntries();
+    List<Long> containerIdList = new ArrayList<>();
+    for (BlockDataStreamOutputEntry entry : streamEntryList) {
+      long id = entry.getBlockID().getContainerID();
+      if (!containerIdList.contains(id)) {
+        containerIdList.add(id);
+      }
+    }
+    Assert.assertTrue(!containerIdList.isEmpty());
+    waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
+  }
+
   public static void waitForPipelineClose(OzoneOutputStream outputStream,
       MiniOzoneCluster cluster, boolean waitForContainerCreation)
       throws Exception {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 10/13: HDDS-5849. [Ozone-Streaming]Write exceptions occur after checksum is enabled (#2729)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit ce6f566b3dd38a0a829cdadf8964dcff57ae31d5
Author: micah zhao <mi...@tencent.com>
AuthorDate: Tue Oct 12 14:05:07 2021 +0800

    HDDS-5849. [Ozone-Streaming]Write exceptions occur after checksum is enabled (#2729)
---
 .../java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java | 3 ++-
 .../org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java  | 2 --
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 41e2c48..2ae0ba7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -519,7 +519,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
       throws IOException {
     final int effectiveChunkSize = buf.remaining();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
-    ChecksumData checksumData = checksum.computeChecksum(buf);
+    ChecksumData checksumData = checksum.computeChecksum(
+        buf.asReadOnlyBuffer());
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
         .setOffset(offset)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 6d5401d..d3b2d22 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.client.rpc;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -82,7 +81,6 @@ public class TestBlockDataStreamOutput {
     blockSize = 2 * maxFlushSize;
 
     OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
-    clientConfig.setChecksumType(ChecksumType.NONE);
     clientConfig.setStreamBufferFlushDelay(false);
     conf.setFromObject(clientConfig);
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 04/13: HDDS-5480. [Ozone-Streaming] Client and server should support stream setup. (#2452)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 51d1013c409929dbc9b72966133d26744503d004
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Jul 28 20:22:53 2021 +0800

    HDDS-5480. [Ozone-Streaming] Client and server should support stream setup. (#2452)
---
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  9 +++--
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  | 16 ++++++--
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   | 18 +++++++++
 .../org/apache/hadoop/ozone/audit/DNAction.java    |  3 +-
 .../helpers/ContainerCommandRequestPBHelper.java   |  1 +
 .../common/src/main/resources/ozone-default.xml    | 20 ++++++++++
 .../org/apache/hadoop/hdds/conf/ConfigTag.java     |  3 +-
 .../container/common/impl/HddsDispatcher.java      |  3 +-
 .../transport/server/ratis/XceiverServerRatis.java | 46 +++++++++++++++++++++-
 .../ozone/container/keyvalue/KeyValueHandler.java  | 33 ++++++++++++++++
 .../keyvalue/impl/ChunkManagerDispatcher.java      |  6 +++
 .../keyvalue/impl/FilePerBlockStrategy.java        |  8 ++++
 .../keyvalue/interfaces/ChunkManager.java          |  5 +++
 .../container/common/TestDatanodeStateMachine.java |  6 ++-
 .../TestCreatePipelineCommandHandler.java          |  3 ++
 .../hdds/conf/DatanodeRatisServerConfig.java       | 35 ++++++++++++++++
 .../src/main/proto/DatanodeClientProtocol.proto    |  4 +-
 .../ozone/container/common/TestEndPoint.java       |  4 ++
 .../intellij/runConfigurations/Datanode2.xml       |  2 +-
 .../intellij/runConfigurations/Datanode3.xml       |  2 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |  1 +
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  3 ++
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |  2 +
 .../server/TestSecureContainerServer.java          |  2 +
 24 files changed, 220 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index aef3c29..835a82c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -273,8 +273,10 @@ public class DatanodeDetails extends NodeImpl implements
         return port;
       }
     }
-    // if no separate admin/server port, return single Ratis one for compat
-    if (name == Name.RATIS_ADMIN || name == Name.RATIS_SERVER) {
+    // if no separate admin/server/datastream port, return single Ratis one for
+    // compat
+    if (name == Name.RATIS_ADMIN || name == Name.RATIS_SERVER ||
+        name == Name.RATIS_DATASTREAM) {
       return getPort(Name.RATIS);
     }
     return null;
@@ -783,7 +785,8 @@ public class DatanodeDetails extends NodeImpl implements
      * Ports that are supported in DataNode.
      */
     public enum Name {
-      STANDALONE, RATIS, REST, REPLICATION, RATIS_ADMIN, RATIS_SERVER;
+      STANDALONE, RATIS, REST, REPLICATION, RATIS_ADMIN, RATIS_SERVER,
+      RATIS_DATASTREAM;
 
       public static final Set<Name> ALL_PORTS = ImmutableSet.copyOf(
           Name.values());
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index e310cc9..138eacd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -43,6 +43,7 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.proto.RaftProtos;
@@ -118,7 +119,9 @@ public final class RatisHelper {
         .setId(toRaftPeerId(dn))
         .setAddress(toRaftPeerAddress(dn, Port.Name.RATIS_SERVER))
         .setAdminAddress(toRaftPeerAddress(dn, Port.Name.RATIS_ADMIN))
-        .setClientAddress(toRaftPeerAddress(dn, Port.Name.RATIS));
+        .setClientAddress(toRaftPeerAddress(dn, Port.Name.RATIS))
+        .setDataStreamAddress(
+            toRaftPeerAddress(dn, Port.Name.RATIS_DATASTREAM));
   }
 
   private static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
@@ -172,6 +175,7 @@ public final class RatisHelper {
       ConfigurationSource ozoneConfiguration) throws IOException {
     return newRaftClient(rpcType,
         toRaftPeerId(pipeline.getLeaderNode()),
+        toRaftPeer(pipeline.getFirstNode()),
         newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
             pipeline.getNodes()), retryPolicy, tlsConfig, ozoneConfiguration);
   }
@@ -191,7 +195,7 @@ public final class RatisHelper {
   public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
       RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
       ConfigurationSource configuration) {
-    return newRaftClient(rpcType, leader.getId(),
+    return newRaftClient(rpcType, leader.getId(), leader,
         newRaftGroup(Collections.singletonList(leader)), retryPolicy,
         tlsConfig, configuration);
   }
@@ -199,14 +203,14 @@ public final class RatisHelper {
   public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
       RetryPolicy retryPolicy,
       ConfigurationSource ozoneConfiguration) {
-    return newRaftClient(rpcType, leader.getId(),
+    return newRaftClient(rpcType, leader.getId(), leader,
         newRaftGroup(Collections.singletonList(leader)), retryPolicy, null,
         ozoneConfiguration);
   }
 
   @SuppressWarnings("checkstyle:ParameterNumber")
   private static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
-      RaftGroup group, RetryPolicy retryPolicy,
+      RaftPeer primary, RaftGroup group, RetryPolicy retryPolicy,
       GrpcTlsConfig tlsConfig, ConfigurationSource ozoneConfiguration) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("newRaftClient: {}, leader={}, group={}",
@@ -214,6 +218,9 @@ public final class RatisHelper {
     }
     final RaftProperties properties = new RaftProperties();
 
+    RaftConfigKeys.DataStream.setType(properties,
+        SupportedDataStreamType.NETTY);
+
     RaftConfigKeys.Rpc.setType(properties, rpcType);
 
     // Set the ratis client headers which are matching with regex.
@@ -223,6 +230,7 @@ public final class RatisHelper {
         .setRaftGroup(group)
         .setLeaderId(leader)
         .setProperties(properties)
+        .setPrimaryDataStreamServer(primary)
         .setRetryPolicy(retryPolicy);
 
     // TODO: GRPC TLS only for now, netty/hadoop RPC TLS support later.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 2bcba59..b60f754 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -57,6 +57,12 @@ public final class OzoneConfigKeys {
   public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT =
       false;
 
+  public static final String DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT =
+      "dfs.container.ratis.datastream.random.port";
+  public static final boolean
+      DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT_DEFAULT =
+      false;
+
   public static final String DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY =
       "dfs.container.chunk.write.sync";
   public static final boolean DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT = false;
@@ -80,6 +86,18 @@ public final class OzoneConfigKeys {
   public static final int DFS_CONTAINER_RATIS_SERVER_PORT_DEFAULT = 9856;
 
   /**
+   * Ratis Port where containers listen to datastream requests.
+   */
+  public static final String DFS_CONTAINER_RATIS_DATASTREAM_ENABLE
+      = "dfs.container.ratis.datastream.enable";
+  public static final boolean DFS_CONTAINER_RATIS_DATASTREAM_ENABLE_DEFAULT
+      = true;
+  public static final String DFS_CONTAINER_RATIS_DATASTREAM_PORT
+      = "dfs.container.ratis.datastream.port";
+  public static final int DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT
+      = 9855;
+
+  /**
    * When set to true, allocate a random free port for ozone container, so that
    * a mini cluster is able to launch multiple containers on a node.
    */
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index 1c87f2b..73aff9a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -38,7 +38,8 @@ public enum DNAction implements AuditAction {
   PUT_SMALL_FILE,
   GET_SMALL_FILE,
   CLOSE_CONTAINER,
-  GET_COMMITTED_BLOCK_LENGTH;
+  GET_COMMITTED_BLOCK_LENGTH,
+  STREAM_INIT;
 
   @Override
   public String getAction() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
index 7773828..b2f4674 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
@@ -187,6 +187,7 @@ public final class ContainerCommandRequestPBHelper {
     case GetSmallFile     : return DNAction.GET_SMALL_FILE;
     case CloseContainer   : return DNAction.CLOSE_CONTAINER;
     case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
+    case StreamInit       : return DNAction.STREAM_INIT;
     default :
       LOG.debug("Invalid command type - {}", cmdType);
       return null;
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 4cff33d..54f8e5c 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -54,6 +54,26 @@
     <description>The ipc port number of container.</description>
   </property>
   <property>
+    <name>dfs.container.ratis.datastream.enable</name>
+    <value>true</value>
+    <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+    <description>If enable datastream ipc of container.</description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.datastream.port</name>
+    <value>9855</value>
+    <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+    <description>The datastream port number of container.</description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.datastream.random.port</name>
+    <value>false</value>
+    <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+    <description>Allocates a random free port for ozone container datastream.
+      This is used only while running unit tests.
+    </description>
+  </property>
+  <property>
     <name>dfs.container.ipc.random.port</name>
     <value>false</value>
     <tag>OZONE, DEBUG, CONTAINER</tag>
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
index 8cf584d..3728a0b 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
@@ -46,5 +46,6 @@ public enum ConfigTag {
   DELETION,
   HA,
   BALANCER,
-  UPGRADE
+  UPGRADE,
+  DATASTREAM
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 5d03d60..72a8107 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -199,7 +199,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
     boolean isWriteStage =
         (cmdType == Type.WriteChunk && dispatcherContext != null
             && dispatcherContext.getStage()
-            == DispatcherContext.WriteChunkStage.WRITE_DATA);
+            == DispatcherContext.WriteChunkStage.WRITE_DATA)
+            || (cmdType == Type.StreamInit);
     boolean isWriteCommitStage =
         (cmdType == Type.WriteChunk && dispatcherContext != null
             && dispatcherContext.getStage()
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 8c6d3fe..35d3627 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -79,6 +79,7 @@ import io.opentracing.util.GlobalTracer;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.netty.NettyConfigKeys;
@@ -98,6 +99,7 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
@@ -129,6 +131,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private int serverPort;
   private int adminPort;
   private int clientPort;
+  private int dataStreamPort;
   private final RaftServer server;
   private final List<ThreadPoolExecutor> chunkExecutors;
   private final ContainerDispatcher dispatcher;
@@ -148,6 +151,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   // Timeout used while calling submitRequest directly.
   private long requestTimeout;
   private boolean shouldDeleteRatisLogDirectory;
+  private boolean streamEnable;
 
   private XceiverServerRatis(DatanodeDetails dd,
       ContainerDispatcher dispatcher, ContainerController containerController,
@@ -157,6 +161,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     Objects.requireNonNull(dd, "id == null");
     datanodeDetails = dd;
     assignPorts();
+    this.streamEnable = conf.getBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLE,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLE_DEFAULT);
     RaftProperties serverProperties = newRaftProperties();
     this.context = context;
     this.dispatcher = dispatcher;
@@ -212,6 +219,34 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         chunkExecutors, this, conf);
   }
 
+  private void setUpRatisStream(RaftProperties properties) {
+    // set the datastream config
+    if (conf.getBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT,
+        OzoneConfigKeys.
+            DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT_DEFAULT)) {
+      dataStreamPort = 0;
+    } else {
+      dataStreamPort = conf.getInt(
+          OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_PORT,
+          OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT);
+    }
+    NettyConfigKeys.DataStream.setPort(properties, dataStreamPort);
+    RaftConfigKeys.DataStream.setType(properties,
+        SupportedDataStreamType.NETTY);
+    int dataStreamAsyncRequestThreadPoolSize =
+        conf.getObject(DatanodeRatisServerConfig.class)
+            .getStreamRequestThreads();
+    RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties,
+        dataStreamAsyncRequestThreadPoolSize);
+    int dataStreamWriteRequestThreadPoolSize =
+        conf.getObject(DatanodeRatisServerConfig.class)
+            .getStreamWriteThreads();
+    RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties,
+        dataStreamWriteRequestThreadPoolSize);
+  }
+
+  @SuppressWarnings("checkstyle:methodlength")
   private RaftProperties newRaftProperties() {
     final RaftProperties properties = new RaftProperties();
 
@@ -230,6 +265,10 @@ public final class XceiverServerRatis implements XceiverServerSpi {
 
     // set the configs enable and set the stateMachineData sync timeout
     RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true);
+    if (streamEnable) {
+      setUpRatisStream(properties);
+    }
+
     timeUnit = OzoneConfigKeys.
         DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT.getUnit();
     duration = conf.getTimeDuration(
@@ -494,7 +533,12 @@ public final class XceiverServerRatis implements XceiverServerSpi {
           Port.Name.RATIS_ADMIN);
       serverPort = getRealPort(serverRpc.getInetSocketAddress(),
           Port.Name.RATIS_SERVER);
-
+      if (streamEnable) {
+        DataStreamServerRpc dataStreamServerRpc =
+            server.getDataStreamServerRpc();
+        dataStreamPort = getRealPort(dataStreamServerRpc.getInetSocketAddress(),
+            Port.Name.RATIS_DATASTREAM);
+      }
       isStarted = true;
     }
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index b499755..a2b82e5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -99,6 +99,7 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse;
+import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponseBuilder;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
@@ -221,6 +222,8 @@ public class KeyValueHandler extends Handler {
       return handler.handleDeleteChunk(request, kvContainer);
     case WriteChunk:
       return handler.handleWriteChunk(request, kvContainer, dispatcherContext);
+    case StreamInit:
+      return handler.handleStreamInit(request, kvContainer, dispatcherContext);
     case ListChunk:
       return handler.handleUnsupportedOp(request);
     case CompactChunk:
@@ -247,6 +250,36 @@ public class KeyValueHandler extends Handler {
     return this.blockManager;
   }
 
+  ContainerCommandResponseProto handleStreamInit(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
+    if (!request.hasWriteChunk()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Malformed Write Chunk request. trace ID: {}",
+            request.getTraceID());
+      }
+      return malformedRequest(request);
+    }
+
+    String path = null;
+    try {
+      checkContainerOpen(kvContainer);
+
+      WriteChunkRequestProto writeChunk = request.getWriteChunk();
+      BlockID blockID = BlockID.getFromProtobuf(writeChunk.getBlockID());
+
+      path = chunkManager
+          .streamInit(kvContainer, blockID);
+
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    }
+
+    return getSuccessResponseBuilder(request)
+        .setMessage(path)
+        .build();
+  }
+
   /**
    * Handles Create Container Request. If successful, adds the container to
    * ContainerSet and sends an ICR to the SCM.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
index 27fe0d9..92d2606 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
@@ -74,6 +74,12 @@ public class ChunkManagerDispatcher implements ChunkManager {
         .writeChunk(container, blockID, info, data, dispatcherContext);
   }
 
+  public String streamInit(Container container, BlockID blockID)
+      throws StorageContainerException {
+    return selectHandler(container)
+        .streamInit(container, blockID);
+  }
+
   @Override
   public void finishWriteChunks(KeyValueContainer kvContainer,
       BlockData blockData) throws IOException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 5fd23b5..8b13c93 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -90,6 +90,14 @@ public class FilePerBlockStrategy implements ChunkManager {
   }
 
   @Override
+  public String streamInit(Container container, BlockID blockID)
+      throws StorageContainerException {
+    checkLayoutVersion(container);
+    File chunkFile = getChunkFile(container, blockID, null);
+    return chunkFile.getAbsolutePath();
+  }
+
+  @Override
   public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
       ChunkBuffer data, DispatcherContext dispatcherContext)
       throws StorageContainerException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index 15ff9d6..ba06eeb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -104,6 +104,11 @@ public interface ChunkManager {
     // no-op
   }
 
+  default String streamInit(Container container, BlockID blockID)
+      throws StorageContainerException {
+    return null;
+  }
+
   static long getBufferCapacityForChunkRead(ChunkInfo chunkInfo,
       long defaultReadBufferCapacity) {
     long bufferCapacity = 0;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index 82c9e6e..00db161 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -81,6 +81,8 @@ public class TestDatanodeStateMachine {
         TimeUnit.MILLISECONDS);
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
+    conf.setBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
     serverAddresses = new ArrayList<>();
     scmServers = new ArrayList<>();
     mockServers = new ArrayList<>();
@@ -215,7 +217,6 @@ public class TestDatanodeStateMachine {
         OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     datanodeDetails.setPort(port);
     ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
-
     try (DatanodeStateMachine stateMachine =
              new DatanodeStateMachine(datanodeDetails, conf, null, null,
                  null)) {
@@ -429,6 +430,8 @@ public class TestDatanodeStateMachine {
         DatanodeDetails.Port.Name.RATIS, 0);
     DatanodeDetails.Port restPort = DatanodeDetails.newPort(
         DatanodeDetails.Port.Name.REST, 0);
+    DatanodeDetails.Port streamPort = DatanodeDetails.newPort(
+        DatanodeDetails.Port.Name.RATIS_DATASTREAM, 0);
     return DatanodeDetails.newBuilder()
         .setUuid(UUID.randomUUID())
         .setHostName("localhost")
@@ -436,6 +439,7 @@ public class TestDatanodeStateMachine {
         .addPort(containerPort)
         .addPort(ratisPort)
         .addPort(restPort)
+        .addPort(streamPort)
         .build();
   }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
index d23f1c4..ce62640 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
@@ -38,6 +38,7 @@ import org.apache.ratis.client.api.GroupManagementApi;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.retry.RetryPolicy;
 import org.junit.Before;
@@ -98,6 +99,8 @@ public class TestCreatePipelineCommandHandler {
         .thenReturn(builder);
     Mockito.when(builder.setRetryPolicy(Mockito.any(RetryPolicy.class)))
         .thenReturn(builder);
+    Mockito.when(builder.setPrimaryDataStreamServer(
+        Mockito.any(RaftPeer.class))).thenReturn(builder);
     return builder;
   }
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
index 25ed477..205d92e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import java.time.Duration;
 
 import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.DATASTREAM;
 import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.RATIS;
@@ -123,6 +124,40 @@ public class DatanodeRatisServerConfig {
     this.leaderNumPendingRequests = leaderNumPendingRequests;
   }
 
+  @Config(key = "datastream.request.threads",
+      defaultValue = "20",
+      type = ConfigType.INT,
+      tags = {OZONE, DATANODE, RATIS, DATASTREAM},
+      description = "Maximum number of threads in the thread pool for " +
+          "datastream request."
+  )
+  private int streamRequestThreads;
+
+  public int getStreamRequestThreads() {
+    return streamRequestThreads;
+  }
+
+  public void setStreamRequestThreads(int streamRequestThreads) {
+    this.streamRequestThreads = streamRequestThreads;
+  }
+
+  @Config(key = "datastream.write.threads",
+      defaultValue = "20",
+      type = ConfigType.INT,
+      tags = {OZONE, DATANODE, RATIS, DATASTREAM},
+      description = "Maximum number of threads in the thread pool for " +
+          "datastream write."
+  )
+  private int streamWriteThreads;
+
+  public int getStreamWriteThreads() {
+    return streamWriteThreads;
+  }
+
+  public void setStreamWriteThreads(int streamWriteThreads) {
+    this.streamWriteThreads = streamWriteThreads;
+  }
+
   @Config(key = "delete.ratis.log.directory",
           defaultValue = "true",
           type = ConfigType.BOOLEAN,
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 9f57e14..90ecbfc 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -100,6 +100,8 @@ enum Type {
   GetSmallFile = 16;
   CloseContainer = 17;
   GetCommittedBlockLength = 18;
+
+  StreamInit = 19;
 }
 
 
@@ -392,7 +394,7 @@ enum ChecksumType {
 
 message  WriteChunkRequestProto  {
   required DatanodeBlockID blockID = 1;
-  required ChunkInfo chunkData = 2;
+  optional ChunkInfo chunkData = 2;
   optional bytes data = 3;
 }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index dc11ece..92fefc7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -145,6 +145,8 @@ public class TestEndPoint {
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         serverAddress, 1000)) {
       DatanodeDetails datanodeDetails = randomDatanodeDetails();
+      conf.setBoolean(
+          OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
       OzoneContainer ozoneContainer = new OzoneContainer(
           datanodeDetails, conf, getContext(datanodeDetails), null);
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
@@ -169,6 +171,8 @@ public class TestEndPoint {
         true);
     conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
         true);
+    conf.setBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
     conf.setFromObject(new ReplicationConfig().setPort(0));
     try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
         serverAddress, 1000)) {
diff --git a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
index 3d33020..040b515 100644
--- a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
+++ b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
@@ -18,7 +18,7 @@
   <configuration default="false" name="Datanode2" type="Application" factoryName="Application">
     <option name="MAIN_CLASS_NAME" value="org.apache.hadoop.ozone.HddsDatanodeService" />
     <module name="ozone-datanode" />
-    <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode2 --set hdds.datanode.dir=/tmp/datanode2/storage --set hdds.datanode.http-address=127.0.0.1:10021 --set dfs.container.ratis.ipc=10022 --set dfs.container.ipc=10023 --set dfs.container.ratis.server.port=10024 --set dfs.container.ratis.admin.port=10025 --set hdds.datanode.replication.port=10026" />
+    <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode2 --set hdds.datanode.dir=/tmp/datanode2/storage --set hdds.datanode.http-address=127.0.0.1:10021 --set dfs.container.ratis.ipc=10022 --set dfs.container.ipc=10023 --set dfs.container.ratis.server.port=10024 --set dfs.container.ratis.admin.port=10025 --set hdds.datanode.replication.port=10026 --set dfs.container.ratis.datastream.port=10027" />
     <option name="VM_PARAMETERS" value="-Dlog4j.configuration=file:hadoop-ozone/dev-support/intellij/log4j.properties" />
     <extension name="coverage">
       <pattern>
diff --git a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml
index 10b6682..6a3116e 100644
--- a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml
+++ b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml
@@ -18,7 +18,7 @@
   <configuration default="false" name="Datanode3" type="Application" factoryName="Application">
     <option name="MAIN_CLASS_NAME" value="org.apache.hadoop.ozone.HddsDatanodeService" />
     <module name="ozone-datanode" />
-    <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode3 --set hdds.datanode.dir=/tmp/datanode3/storage --set hdds.datanode.http-address=127.0.0.1:10031 --set dfs.container.ratis.ipc=10032 --set dfs.container.ipc=10033 --set dfs.container.ratis.server.port=10034 --set dfs.container.ratis.admin.port=10035 --set hdds.datanode.replication.port=10036" />
+    <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode3 --set hdds.datanode.dir=/tmp/datanode3/storage --set hdds.datanode.http-address=127.0.0.1:10031 --set dfs.container.ratis.ipc=10032 --set dfs.container.ipc=10033 --set dfs.container.ratis.server.port=10034 --set dfs.container.ratis.admin.port=10035 --set hdds.datanode.replication.port=10036 --set dfs.container.ratis.datastream.port=10037" />
     <option name="VM_PARAMETERS" value="-Dlog4j.configuration=file:hadoop-ozone/dev-support/intellij/log4j.properties" />
     <extension name="coverage">
       <pattern>
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 7e11b3c..9fbba8c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -315,6 +315,7 @@ public interface MiniOzoneCluster {
     protected Optional<String> omId = Optional.empty();
     
     protected Boolean randomContainerPort = true;
+    protected Boolean randomContainerStreamPort = true;
     protected Optional<String> datanodeReservedSpace = Optional.empty();
     protected Optional<Integer> chunkSize = Optional.empty();
     protected OptionalInt streamBufferSize = OptionalInt.empty();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index d9b5b3c..cff7c35 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -86,6 +86,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_INIT_D
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ADMIN_PORT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_PORT;
@@ -915,6 +916,8 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
           randomContainerPort);
       conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
           randomContainerPort);
+      conf.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT,
+          randomContainerStreamPort);
 
       conf.setFromObject(new ReplicationConfig().setPort(0));
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index e7436e0..5fc8ead 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -218,6 +218,8 @@ public class TestMiniOzoneCluster {
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
         true);
+    ozoneConf.setBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
     List<DatanodeStateMachine> stateMachines = new ArrayList<>();
     try {
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index f8f2d0a..1a5af25 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -216,6 +216,8 @@ public class TestSecureContainerServer {
       DatanodeDetails dn, OzoneConfiguration conf) throws IOException {
     conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
         dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
+    conf.setBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
     final String dir = TEST_DIR + dn.getUuid();
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
     final ContainerDispatcher dispatcher = createDispatcher(dn,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 07/13: HDDS-5705. [Ozone-Streaming] Change ByteBufStreamOutput to ByteBufferStreamOutput (#2603)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit d1cb3f9e97c3315ed435314721de02e23fba2da0
Author: Kaijie Chen <ch...@kaijie.org>
AuthorDate: Wed Sep 8 09:25:17 2021 +0800

    HDDS-5705. [Ozone-Streaming] Change ByteBufStreamOutput to ByteBufferStreamOutput (#2603)
---
 hadoop-hdds/client/pom.xml                         |  4 --
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 26 +++++----
 ...reamOutput.java => ByteBufferStreamOutput.java} | 15 +++--
 .../client/io/BlockDataStreamOutputEntry.java      | 51 ++++++++---------
 .../ozone/client/io/KeyDataStreamOutput.java       | 15 +++--
 .../ozone/client/io/OzoneDataStreamOutput.java     | 30 +++++-----
 .../client/rpc/TestBlockDataStreamOutput.java      | 66 +++++++++-------------
 .../hadoop/ozone/shell/keys/PutKeyHandler.java     |  8 +--
 8 files changed, 98 insertions(+), 117 deletions(-)

diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index 482c067..4e75e42 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -66,10 +66,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <version>${spotbugs.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-buffer</artifactId>
-    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 39ec2f9..d0419fa 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -46,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
 
 /**
- * An {@link ByteBufStreamOutput} used by the REST service in combination
+ * An {@link ByteBufferStreamOutput} used by the REST service in combination
  * with the SCMClient to write the value of a key to a sequence
  * of container chunks.  Writes are buffered locally and periodically written to
  * the container as a new chunk.  In order to preserve the semantics that
@@ -74,7 +74,7 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlock
  * This class encapsulates all state management for buffering and writing
  * through to the container.
  */
-public class BlockDataStreamOutput implements ByteBufStreamOutput {
+public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   public static final Logger LOG =
       LoggerFactory.getLogger(BlockDataStreamOutput.class);
   public static final String EXCEPTION_MSG =
@@ -209,16 +209,16 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
   }
 
   @Override
-  public void write(ByteBuf buf) throws IOException {
+  public void write(ByteBuffer b, int off, int len) throws IOException {
     checkOpen();
-    if (buf == null) {
+    if (b == null) {
       throw new NullPointerException();
     }
-    final int len = buf.readableBytes();
     if (len == 0) {
       return;
     }
-    writeChunkToContainer(buf);
+    writeChunkToContainer(
+            (ByteBuffer) b.asReadOnlyBuffer().position(off).limit(off + len));
 
     writtenDataLength += len;
   }
@@ -476,15 +476,17 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
    * Writes buffered data as a new chunk to the container and saves chunk
    * information to be used later in putKey call.
    *
+   * @param buf chunk data to write, from position to limit
    * @throws IOException if there is an I/O error while performing the call
    * @throws OzoneChecksumException if there is an error while computing
    * checksum
    */
-  private void writeChunkToContainer(ByteBuf buf)
+  private void writeChunkToContainer(ByteBuffer buf)
       throws IOException {
-    ChecksumData checksumData = checksum.computeChecksum(buf.nioBuffer());
-    int effectiveChunkSize = buf.readableBytes();
+    final int effectiveChunkSize = buf.remaining();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
+    ChecksumData checksumData =
+        checksum.computeChecksum(buf.asReadOnlyBuffer());
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
         .setOffset(offset)
@@ -499,8 +501,8 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
     CompletableFuture<DataStreamReply> future =
         (needSync(offset + effectiveChunkSize) ?
-            out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) :
-            out.writeAsync(buf.nioBuffer()))
+            out.writeAsync(buf, StandardWriteOption.SYNC) :
+            out.writeAsync(buf))
             .whenCompleteAsync((r, e) -> {
               if (e != null || !r.isSuccess()) {
                 if (e == null) {
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
similarity index 82%
rename from hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
index 7f40737..0650a68 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufferStreamOutput.java
@@ -18,23 +18,24 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
-import io.netty.buffer.ByteBuf;
-
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
 * This interface is for writing an output stream of ByteBuffers.
-* An ByteBufStreamOutput accepts Netty ByteBuf and sends them to some sink.
+* An ByteBufferStreamOutput accepts nio ByteBuffer and sends them to some sink.
 */
-public interface ByteBufStreamOutput extends Closeable {
+public interface ByteBufferStreamOutput extends Closeable {
   /**
    * Try to write all the bytes in ByteBuf b to DataStream.
    *
    * @param b the data.
    * @exception IOException if an I/O error occurs.
    */
-  void write(ByteBuf b) throws IOException;
+  default void write(ByteBuffer b) throws IOException {
+    write(b, b.position(), b.remaining());
+  }
 
   /**
    * Try to write the [off:off + len) slice in ByteBuf b to DataStream.
@@ -44,9 +45,7 @@ public interface ByteBufStreamOutput extends Closeable {
    * @param len the number of bytes to write.
    * @exception  IOException  if an I/O error occurs.
    */
-  default void write(ByteBuf b, int off, int len) throws IOException {
-    write(b.slice(off, len));
-  }
+  void write(ByteBuffer b, int off, int len) throws IOException;
 
   /**
    * Flushes this DataStream output and forces any buffered output bytes
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index 98907bf..f0c3a43 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -18,18 +18,18 @@
 package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.buffer.ByteBuf;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -37,10 +37,10 @@ import java.util.Collections;
  * Helper class used inside {@link BlockDataStreamOutput}.
  * */
 public final class BlockDataStreamOutputEntry
-    implements ByteBufStreamOutput {
+    implements ByteBufferStreamOutput {
 
   private final OzoneClientConfig config;
-  private ByteBufStreamOutput byteBufStreamOutput;
+  private ByteBufferStreamOutput byteBufferStreamOutput;
   private BlockID blockID;
   private final String key;
   private final XceiverClientFactory xceiverClientManager;
@@ -61,7 +61,7 @@ public final class BlockDataStreamOutputEntry
       OzoneClientConfig config
   ) {
     this.config = config;
-    this.byteBufStreamOutput = null;
+    this.byteBufferStreamOutput = null;
     this.blockID = blockID;
     this.key = key;
     this.xceiverClientManager = xceiverClientManager;
@@ -90,63 +90,62 @@ public final class BlockDataStreamOutputEntry
    * @throws IOException if xceiverClient initialization fails
    */
   private void checkStream() throws IOException {
-    if (this.byteBufStreamOutput == null) {
-      this.byteBufStreamOutput =
+    if (this.byteBufferStreamOutput == null) {
+      this.byteBufferStreamOutput =
           new BlockDataStreamOutput(blockID, xceiverClientManager,
               pipeline, config, token);
     }
   }
 
   @Override
-  public void write(ByteBuf b) throws IOException {
+  public void write(ByteBuffer b, int off, int len) throws IOException {
     checkStream();
-    final int len = b.readableBytes();
-    byteBufStreamOutput.write(b);
+    byteBufferStreamOutput.write(b, off, len);
     this.currentPosition += len;
   }
 
   @Override
   public void flush() throws IOException {
-    if (this.byteBufStreamOutput != null) {
-      this.byteBufStreamOutput.flush();
+    if (this.byteBufferStreamOutput != null) {
+      this.byteBufferStreamOutput.flush();
     }
   }
 
   @Override
   public void close() throws IOException {
-    if (this.byteBufStreamOutput != null) {
-      this.byteBufStreamOutput.close();
+    if (this.byteBufferStreamOutput != null) {
+      this.byteBufferStreamOutput.close();
       // after closing the chunkOutPutStream, blockId would have been
       // reconstructed with updated bcsId
       this.blockID =
-          ((BlockDataStreamOutput) byteBufStreamOutput).getBlockID();
+          ((BlockDataStreamOutput) byteBufferStreamOutput).getBlockID();
     }
   }
 
   boolean isClosed() {
-    if (byteBufStreamOutput != null) {
-      return  ((BlockDataStreamOutput) byteBufStreamOutput).isClosed();
+    if (byteBufferStreamOutput != null) {
+      return  ((BlockDataStreamOutput) byteBufferStreamOutput).isClosed();
     }
     return false;
   }
 
   Collection<DatanodeDetails> getFailedServers() {
-    if (byteBufStreamOutput != null) {
+    if (byteBufferStreamOutput != null) {
       BlockDataStreamOutput out =
-          (BlockDataStreamOutput) this.byteBufStreamOutput;
+          (BlockDataStreamOutput) this.byteBufferStreamOutput;
       return out.getFailedServers();
     }
     return Collections.emptyList();
   }
 
   long getWrittenDataLength() {
-    if (byteBufStreamOutput != null) {
+    if (byteBufferStreamOutput != null) {
       BlockDataStreamOutput out =
-          (BlockDataStreamOutput) this.byteBufStreamOutput;
+          (BlockDataStreamOutput) this.byteBufferStreamOutput;
       return out.getWrittenDataLength();
     } else {
       // For a pre allocated block for which no write has been initiated,
-      // the ByteBufStreamOutput will be null here.
+      // the ByteBufferStreamOutput will be null here.
       // In such cases, the default blockCommitSequenceId will be 0
       return 0;
     }
@@ -155,7 +154,7 @@ public final class BlockDataStreamOutputEntry
   void cleanup(boolean invalidateClient) throws IOException {
     checkStream();
     BlockDataStreamOutput out =
-        (BlockDataStreamOutput) this.byteBufStreamOutput;
+        (BlockDataStreamOutput) this.byteBufferStreamOutput;
     out.cleanup(invalidateClient);
 
   }
@@ -163,7 +162,7 @@ public final class BlockDataStreamOutputEntry
   void writeOnRetry(long len) throws IOException {
     checkStream();
     BlockDataStreamOutput out =
-        (BlockDataStreamOutput) this.byteBufStreamOutput;
+        (BlockDataStreamOutput) this.byteBufferStreamOutput;
     out.writeOnRetry(len);
     this.currentPosition += len;
 
@@ -231,8 +230,8 @@ public final class BlockDataStreamOutputEntry
   }
 
   @VisibleForTesting
-  public ByteBufStreamOutput getByteBufStreamOutput() {
-    return byteBufStreamOutput;
+  public ByteBufferStreamOutput getByteBufStreamOutput() {
+    return byteBufferStreamOutput;
   }
 
   public BlockID getBlockID() {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index c37f9cd..9bba89d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -32,7 +31,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -48,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -63,7 +63,7 @@ import java.util.stream.Collectors;
  *
  * TODO : currently not support multi-thread access.
  */
-public class KeyDataStreamOutput implements ByteBufStreamOutput {
+public class KeyDataStreamOutput implements ByteBufferStreamOutput {
 
   private OzoneClientConfig config;
 
@@ -185,17 +185,16 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
   }
 
   @Override
-  public void write(ByteBuf b) throws IOException {
+  public void write(ByteBuffer b, int off, int len) throws IOException {
     checkNotClosed();
     if (b == null) {
       throw new NullPointerException();
     }
-    final int len = b.readableBytes();
-    handleWrite(b, b.readerIndex(), len, false);
+    handleWrite(b, off, len, false);
     writeOffset += len;
   }
 
-  private void handleWrite(ByteBuf b, int off, long len, boolean retry)
+  private void handleWrite(ByteBuffer b, int off, long len, boolean retry)
       throws IOException {
     while (len > 0) {
       try {
@@ -227,7 +226,7 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
   }
 
   private int writeToDataStreamOutput(BlockDataStreamOutputEntry current,
-      boolean retry, long len, ByteBuf b, int writeLen, int off,
+      boolean retry, long len, ByteBuffer b, int writeLen, int off,
       long currentPos) throws IOException {
     try {
       if (retry) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index 378b868..d40ac2b 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -16,55 +16,55 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
-import io.netty.buffer.ByteBuf;
-import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
  * OzoneDataStreamOutput is used to write data into Ozone.
  * It uses SCM's {@link KeyDataStreamOutput} for writing the data.
  */
-public class OzoneDataStreamOutput implements ByteBufStreamOutput {
+public class OzoneDataStreamOutput implements ByteBufferStreamOutput {
 
-  private final ByteBufStreamOutput byteBufStreamOutput;
+  private final ByteBufferStreamOutput byteBufferStreamOutput;
 
   /**
    * Constructs OzoneDataStreamOutput with KeyDataStreamOutput.
    *
-   * @param byteBufStreamOutput
+   * @param byteBufferStreamOutput the underlying ByteBufferStreamOutput
    */
-  public OzoneDataStreamOutput(ByteBufStreamOutput byteBufStreamOutput) {
-    this.byteBufStreamOutput = byteBufStreamOutput;
+  public OzoneDataStreamOutput(ByteBufferStreamOutput byteBufferStreamOutput) {
+    this.byteBufferStreamOutput = byteBufferStreamOutput;
   }
 
   @Override
-  public void write(ByteBuf b) throws IOException {
-    byteBufStreamOutput.write(b);
+  public void write(ByteBuffer b, int off, int len) throws IOException {
+    byteBufferStreamOutput.write(b, off, len);
   }
 
   @Override
   public synchronized void flush() throws IOException {
-    byteBufStreamOutput.flush();
+    byteBufferStreamOutput.flush();
   }
 
   @Override
   public synchronized void close() throws IOException {
     //commitKey can be done here, if needed.
-    byteBufStreamOutput.close();
+    byteBufferStreamOutput.close();
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    if (byteBufStreamOutput instanceof KeyDataStreamOutput) {
+    if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
       return ((KeyDataStreamOutput)
-              byteBufStreamOutput).getCommitUploadPartInfo();
+              byteBufferStreamOutput).getCommitUploadPartInfo();
     }
     // Otherwise return null.
     return null;
   }
 
-  public ByteBufStreamOutput getByteBufStreamOutput() {
-    return byteBufStreamOutput;
+  public ByteBufferStreamOutput getByteBufStreamOutput() {
+    return byteBufferStreamOutput;
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 4d52d89..6d5401d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -17,7 +17,6 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
-import io.netty.buffer.Unpooled;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -38,6 +37,7 @@ import org.junit.Test;
 import org.junit.rules.Timeout;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -127,47 +127,37 @@ public class TestBlockDataStreamOutput {
   }
 
   @Test
+  public void testHalfChunkWrite() throws Exception {
+    testWrite(chunkSize / 2);
+  }
+
+  @Test
+  public void testSingleChunkWrite() throws Exception {
+    testWrite(chunkSize);
+  }
+
+  @Test
   public void testMultiChunkWrite() throws Exception {
-    // write data less than 1 chunk size use streaming.
-    String keyName1 = getKeyName();
-    OzoneDataStreamOutput key1 = createKey(
-        keyName1, ReplicationType.RATIS, 0);
-    int dataLength1 = chunkSize/2;
-    byte[] data1 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength1)
-            .getBytes(UTF_8);
-    key1.write(Unpooled.copiedBuffer(data1));
-    // now close the stream, It will update the key length.
-    key1.close();
-    validateData(keyName1, data1);
-
-    // write data more than 1 chunk size use streaming.
-    String keyName2 = getKeyName();
-    OzoneDataStreamOutput key2 = createKey(
-        keyName2, ReplicationType.RATIS, 0);
-    int dataLength2 = chunkSize + 50;
-    byte[] data2 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength2)
-            .getBytes(UTF_8);
-    key2.write(Unpooled.copiedBuffer(data2));
-    // now close the stream, It will update the key length.
-    key2.close();
-    validateData(keyName2, data2);
-
-    // write data more than 1 block size use streaming.
-    String keyName3 = getKeyName();
-    OzoneDataStreamOutput key3 = createKey(
-        keyName3, ReplicationType.RATIS, 0);
-    int dataLength3 = blockSize + 50;
-    byte[] data3 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength3)
+    testWrite(chunkSize + 50);
+  }
+
+  @Test
+  public void testMultiBlockWrite() throws Exception {
+    testWrite(blockSize + 50);
+  }
+
+  private void testWrite(int dataLength) throws Exception {
+    String keyName = getKeyName();
+    OzoneDataStreamOutput key = createKey(
+        keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
-    key3.write(Unpooled.copiedBuffer(data3));
+    key.write(ByteBuffer.wrap(data));
     // now close the stream, It will update the key length.
-    key3.close();
-    validateData(keyName3, data3);
+    key.close();
+    validateData(keyName, data);
   }
-
   private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
       long size) throws Exception {
     return TestHelper.createStreamKey(
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
index 56bc834..af6a461 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
@@ -29,8 +29,6 @@ import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.Map;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfigValidator;
@@ -141,10 +139,8 @@ public class PutKeyHandler extends KeyHandler {
         long off = 0;
         while (len > 0) {
           long writeLen = Math.min(len, chunkSize);
-          ByteBuffer segment =
-              ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
-          ByteBuf buf = Unpooled.wrappedBuffer(segment);
-          out.write(buf);
+          ByteBuffer bb = ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
+          out.write(bb);
           off += writeLen;
           len -= writeLen;
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 05/13: HDDS-5488. [Ozone-Streaming] Add a new BlockOutputStream/KeyOutputStream to support streaming api (#2495)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 6c36e82308c18c631de22bfd4e0632c35896240a
Author: Kaijie Chen <ch...@kaijie.org>
AuthorDate: Thu Aug 12 18:09:38 2021 +0800

    HDDS-5488. [Ozone-Streaming] Add a new BlockOutputStream/KeyOutputStream to support streaming api (#2495)
---
 hadoop-hdds/client/pom.xml                         |   4 +
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |   5 +
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 760 +++++++++++++++++++++
 .../hdds/scm/storage/ByteBufStreamOutput.java      |  58 ++
 .../apache/hadoop/ozone/client/OzoneBucket.java    |  19 +
 .../client/io/BlockDataStreamOutputEntry.java      | 294 ++++++++
 .../client/io/BlockDataStreamOutputEntryPool.java  | 324 +++++++++
 .../ozone/client/io/KeyDataStreamOutput.java       | 629 +++++++++++++++++
 .../ozone/client/io/OzoneDataStreamOutput.java     |  70 ++
 .../ozone/client/protocol/ClientProtocol.java      |  15 +
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  62 ++
 .../client/rpc/TestBlockDataStreamOutput.java      | 181 +++++
 .../apache/hadoop/ozone/container/TestHelper.java  |  24 +-
 .../hadoop/ozone/shell/keys/PutKeyHandler.java     |  40 +-
 14 files changed, 2479 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index 4e75e42..482c067 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -66,6 +66,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <version>${spotbugs.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 6982d41..f37cd1c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdds.tracing.TracingUtil;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.api.DataStreamApi;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
@@ -359,4 +360,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     throw new UnsupportedOperationException(
             "Operation Not supported for ratis client");
   }
+
+  public DataStreamApi getDataStreamApi() {
+    return this.getClient().getDataStreamApi();
+  }
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
new file mode 100644
index 0000000..f658df1
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -0,0 +1,760 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
+
+/**
+ * An {@link ByteBufStreamOutput} used by the REST service in combination
+ * with the SCMClient to write the value of a key to a sequence
+ * of container chunks.  Writes are buffered locally and periodically written to
+ * the container as a new chunk.  In order to preserve the semantics that
+ * replacement of a pre-existing key is atomic, each instance of the stream has
+ * an internal unique identifier.  This unique identifier and a monotonically
+ * increasing chunk index form a composite key that is used as the chunk name.
+ * After all data is written, a putKey call creates or updates the corresponding
+ * container key, and this call includes the full list of chunks that make up
+ * the key data.  The list of chunks is updated all at once.  Therefore, a
+ * concurrent reader never can see an intermediate state in which different
+ * chunks of data from different versions of the key data are interleaved.
+ * This class encapsulates all state management for buffering and writing
+ * through to the container.
+ */
+public class BlockDataStreamOutput implements ByteBufStreamOutput {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockDataStreamOutput.class);
+  public static final String EXCEPTION_MSG =
+      "Unexpected Storage Container Exception: ";
+  private static final CompletableFuture[] EMPTY_FUTURE_ARRAY = {};
+
+  private AtomicReference<BlockID> blockID;
+
+  private final BlockData.Builder containerBlockData;
+  private XceiverClientFactory xceiverClientFactory;
+  private XceiverClientRatis xceiverClient;
+  private OzoneClientConfig config;
+
+  private int chunkIndex;
+  private final AtomicLong chunkOffset = new AtomicLong();
+  private final BufferPool bufferPool;
+  // The IOException will be set by response handling thread in case there is an
+  // exception received in the response. If the exception is set, the next
+  // request will fail upfront.
+  private final AtomicReference<IOException> ioException;
+  private final ExecutorService responseExecutor;
+
+  // the effective length of data flushed so far
+  private long totalDataFlushedLength;
+
+  // effective data write attempted so far for the block
+  private long writtenDataLength;
+
+  // List containing buffers for which the putBlock call will
+  // update the length in the datanodes. This list will just maintain
+  // references to the buffers in the BufferPool which will be cleared
+  // when the watchForCommit acknowledges a putBlock logIndex has been
+  // committed on all datanodes. This list will be a  place holder for buffers
+  // which got written between successive putBlock calls.
+  private List<ChunkBuffer> bufferList;
+
+  // This object will maintain the commitIndexes and byteBufferList in order
+  // Also, corresponding to the logIndex, the corresponding list of buffers will
+  // be released from the buffer pool.
+  private final CommitWatcher commitWatcher;
+
+  private final List<DatanodeDetails> failedServers;
+  private final Checksum checksum;
+
+  //number of buffers used before doing a flush/putBlock.
+  private int flushPeriod;
+  //bytes remaining to write in the current buffer.
+  private int currentBufferRemaining;
+  //current buffer allocated to write
+  private ChunkBuffer currentBuffer;
+  private final Token<? extends TokenIdentifier> token;
+  private final DataStreamOutput out;
+  private CompletableFuture<DataStreamReply> dataStreamCloseReply;
+  private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+  private final long syncSize = 0; // TODO: disk sync is disabled for now
+  private long syncPosition = 0;
+
+  /**
+   * Creates a new BlockDataStreamOutput.
+   *
+   * @param blockID              block ID
+   * @param xceiverClientManager client manager that controls client
+   * @param pipeline             pipeline where block will be written
+   * @param bufferPool           pool of buffers
+   */
+  public BlockDataStreamOutput(
+      BlockID blockID,
+      XceiverClientFactory xceiverClientManager,
+      Pipeline pipeline,
+      BufferPool bufferPool,
+      OzoneClientConfig config,
+      Token<? extends TokenIdentifier> token
+  ) throws IOException {
+    this.xceiverClientFactory = xceiverClientManager;
+    this.config = config;
+    this.blockID = new AtomicReference<>(blockID);
+    KeyValue keyValue =
+        KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
+    this.containerBlockData =
+        BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
+            .addMetadata(keyValue);
+    this.xceiverClient =
+        (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
+    // Alternatively, stream setup can be delayed till the first chunk write.
+    this.out = setupStream();
+    this.bufferPool = bufferPool;
+    this.token = token;
+
+    //number of buffers used before doing a flush
+    refreshCurrentBuffer(bufferPool);
+    flushPeriod = (int) (config.getStreamBufferFlushSize() / config
+        .getStreamBufferSize());
+
+    Preconditions
+        .checkArgument(
+            (long) flushPeriod * config.getStreamBufferSize() == config
+                .getStreamBufferFlushSize());
+
+    // A single thread executor handle the responses of async requests
+    responseExecutor = Executors.newSingleThreadExecutor();
+    commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
+    bufferList = null;
+    totalDataFlushedLength = 0;
+    writtenDataLength = 0;
+    failedServers = new ArrayList<>(0);
+    ioException = new AtomicReference<>(null);
+    checksum = new Checksum(config.getChecksumType(),
+        config.getBytesPerChecksum());
+  }
+
+  private DataStreamOutput setupStream() throws IOException {
+    // Execute a dummy WriteChunk request to get the path of the target file,
+    // but does NOT write any data to it.
+    ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest =
+        ContainerProtos.WriteChunkRequestProto.newBuilder()
+            .setBlockID(blockID.get().getDatanodeBlockIDProtobuf());
+
+    String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
+    ContainerProtos.ContainerCommandRequestProto.Builder builder =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(ContainerProtos.Type.StreamInit)
+            .setContainerID(blockID.get().getContainerID())
+            .setDatanodeUuid(id).setWriteChunk(writeChunkRequest);
+
+    ContainerCommandRequestMessage message =
+        ContainerCommandRequestMessage.toMessage(builder.build(), null);
+
+    return Preconditions.checkNotNull(xceiverClient.getDataStreamApi())
+        .stream(message.getContent().asReadOnlyByteBuffer());
+  }
+
+  private void refreshCurrentBuffer(BufferPool pool) {
+    currentBuffer = pool.getCurrentBuffer();
+    currentBufferRemaining =
+        currentBuffer != null ? currentBuffer.remaining() : 0;
+  }
+
+  public BlockID getBlockID() {
+    return blockID.get();
+  }
+
+  public long getTotalAckDataLength() {
+    return commitWatcher.getTotalAckDataLength();
+  }
+
+  public long getWrittenDataLength() {
+    return writtenDataLength;
+  }
+
+  public List<DatanodeDetails> getFailedServers() {
+    return failedServers;
+  }
+
+  @VisibleForTesting
+  public XceiverClientRatis getXceiverClient() {
+    return xceiverClient;
+  }
+
+  @VisibleForTesting
+  public long getTotalDataFlushedLength() {
+    return totalDataFlushedLength;
+  }
+
+  @VisibleForTesting
+  public BufferPool getBufferPool() {
+    return bufferPool;
+  }
+
+  public IOException getIoException() {
+    return ioException.get();
+  }
+
+  @VisibleForTesting
+  public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
+    return commitWatcher.getCommitIndex2flushedDataMap();
+  }
+
+  @Override
+  public void write(ByteBuf b) throws IOException {
+    checkOpen();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    int off = b.readerIndex();
+    int len = b.readableBytes();
+
+    while (len > 0) {
+      allocateNewBufferIfNeeded();
+      final int writeLen = Math.min(currentBufferRemaining, len);
+      // TODO: avoid buffer copy here
+      currentBuffer.put(b.nioBuffer(off, writeLen));
+      currentBufferRemaining -= writeLen;
+      writeChunkIfNeeded();
+      off += writeLen;
+      len -= writeLen;
+      writtenDataLength += writeLen;
+      doFlushOrWatchIfNeeded();
+    }
+  }
+
+  private void writeChunkIfNeeded() throws IOException {
+    if (currentBufferRemaining == 0) {
+      writeChunk(currentBuffer);
+    }
+  }
+
+  private void doFlushOrWatchIfNeeded() throws IOException {
+    if (currentBufferRemaining == 0) {
+      if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
+        updateFlushLength();
+        executePutBlock(false, false);
+      }
+      // Data in the bufferPool can not exceed streamBufferMaxSize
+      if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
+        handleFullBuffer();
+      }
+    }
+  }
+
+  private void allocateNewBufferIfNeeded() {
+    if (currentBufferRemaining == 0) {
+      currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
+      currentBufferRemaining = currentBuffer.remaining();
+    }
+  }
+
+  private void updateFlushLength() {
+    totalDataFlushedLength = writtenDataLength;
+  }
+
+  private boolean isBufferPoolFull() {
+    return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
+  }
+
+  /**
+   * Will be called on the retryPath in case closedContainerException/
+   * TimeoutException.
+   * @param len length of data to write
+   * @throws IOException if error occurred
+   */
+
+  // In this case, the data is already cached in the currentBuffer.
+  public void writeOnRetry(long len) throws IOException {
+    if (len == 0) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Retrying write length {} for blockID {}", len, blockID);
+    }
+    Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
+    int count = 0;
+    while (len > 0) {
+      ChunkBuffer buffer = bufferPool.getBuffer(count);
+      long writeLen = Math.min(buffer.position(), len);
+      if (!buffer.hasRemaining()) {
+        writeChunk(buffer);
+      }
+      len -= writeLen;
+      count++;
+      writtenDataLength += writeLen;
+      // we should not call isBufferFull/shouldFlush here.
+      // The buffer might already be full as whole data is already cached in
+      // the buffer. We should just validate
+      // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
+      // call for handling full buffer/flush buffer condition.
+      if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
+        // reset the position to zero as now we will be reading the
+        // next buffer in the list
+        updateFlushLength();
+        executePutBlock(false, false);
+      }
+      if (writtenDataLength == config.getStreamBufferMaxSize()) {
+        handleFullBuffer();
+      }
+    }
+  }
+
+  /**
+   * This is a blocking call. It will wait for the flush till the commit index
+   * at the head of the commitIndex2flushedDataMap gets replicated to all or
+   * majority.
+   * @throws IOException
+   */
+  private void handleFullBuffer() throws IOException {
+    try {
+      checkOpen();
+      if (!commitWatcher.getFutureMap().isEmpty()) {
+        waitOnFlushFutures();
+      }
+    } catch (ExecutionException e) {
+      handleExecutionException(e);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      handleInterruptedException(ex, true);
+    }
+    watchForCommit(true);
+  }
+
+
+  // It may happen that once the exception is encountered , we still might
+  // have successfully flushed up to a certain index. Make sure the buffers
+  // only contain data which have not been sufficiently replicated
+  private void adjustBuffersOnException() {
+    commitWatcher.releaseBuffersOnException();
+    refreshCurrentBuffer(bufferPool);
+  }
+
+  /**
+   * calls watchForCommit API of the Ratis Client. For Standalone client,
+   * it is a no op.
+   * @param bufferFull flag indicating whether bufferFull condition is hit or
+   *              its called as part flush/close
+   * @return minimum commit index replicated to all nodes
+   * @throws IOException IOException in case watch gets timed out
+   */
+  private void watchForCommit(boolean bufferFull) throws IOException {
+    checkOpen();
+    try {
+      XceiverClientReply reply = bufferFull ?
+          commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+      if (reply != null) {
+        List<DatanodeDetails> dnList = reply.getDatanodes();
+        if (!dnList.isEmpty()) {
+          Pipeline pipe = xceiverClient.getPipeline();
+
+          LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}",
+              blockID, pipe, dnList);
+          failedServers.addAll(dnList);
+        }
+      }
+    } catch (IOException ioe) {
+      setIoException(ioe);
+      throw getIoException();
+    }
+    refreshCurrentBuffer(bufferPool);
+
+  }
+
+  /**
+   * @param close whether putBlock is happening as part of closing the stream
+   * @param force true if no data was written since most recent putBlock and
+   *            stream is being closed
+   */
+  private CompletableFuture<ContainerProtos.
+      ContainerCommandResponseProto> executePutBlock(boolean close,
+      boolean force) throws IOException {
+    checkOpen();
+    long flushPos = totalDataFlushedLength;
+    final List<ChunkBuffer> byteBufferList;
+    if (!force) {
+      Preconditions.checkNotNull(bufferList);
+      byteBufferList = bufferList;
+      bufferList = null;
+      Preconditions.checkNotNull(byteBufferList);
+    } else {
+      byteBufferList = null;
+    }
+
+    try {
+      CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
+    } catch (Exception e) {
+      LOG.warn("Failed to write all chunks through stream: " + e);
+      throw new IOException(e);
+    }
+    if (close) {
+      dataStreamCloseReply = out.closeAsync();
+    }
+
+    CompletableFuture<ContainerProtos.
+        ContainerCommandResponseProto> flushFuture = null;
+    try {
+      BlockData blockData = containerBlockData.build();
+      XceiverClientReply asyncReply =
+          putBlockAsync(xceiverClient, blockData, close, token);
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          asyncReply.getResponse();
+      flushFuture = future.thenApplyAsync(e -> {
+        try {
+          validateResponse(e);
+        } catch (IOException sce) {
+          throw new CompletionException(sce);
+        }
+        // if the ioException is not set, putBlock is successful
+        if (getIoException() == null && !force) {
+          BlockID responseBlockID = BlockID.getFromProtobuf(
+              e.getPutBlock().getCommittedBlockLength().getBlockID());
+          Preconditions.checkState(blockID.get().getContainerBlockID()
+              .equals(responseBlockID.getContainerBlockID()));
+          // updates the bcsId of the block
+          blockID.set(responseBlockID);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "Adding index " + asyncReply.getLogIndex() + " commitMap size "
+                    + commitWatcher.getCommitInfoMapSize() + " flushLength "
+                    + flushPos + " numBuffers " + byteBufferList.size()
+                    + " blockID " + blockID + " bufferPool size" + bufferPool
+                    .getSize() + " currentBufferIndex " + bufferPool
+                    .getCurrentBufferIndex());
+          }
+          // for standalone protocol, logIndex will always be 0.
+          commitWatcher
+              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
+        }
+        return e;
+      }, responseExecutor).exceptionally(e -> {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("putBlock failed for blockID {} with exception {}",
+              blockID, e.getLocalizedMessage());
+        }
+        CompletionException ce = new CompletionException(e);
+        setIoException(ce);
+        throw ce;
+      });
+    } catch (IOException | ExecutionException e) {
+      throw new IOException(EXCEPTION_MSG + e.toString(), e);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      handleInterruptedException(ex, false);
+    }
+    commitWatcher.getFutureMap().put(flushPos, flushFuture);
+    return flushFuture;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (xceiverClientFactory != null && xceiverClient != null
+        && bufferPool != null && bufferPool.getSize() > 0
+        && (!config.isStreamBufferFlushDelay() ||
+            writtenDataLength - totalDataFlushedLength
+                >= config.getStreamBufferSize())) {
+      try {
+        handleFlush(false);
+      } catch (ExecutionException e) {
+        // just set the exception here as well in order to maintain sanctity of
+        // ioException field
+        handleExecutionException(e);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        handleInterruptedException(ex, true);
+      }
+    }
+  }
+
+  private void writeChunk(ChunkBuffer buffer)
+      throws IOException {
+    // This data in the buffer will be pushed to datanode and a reference will
+    // be added to the bufferList. Once putBlock gets executed, this list will
+    // be marked null. Hence, during first writeChunk call after every putBlock
+    // call or during the first call to writeChunk here, the list will be null.
+
+    if (bufferList == null) {
+      bufferList = new ArrayList<>();
+    }
+    bufferList.add(buffer);
+    writeChunkToContainer(buffer.duplicate(0, buffer.position()));
+  }
+
+  /**
+   * @param close whether the flush is happening as part of closing the stream
+   */
+  private void handleFlush(boolean close)
+      throws IOException, InterruptedException, ExecutionException {
+    checkOpen();
+    // flush the last chunk data residing on the currentBuffer
+    if (totalDataFlushedLength < writtenDataLength) {
+      refreshCurrentBuffer(bufferPool);
+      Preconditions.checkArgument(currentBuffer.position() > 0);
+      if (currentBuffer.hasRemaining()) {
+        writeChunk(currentBuffer);
+      }
+      // This can be a partially filled chunk. Since we are flushing the buffer
+      // here, we just limit this buffer to the current position. So that next
+      // write will happen in new buffer
+      updateFlushLength();
+      executePutBlock(close, false);
+    } else if (close) {
+      // forcing an "empty" putBlock if stream is being closed without new
+      // data since latest flush - we need to send the "EOF" flag
+      executePutBlock(true, true);
+    }
+    waitOnFlushFutures();
+    watchForCommit(false);
+    // just check again if the exception is hit while waiting for the
+    // futures to ensure flush has indeed succeeded
+
+    // irrespective of whether the commitIndex2flushedDataMap is empty
+    // or not, ensure there is no exception set
+    checkOpen();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (xceiverClientFactory != null && xceiverClient != null
+        && bufferPool != null && bufferPool.getSize() > 0) {
+      try {
+        handleFlush(true);
+        dataStreamCloseReply.get();
+      } catch (ExecutionException e) {
+        handleExecutionException(e);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        handleInterruptedException(ex, true);
+      } finally {
+        cleanup(false);
+      }
+      // TODO: Turn the below buffer empty check on when Standalone pipeline
+      // is removed in the write path in tests
+      // Preconditions.checkArgument(buffer.position() == 0);
+      // bufferPool.checkBufferPoolEmpty();
+
+    }
+  }
+
+  private void waitOnFlushFutures()
+      throws InterruptedException, ExecutionException {
+    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+        commitWatcher.getFutureMap().values().toArray(
+            new CompletableFuture[commitWatcher.getFutureMap().size()]));
+    // wait for all the transactions to complete
+    combinedFuture.get();
+  }
+
+  private void validateResponse(
+      ContainerProtos.ContainerCommandResponseProto responseProto)
+      throws IOException {
+    try {
+      // if the ioException is already set, it means a prev request has failed
+      // just throw the exception. The current operation will fail with the
+      // original error
+      IOException exception = getIoException();
+      if (exception != null) {
+        throw exception;
+      }
+      ContainerProtocolCalls.validateContainerResponse(responseProto);
+    } catch (StorageContainerException sce) {
+      setIoException(sce);
+      throw sce;
+    }
+  }
+
+
+  private void setIoException(Exception e) {
+    IOException ioe = getIoException();
+    if (ioe == null) {
+      IOException exception =  new IOException(EXCEPTION_MSG + e.toString(), e);
+      ioException.compareAndSet(null, exception);
+    } else {
+      LOG.debug("Previous request had already failed with " + ioe.toString()
+          + " so subsequent request also encounters"
+          + " Storage Container Exception ", e);
+    }
+  }
+
+  public void cleanup(boolean invalidateClient) {
+    if (xceiverClientFactory != null) {
+      xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
+    }
+    xceiverClientFactory = null;
+    xceiverClient = null;
+    commitWatcher.cleanup();
+    if (bufferList !=  null) {
+      bufferList.clear();
+    }
+    bufferList = null;
+    responseExecutor.shutdown();
+  }
+
+  /**
+   * Checks if the stream is open or exception has occurred.
+   * If not, throws an exception.
+   *
+   * @throws IOException if stream is closed
+   */
+  private void checkOpen() throws IOException {
+    if (isClosed()) {
+      throw new IOException("BlockDataStreamOutput has been closed.");
+    } else if (getIoException() != null) {
+      adjustBuffersOnException();
+      throw getIoException();
+    }
+  }
+
+  public boolean isClosed() {
+    return xceiverClient == null;
+  }
+
+  private boolean needSync(long position) {
+    if (syncSize > 0) {
+      // TODO: or position >= fileLength
+      if (position - syncPosition >= syncSize) {
+        syncPosition = position;
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Writes buffered data as a new chunk to the container and saves chunk
+   * information to be used later in putKey call.
+   *
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws OzoneChecksumException if there is an error while computing
+   * checksum
+   */
+  private void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
+    int effectiveChunkSize = chunk.remaining();
+    final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
+    final ByteString data = chunk.toByteString(
+        bufferPool.byteStringConversion());
+    ChecksumData checksumData = checksum.computeChecksum(chunk);
+    ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+        .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
+        .setOffset(offset)
+        .setLen(effectiveChunkSize)
+        .setChecksumData(checksumData.getProtoBufMessage())
+        .build();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Writing chunk {} length {} at offset {}",
+          chunkInfo.getChunkName(), effectiveChunkSize, offset);
+    }
+
+    CompletableFuture<DataStreamReply> future =
+        (needSync(offset + effectiveChunkSize) ?
+        out.writeAsync(data.asReadOnlyByteBuffer(), StandardWriteOption.SYNC) :
+        out.writeAsync(data.asReadOnlyByteBuffer()))
+        .whenCompleteAsync((r, e) -> {
+          if (e != null || !r.isSuccess()) {
+            if (e == null) {
+              e = new IOException("result is not success");
+            }
+            String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
+                " " + "into block " + blockID;
+            LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+            CompletionException ce = new CompletionException(msg, e);
+            setIoException(ce);
+            throw ce;
+          }
+        }, responseExecutor);
+
+    futures.add(future);
+    containerBlockData.addChunks(chunkInfo);
+  }
+
+  @VisibleForTesting
+  public void setXceiverClient(XceiverClientRatis xceiverClient) {
+    this.xceiverClient = xceiverClient;
+  }
+
+  /**
+   * Handles InterruptedExecution.
+   *
+   * @param ex
+   * @param processExecutionException is optional, if passed as TRUE, then
+   * handle ExecutionException else skip it.
+   * @throws IOException
+   */
+  private void handleInterruptedException(Exception ex,
+      boolean processExecutionException)
+      throws IOException {
+    LOG.error("Command execution was interrupted.");
+    if(processExecutionException) {
+      handleExecutionException(ex);
+    } else {
+      throw new IOException(EXCEPTION_MSG + ex.toString(), ex);
+    }
+  }
+
+  /**
+   * Handles ExecutionException by adjusting buffers.
+   * @param ex
+   * @throws IOException
+   */
+  private void handleExecutionException(Exception ex) throws IOException {
+    setIoException(ex);
+    adjustBuffersOnException();
+    throw getIoException();
+  }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
new file mode 100644
index 0000000..7f40737
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ByteBufStreamOutput.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+* This interface is for writing an output stream of ByteBuffers.
+* An ByteBufStreamOutput accepts Netty ByteBuf and sends them to some sink.
+*/
+public interface ByteBufStreamOutput extends Closeable {
+  /**
+   * Try to write all the bytes in ByteBuf b to DataStream.
+   *
+   * @param b the data.
+   * @exception IOException if an I/O error occurs.
+   */
+  void write(ByteBuf b) throws IOException;
+
+  /**
+   * Try to write the [off:off + len) slice in ByteBuf b to DataStream.
+   *
+   * @param b the data.
+   * @param off the start offset in the data.
+   * @param len the number of bytes to write.
+   * @exception  IOException  if an I/O error occurs.
+   */
+  default void write(ByteBuf b, int off, int len) throws IOException {
+    write(b.slice(off, len));
+  }
+
+  /**
+   * Flushes this DataStream output and forces any buffered output bytes
+   * to be written out.
+   *
+   * @exception  IOException  if an I/O error occurs.
+   */
+  void flush() throws IOException;
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 8a8cde9..61cbb69 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
@@ -529,6 +530,24 @@ public class OzoneBucket extends WithMetadata {
   }
 
   /**
+   * Creates a new key in the bucket.
+   *
+   * @param key               Name of the key to be created.
+   * @param size              Size of the data the key will point to.
+   * @param replicationConfig Replication configuration.
+   * @return OzoneDataStreamOutput to which the data has to be written.
+   * @throws IOException
+   */
+  public OzoneDataStreamOutput createStreamKey(String key, long size,
+      ReplicationConfig replicationConfig,
+      Map<String, String> keyMetadata)
+      throws IOException {
+    return proxy
+        .createStreamKey(volumeName, name, key, size, replicationConfig,
+            keyMetadata);
+  }
+
+  /**
    * Reads an existing key from the bucket.
    *
    * @param key Name of the key to be read.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
new file mode 100644
index 0000000..6954742
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Helper class used inside {@link BlockDataStreamOutput}.
+ * */
+public final class BlockDataStreamOutputEntry
+    implements ByteBufStreamOutput {
+
+  private final OzoneClientConfig config;
+  private ByteBufStreamOutput byteBufStreamOutput;
+  private BlockID blockID;
+  private final String key;
+  private final XceiverClientFactory xceiverClientManager;
+  private final Pipeline pipeline;
+  // total number of bytes that should be written to this stream
+  private final long length;
+  // the current position of this stream 0 <= currentPosition < length
+  private long currentPosition;
+  private final Token<OzoneBlockTokenIdentifier> token;
+
+  private BufferPool bufferPool;
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  private BlockDataStreamOutputEntry(
+      BlockID blockID, String key,
+      XceiverClientFactory xceiverClientManager,
+      Pipeline pipeline,
+      long length,
+      BufferPool bufferPool,
+      Token<OzoneBlockTokenIdentifier> token,
+      OzoneClientConfig config
+  ) {
+    this.config = config;
+    this.byteBufStreamOutput = null;
+    this.blockID = blockID;
+    this.key = key;
+    this.xceiverClientManager = xceiverClientManager;
+    this.pipeline = pipeline;
+    this.token = token;
+    this.length = length;
+    this.currentPosition = 0;
+    this.bufferPool = bufferPool;
+  }
+
+  long getLength() {
+    return length;
+  }
+
+  Token<OzoneBlockTokenIdentifier> getToken() {
+    return token;
+  }
+
+  long getRemaining() {
+    return length - currentPosition;
+  }
+
+  /**
+   * BlockDataStreamOutput is initialized in this function. This makes sure that
+   * xceiverClient initialization is not done during preallocation and only
+   * done when data is written.
+   * @throws IOException if xceiverClient initialization fails
+   */
+  private void checkStream() throws IOException {
+    if (this.byteBufStreamOutput == null) {
+      this.byteBufStreamOutput =
+          new BlockDataStreamOutput(blockID, xceiverClientManager,
+              pipeline, bufferPool, config, token);
+    }
+  }
+
+  @Override
+  public void write(ByteBuf b) throws IOException {
+    checkStream();
+    final int len = b.readableBytes();
+    byteBufStreamOutput.write(b);
+    this.currentPosition += len;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (this.byteBufStreamOutput != null) {
+      this.byteBufStreamOutput.flush();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.byteBufStreamOutput != null) {
+      this.byteBufStreamOutput.close();
+      // after closing the chunkOutPutStream, blockId would have been
+      // reconstructed with updated bcsId
+      this.blockID =
+          ((BlockDataStreamOutput) byteBufStreamOutput).getBlockID();
+    }
+  }
+
+  boolean isClosed() {
+    if (byteBufStreamOutput != null) {
+      return  ((BlockDataStreamOutput) byteBufStreamOutput).isClosed();
+    }
+    return false;
+  }
+
+  long getTotalAckDataLength() {
+    if (byteBufStreamOutput != null) {
+      BlockDataStreamOutput out =
+          (BlockDataStreamOutput) this.byteBufStreamOutput;
+      blockID = out.getBlockID();
+      return out.getTotalAckDataLength();
+    } else {
+      // For a pre allocated block for which no write has been initiated,
+      // the ByteBufStreamOutput will be null here.
+      // In such cases, the default blockCommitSequenceId will be 0
+      return 0;
+    }
+  }
+
+  Collection<DatanodeDetails> getFailedServers() {
+    if (byteBufStreamOutput != null) {
+      BlockDataStreamOutput out =
+          (BlockDataStreamOutput) this.byteBufStreamOutput;
+      return out.getFailedServers();
+    }
+    return Collections.emptyList();
+  }
+
+  long getWrittenDataLength() {
+    if (byteBufStreamOutput != null) {
+      BlockDataStreamOutput out =
+          (BlockDataStreamOutput) this.byteBufStreamOutput;
+      return out.getWrittenDataLength();
+    } else {
+      // For a pre allocated block for which no write has been initiated,
+      // the ByteBufStreamOutput will be null here.
+      // In such cases, the default blockCommitSequenceId will be 0
+      return 0;
+    }
+  }
+
+  void cleanup(boolean invalidateClient) throws IOException {
+    checkStream();
+    BlockDataStreamOutput out =
+        (BlockDataStreamOutput) this.byteBufStreamOutput;
+    out.cleanup(invalidateClient);
+
+  }
+
+  void writeOnRetry(long len) throws IOException {
+    checkStream();
+    BlockDataStreamOutput out =
+        (BlockDataStreamOutput) this.byteBufStreamOutput;
+    out.writeOnRetry(len);
+    this.currentPosition += len;
+
+  }
+
+  /**
+   * Builder class for BlockDataStreamOutputEntry.
+   * */
+  public static class Builder {
+
+    private BlockID blockID;
+    private String key;
+    private XceiverClientFactory xceiverClientManager;
+    private Pipeline pipeline;
+    private long length;
+    private BufferPool bufferPool;
+    private Token<OzoneBlockTokenIdentifier> token;
+    private OzoneClientConfig config;
+
+    public Builder setBlockID(BlockID bID) {
+      this.blockID = bID;
+      return this;
+    }
+
+    public Builder setKey(String keys) {
+      this.key = keys;
+      return this;
+    }
+
+    public Builder setXceiverClientManager(
+        XceiverClientFactory
+        xClientManager) {
+      this.xceiverClientManager = xClientManager;
+      return this;
+    }
+
+    public Builder setPipeline(Pipeline ppln) {
+      this.pipeline = ppln;
+      return this;
+    }
+
+
+    public Builder setLength(long len) {
+      this.length = len;
+      return this;
+    }
+
+
+    public Builder setBufferPool(BufferPool pool) {
+      this.bufferPool = pool;
+      return this;
+    }
+
+    public Builder setConfig(OzoneClientConfig clientConfig) {
+      this.config = clientConfig;
+      return this;
+    }
+
+    public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
+      this.token = bToken;
+      return this;
+    }
+
+    public BlockDataStreamOutputEntry build() {
+      return new BlockDataStreamOutputEntry(blockID,
+          key,
+          xceiverClientManager,
+          pipeline,
+          length,
+          bufferPool,
+          token, config);
+    }
+  }
+
+  @VisibleForTesting
+  public ByteBufStreamOutput getByteBufStreamOutput() {
+    return byteBufStreamOutput;
+  }
+
+  public BlockID getBlockID() {
+    return blockID;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public XceiverClientFactory getXceiverClientManager() {
+    return xceiverClientManager;
+  }
+
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  public long getCurrentPosition() {
+    return currentPosition;
+  }
+
+  public BufferPool getBufferPool() {
+    return bufferPool;
+  }
+
+  public void setCurrentPosition(long curPosition) {
+    this.currentPosition = curPosition;
+  }
+}
+
+
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
new file mode 100644
index 0000000..94c505f
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -0,0 +1,324 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * This class manages the stream entries list and handles block allocation
+ * from OzoneManager.
+ */
+public class BlockDataStreamOutputEntryPool {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockDataStreamOutputEntryPool.class);
+
+  private final List<BlockDataStreamOutputEntry> streamEntries;
+  private final OzoneClientConfig config;
+  private int currentStreamIndex;
+  private final OzoneManagerProtocol omClient;
+  private final OmKeyArgs keyArgs;
+  private final XceiverClientFactory xceiverClientFactory;
+  private final String requestID;
+  private final BufferPool bufferPool;
+  private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
+  private final long openID;
+  private final ExcludeList excludeList;
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public BlockDataStreamOutputEntryPool(
+      OzoneClientConfig config,
+      OzoneManagerProtocol omClient,
+      String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber,
+      boolean isMultipart, OmKeyInfo info,
+      boolean unsafeByteBufferConversion,
+      XceiverClientFactory xceiverClientFactory, long openID
+  ) {
+    this.config = config;
+    this.xceiverClientFactory = xceiverClientFactory;
+    streamEntries = new ArrayList<>();
+    currentStreamIndex = 0;
+    this.omClient = omClient;
+    this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
+        .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
+        .setReplicationConfig(replicationConfig).setDataSize(info.getDataSize())
+        .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
+        .setMultipartUploadPartNumber(partNumber).build();
+    this.requestID = requestId;
+    this.openID = openID;
+    this.excludeList = new ExcludeList();
+
+    this.bufferPool =
+        new BufferPool(config.getStreamBufferSize(),
+            (int) (config.getStreamBufferMaxSize() / config
+                .getStreamBufferSize()),
+            ByteStringConversion
+                .createByteBufferConversion(unsafeByteBufferConversion));
+  }
+
+  /**
+   * A constructor for testing purpose only.
+   *
+   * @see KeyDataStreamOutput#KeyDataStreamOutput()
+   */
+  @VisibleForTesting
+  BlockDataStreamOutputEntryPool() {
+    streamEntries = new ArrayList<>();
+    omClient = null;
+    keyArgs = null;
+    xceiverClientFactory = null;
+    config =
+        new OzoneConfiguration().getObject(OzoneClientConfig.class);
+    config.setStreamBufferSize(0);
+    config.setStreamBufferMaxSize(0);
+    config.setStreamBufferFlushSize(0);
+    config.setStreamBufferFlushDelay(false);
+    requestID = null;
+    int chunkSize = 0;
+    bufferPool = new BufferPool(chunkSize, 1);
+
+    currentStreamIndex = 0;
+    openID = -1;
+    excludeList = new ExcludeList();
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But, a key's version
+   * also includes blocks from previous versions, we need to avoid adding these
+   * old blocks to stream entries, because these old blocks should not be picked
+   * for write. To do this, the following method checks that, only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    // server may return any number of blocks, (0 to any)
+    // only the blocks allocated in this open session (block createVersion
+    // equals to open session version)
+    for (OmKeyLocationInfo subKeyInfo : version.getLocationList(openVersion)) {
+      addKeyLocationInfo(subKeyInfo);
+    }
+  }
+
+  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+    Preconditions.checkNotNull(subKeyInfo.getPipeline());
+    BlockDataStreamOutputEntry.Builder builder =
+        new BlockDataStreamOutputEntry.Builder()
+            .setBlockID(subKeyInfo.getBlockID())
+            .setKey(keyArgs.getKeyName())
+            .setXceiverClientManager(xceiverClientFactory)
+            .setPipeline(subKeyInfo.getPipeline())
+            .setConfig(config)
+            .setLength(subKeyInfo.getLength())
+            .setBufferPool(bufferPool)
+            .setToken(subKeyInfo.getToken());
+    streamEntries.add(builder.build());
+  }
+
+  public List<OmKeyLocationInfo> getLocationInfoList()  {
+    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+    for (BlockDataStreamOutputEntry streamEntry : streamEntries) {
+      long length = streamEntry.getCurrentPosition();
+
+      // Commit only those blocks to OzoneManager which are not empty
+      if (length != 0) {
+        OmKeyLocationInfo info =
+            new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
+                .setLength(streamEntry.getCurrentPosition()).setOffset(0)
+                .setToken(streamEntry.getToken())
+                .setPipeline(streamEntry.getPipeline()).build();
+        locationInfoList.add(info);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "block written " + streamEntry.getBlockID() + ", length " + length
+                + " bcsID " + streamEntry.getBlockID()
+                .getBlockCommitSequenceId());
+      }
+    }
+    return locationInfoList;
+  }
+
+  /**
+   * Discards the subsequent pre allocated blocks and removes the streamEntries
+   * from the streamEntries list for the container which is closed.
+   * @param containerID id of the closed container
+   * @param pipelineId id of the associated pipeline
+   */
+  void discardPreallocatedBlocks(long containerID, PipelineID pipelineId) {
+    // currentStreamIndex < streamEntries.size() signifies that, there are still
+    // pre allocated blocks available.
+
+    // This will be called only to discard the next subsequent unused blocks
+    // in the streamEntryList.
+    if (currentStreamIndex + 1 < streamEntries.size()) {
+      ListIterator<BlockDataStreamOutputEntry> streamEntryIterator =
+          streamEntries.listIterator(currentStreamIndex + 1);
+      while (streamEntryIterator.hasNext()) {
+        BlockDataStreamOutputEntry streamEntry = streamEntryIterator.next();
+        Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0);
+        if ((streamEntry.getPipeline().getId().equals(pipelineId)) ||
+            (containerID != -1 &&
+                streamEntry.getBlockID().getContainerID() == containerID)) {
+          streamEntryIterator.remove();
+        }
+      }
+    }
+  }
+
+  List<BlockDataStreamOutputEntry> getStreamEntries() {
+    return streamEntries;
+  }
+
+  XceiverClientFactory getXceiverClientFactory() {
+    return xceiverClientFactory;
+  }
+
+  String getKeyName() {
+    return keyArgs.getKeyName();
+  }
+
+  long getKeyLength() {
+    return streamEntries.stream().mapToLong(
+        BlockDataStreamOutputEntry::getCurrentPosition).sum();
+  }
+  /**
+   * Contact OM to get a new block. Set the new block with the index (e.g.
+   * first block has index = 0, second has index = 1 etc.)
+   *
+   * The returned block is made to new BlockDataStreamOutputEntry to write.
+   *
+   * @throws IOException
+   */
+  private void allocateNewBlock() throws IOException {
+    if (!excludeList.isEmpty()) {
+      LOG.debug("Allocating block with {}", excludeList);
+    }
+    OmKeyLocationInfo subKeyInfo =
+        omClient.allocateBlock(keyArgs, openID, excludeList);
+    addKeyLocationInfo(subKeyInfo);
+  }
+
+
+  void commitKey(long offset) throws IOException {
+    if (keyArgs != null) {
+      // in test, this could be null
+      long length = getKeyLength();
+      Preconditions.checkArgument(offset == length);
+      keyArgs.setDataSize(length);
+      keyArgs.setLocationInfoList(getLocationInfoList());
+      // When the key is multipart upload part file upload, we should not
+      // commit the key, as this is not an actual key, this is a just a
+      // partial key of a large file.
+      if (keyArgs.getIsMultipartKey()) {
+        commitUploadPartInfo =
+            omClient.commitMultipartUploadPart(keyArgs, openID);
+      } else {
+        omClient.commitKey(keyArgs, openID);
+      }
+    } else {
+      LOG.warn("Closing KeyDataStreamOutput, but key args is null");
+    }
+  }
+
+  public BlockDataStreamOutputEntry getCurrentStreamEntry() {
+    if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) {
+      return null;
+    } else {
+      return streamEntries.get(currentStreamIndex);
+    }
+  }
+
+  BlockDataStreamOutputEntry allocateBlockIfNeeded() throws IOException {
+    BlockDataStreamOutputEntry streamEntry = getCurrentStreamEntry();
+    if (streamEntry != null && streamEntry.isClosed()) {
+      // a stream entry gets closed either by :
+      // a. If the stream gets full
+      // b. it has encountered an exception
+      currentStreamIndex++;
+    }
+    if (streamEntries.size() <= currentStreamIndex) {
+      Preconditions.checkNotNull(omClient);
+      // allocate a new block, if a exception happens, log an error and
+      // throw exception to the caller directly, and the write fails.
+      allocateNewBlock();
+    }
+    // in theory, this condition should never violate due the check above
+    // still do a sanity check.
+    Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+    return streamEntries.get(currentStreamIndex);
+  }
+
+  long computeBufferData() {
+    return bufferPool.computeBufferData();
+  }
+
+  void cleanup() {
+    if (excludeList != null) {
+      excludeList.clear();
+    }
+    if (bufferPool != null) {
+      bufferPool.clearBufferPool();
+    }
+
+    if (streamEntries != null) {
+      streamEntries.clear();
+    }
+  }
+
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    return commitUploadPartInfo;
+  }
+
+  public ExcludeList getExcludeList() {
+    return excludeList;
+  }
+
+  boolean isEmpty() {
+    return streamEntries.isEmpty();
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
new file mode 100644
index 0000000..a9be116
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Maintaining a list of BlockInputStream. Write based on offset.
+ *
+ * Note that this may write to multiple containers in one write call. In case
+ * that first container succeeded but later ones failed, the succeeded writes
+ * are not rolled back.
+ *
+ * TODO : currently not support multi-thread access.
+ */
+public class KeyDataStreamOutput implements ByteBufStreamOutput {
+
+  private OzoneClientConfig config;
+
+  /**
+   * Defines stream action while calling handleFlushOrClose.
+   */
+  enum StreamAction {
+    FLUSH, CLOSE, FULL
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(KeyDataStreamOutput.class);
+
+  private boolean closed;
+  private FileEncryptionInfo feInfo;
+  private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
+  private int retryCount;
+  // how much of data is actually written yet to underlying stream
+  private long offset;
+  // how much data has been ingested into the stream
+  private long writeOffset;
+  // whether an exception is encountered while write and whole write could
+  // not succeed
+  private boolean isException;
+  private final BlockDataStreamOutputEntryPool blockDataStreamOutputEntryPool;
+
+  private long clientID;
+
+  /**
+   * A constructor for testing purpose only.
+   */
+  @VisibleForTesting
+  public KeyDataStreamOutput() {
+    closed = false;
+    this.retryPolicyMap = HddsClientUtils.getExceptionList()
+        .stream()
+        .collect(Collectors.toMap(Function.identity(),
+            e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
+    retryCount = 0;
+    offset = 0;
+    blockDataStreamOutputEntryPool = new BlockDataStreamOutputEntryPool();
+  }
+
+  @VisibleForTesting
+  public List<BlockDataStreamOutputEntry> getStreamEntries() {
+    return blockDataStreamOutputEntryPool.getStreamEntries();
+  }
+
+  @VisibleForTesting
+  public XceiverClientFactory getXceiverClientFactory() {
+    return blockDataStreamOutputEntryPool.getXceiverClientFactory();
+  }
+
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockDataStreamOutputEntryPool.getLocationInfoList();
+  }
+
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
+
+  @VisibleForTesting
+  public long getClientID() {
+    return clientID;
+  }
+
+  @SuppressWarnings({"parameternumber", "squid:S00107"})
+  public KeyDataStreamOutput(
+      OzoneClientConfig config,
+      OpenKeySession handler,
+      XceiverClientFactory xceiverClientManager,
+      OzoneManagerProtocol omClient, int chunkSize,
+      String requestId, ReplicationConfig replicationConfig,
+      String uploadID, int partNumber, boolean isMultipart,
+      boolean unsafeByteBufferConversion
+  ) {
+    this.config = config;
+    OmKeyInfo info = handler.getKeyInfo();
+    blockDataStreamOutputEntryPool =
+        new BlockDataStreamOutputEntryPool(
+            config,
+            omClient,
+            requestId, replicationConfig,
+            uploadID, partNumber,
+            isMultipart, info,
+            unsafeByteBufferConversion,
+            xceiverClientManager,
+            handler.getId());
+
+    // Retrieve the file encryption key info, null if file is not in
+    // encrypted bucket.
+    this.feInfo = info.getFileEncryptionInfo();
+    this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
+        config.getMaxRetryCount(), config.getRetryInterval());
+    this.retryCount = 0;
+    this.isException = false;
+    this.writeOffset = 0;
+    this.clientID = handler.getId();
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add these blocks to stream entries. But, a key's version
+   * also includes blocks from previous versions, we need to avoid adding these
+   * old blocks to stream entries, because these old blocks should not be picked
+   * for write. To do this, the following method checks that, only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    blockDataStreamOutputEntryPool.addPreallocateBlocks(version, openVersion);
+  }
+
+  @Override
+  public void write(ByteBuf b) throws IOException {
+    checkNotClosed();
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    final int len = b.readableBytes();
+    handleWrite(b, b.readerIndex(), len, false);
+    writeOffset += len;
+  }
+
+  private void handleWrite(ByteBuf b, int off, long len, boolean retry)
+      throws IOException {
+    while (len > 0) {
+      try {
+        BlockDataStreamOutputEntry current =
+            blockDataStreamOutputEntryPool.allocateBlockIfNeeded();
+        // length(len) will be in int range if the call is happening through
+        // write API of blockDataStreamOutput. Length can be in long range
+        // if it comes via Exception path.
+        int expectedWriteLen = Math.min((int) len,
+                (int) current.getRemaining());
+        long currentPos = current.getWrittenDataLength();
+        // writeLen will be updated based on whether the write was succeeded
+        // or if it sees an exception, how much the actual write was
+        // acknowledged.
+        int writtenLength =
+            writeToDataStreamOutput(current, retry, len, b,
+                expectedWriteLen, off, currentPos);
+        if (current.getRemaining() <= 0) {
+          // since the current block is already written close the stream.
+          handleFlushOrClose(StreamAction.FULL);
+        }
+        len -= writtenLength;
+        off += writtenLength;
+      } catch (Exception e) {
+        markStreamClosed();
+        throw new IOException(e);
+      }
+    }
+  }
+
+  private int writeToDataStreamOutput(BlockDataStreamOutputEntry current,
+      boolean retry, long len, ByteBuf b, int writeLen, int off,
+      long currentPos) throws IOException {
+    try {
+      if (retry) {
+        current.writeOnRetry(len);
+      } else {
+        current.write(b, off, writeLen);
+        offset += writeLen;
+      }
+    } catch (IOException ioe) {
+      // for the current iteration, totalDataWritten - currentPos gives the
+      // amount of data already written to the buffer
+
+      // In the retryPath, the total data to be written will always be equal
+      // to or less than the max length of the buffer allocated.
+      // The len specified here is the combined sum of the data length of
+      // the buffers
+      Preconditions.checkState(!retry || len <= config
+          .getStreamBufferMaxSize());
+      int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+      writeLen = retry ? (int) len : dataWritten;
+      // In retry path, the data written is already accounted in offset.
+      if (!retry) {
+        offset += writeLen;
+      }
+      LOG.debug("writeLen {}, total len {}", writeLen, len);
+      handleException(current, ioe);
+    }
+    return writeLen;
+  }
+
+  /**
+   * It performs following actions :
+   * a. Updates the committed length at datanode for the current stream in
+   * datanode.
+   * b. Reads the data from the underlying buffer and writes it the next stream.
+   *
+   * @param streamEntry StreamEntry
+   * @param exception   actual exception that occurred
+   * @throws IOException Throws IOException if Write fails
+   */
+  private void handleException(BlockDataStreamOutputEntry streamEntry,
+      IOException exception) throws IOException {
+    Throwable t = HddsClientUtils.checkForException(exception);
+    Preconditions.checkNotNull(t);
+    boolean retryFailure = checkForRetryFailure(t);
+    boolean containerExclusionException = false;
+    if (!retryFailure) {
+      containerExclusionException = checkIfContainerToExclude(t);
+    }
+    Pipeline pipeline = streamEntry.getPipeline();
+    PipelineID pipelineId = pipeline.getId();
+    long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
+    //set the correct length for the current stream
+    streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
+    long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData();
+    if (containerExclusionException) {
+      LOG.debug(
+          "Encountered exception {}. The last committed block length is {}, "
+              + "uncommitted data length is {} retry count {}", exception,
+          totalSuccessfulFlushedData, bufferedDataLen, retryCount);
+    } else {
+      LOG.warn(
+          "Encountered exception {} on the pipeline {}. "
+              + "The last committed block length is {}, "
+              + "uncommitted data length is {} retry count {}", exception,
+          pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount);
+    }
+    Preconditions.checkArgument(
+        bufferedDataLen <= config.getStreamBufferMaxSize());
+    Preconditions.checkArgument(
+        offset - blockDataStreamOutputEntryPool.getKeyLength() ==
+        bufferedDataLen);
+    long containerId = streamEntry.getBlockID().getContainerID();
+    Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
+    Preconditions.checkNotNull(failedServers);
+    ExcludeList excludeList = blockDataStreamOutputEntryPool.getExcludeList();
+    if (!failedServers.isEmpty()) {
+      excludeList.addDatanodes(failedServers);
+    }
+
+    // if the container needs to be excluded , add the container to the
+    // exclusion list , otherwise add the pipeline to the exclusion list
+    if (containerExclusionException) {
+      excludeList.addConatinerId(ContainerID.valueOf(containerId));
+    } else {
+      excludeList.addPipeline(pipelineId);
+    }
+    // just clean up the current stream.
+    streamEntry.cleanup(retryFailure);
+
+    // discard all subsequent blocks the containers and pipelines which
+    // are in the exclude list so that, the very next retry should never
+    // write data on the  closed container/pipeline
+    if (containerExclusionException) {
+      // discard subsequent pre allocated blocks from the streamEntries list
+      // from the closed container
+      blockDataStreamOutputEntryPool
+          .discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
+              null);
+    } else {
+      // In case there is timeoutException or Watch for commit happening over
+      // majority or the client connection failure to the leader in the
+      // pipeline, just discard all the pre allocated blocks on this pipeline.
+      // Next block allocation will happen with excluding this specific pipeline
+      // This will ensure if 2 way commit happens , it cannot span over multiple
+      // blocks
+      blockDataStreamOutputEntryPool
+          .discardPreallocatedBlocks(-1, pipelineId);
+    }
+    if (bufferedDataLen > 0) {
+      // If the data is still cached in the underlying stream, we need to
+      // allocate new block and write this data in the datanode.
+      handleRetry(exception, bufferedDataLen);
+      // reset the retryCount after handling the exception
+      retryCount = 0;
+    }
+  }
+
+  private void markStreamClosed() {
+    blockDataStreamOutputEntryPool.cleanup();
+    closed = true;
+  }
+
+  private void handleRetry(IOException exception, long len) throws IOException {
+    RetryPolicy retryPolicy = retryPolicyMap
+        .get(HddsClientUtils.checkForException(exception).getClass());
+    if (retryPolicy == null) {
+      retryPolicy = retryPolicyMap.get(Exception.class);
+    }
+    RetryPolicy.RetryAction action = null;
+    try {
+      action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
+    } catch (Exception e) {
+      setExceptionAndThrow(new IOException(e));
+    }
+    if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+      String msg = "";
+      if (action.reason != null) {
+        msg = "Retry request failed. " + action.reason;
+        LOG.error(msg, exception);
+      }
+      setExceptionAndThrow(new IOException(msg, exception));
+    }
+
+    // Throw the exception if the thread is interrupted
+    if (Thread.currentThread().isInterrupted()) {
+      LOG.warn("Interrupted while trying for retry");
+      setExceptionAndThrow(exception);
+    }
+    Preconditions.checkArgument(
+        action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
+    if (action.delayMillis > 0) {
+      try {
+        Thread.sleep(action.delayMillis);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        IOException ioe =  (IOException) new InterruptedIOException(
+            "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
+            .initCause(e);
+        setExceptionAndThrow(ioe);
+      }
+    }
+    retryCount++;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Retrying Write request. Already tried {} time(s); " +
+          "retry policy is {} ", retryCount, retryPolicy);
+    }
+    handleWrite(null, 0, len, true);
+  }
+
+  private void setExceptionAndThrow(IOException ioe) throws IOException {
+    isException = true;
+    throw ioe;
+  }
+
+  /**
+   * Checks if the provided exception signifies retry failure in ratis client.
+   * In case of retry failure, ratis client throws RaftRetryFailureException
+   * and all succeeding operations are failed with AlreadyClosedException.
+   */
+  private boolean checkForRetryFailure(Throwable t) {
+    return t instanceof RaftRetryFailureException
+        || t instanceof AlreadyClosedException;
+  }
+
+  // Every container specific exception from datatnode will be seen as
+  // StorageContainerException
+  private boolean checkIfContainerToExclude(Throwable t) {
+    return t instanceof StorageContainerException;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    checkNotClosed();
+    handleFlushOrClose(StreamAction.FLUSH);
+  }
+
+  /**
+   * Close or Flush the latest outputStream depending upon the action.
+   * This function gets called when while write is going on, the current stream
+   * gets full or explicit flush or close request is made by client. when the
+   * stream gets full and we try to close the stream , we might end up hitting
+   * an exception in the exception handling path, we write the data residing in
+   * in the buffer pool to a new Block. In cases, as such, when the data gets
+   * written to new stream , it will be at max half full. In such cases, we
+   * should just write the data and not close the stream as the block won't be
+   * completely full.
+   *
+   * @param op Flag which decides whether to call close or flush on the
+   *           outputStream.
+   * @throws IOException In case, flush or close fails with exception.
+   */
+  @SuppressWarnings("squid:S1141")
+  private void handleFlushOrClose(StreamAction op) throws IOException {
+    if (!blockDataStreamOutputEntryPool.isEmpty()) {
+      while (true) {
+        try {
+          BlockDataStreamOutputEntry entry =
+              blockDataStreamOutputEntryPool.getCurrentStreamEntry();
+          if (entry != null) {
+            try {
+              handleStreamAction(entry, op);
+            } catch (IOException ioe) {
+              handleException(entry, ioe);
+              continue;
+            }
+          }
+          return;
+        } catch (Exception e) {
+          markStreamClosed();
+          throw e;
+        }
+      }
+    }
+  }
+
+  private void handleStreamAction(BlockDataStreamOutputEntry entry,
+                                  StreamAction op) throws IOException {
+    Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+    // failed servers can be null in case there is no data written in
+    // the stream
+    if (!failedServers.isEmpty()) {
+      blockDataStreamOutputEntryPool.getExcludeList().addDatanodes(
+          failedServers);
+    }
+    switch (op) {
+    case CLOSE:
+      entry.close();
+      break;
+    case FULL:
+      if (entry.getRemaining() == 0) {
+        entry.close();
+      }
+      break;
+    case FLUSH:
+      entry.flush();
+      break;
+    default:
+      throw new IOException("Invalid Operation");
+    }
+  }
+
+  /**
+   * Commit the key to OM, this will add the blocks as the new key blocks.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      handleFlushOrClose(StreamAction.CLOSE);
+      if (!isException) {
+        Preconditions.checkArgument(writeOffset == offset);
+      }
+      blockDataStreamOutputEntryPool.commitKey(offset);
+    } finally {
+      blockDataStreamOutputEntryPool.cleanup();
+    }
+  }
+
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    return blockDataStreamOutputEntryPool.getCommitUploadPartInfo();
+  }
+
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return feInfo;
+  }
+
+  @VisibleForTesting
+  public ExcludeList getExcludeList() {
+    return blockDataStreamOutputEntryPool.getExcludeList();
+  }
+
+  /**
+   * Builder class of KeyDataStreamOutput.
+   */
+  public static class Builder {
+    private OpenKeySession openHandler;
+    private XceiverClientFactory xceiverManager;
+    private OzoneManagerProtocol omClient;
+    private int chunkSize;
+    private String requestID;
+    private String multipartUploadID;
+    private int multipartNumber;
+    private boolean isMultipartKey;
+    private boolean unsafeByteBufferConversion;
+    private OzoneClientConfig clientConfig;
+    private ReplicationConfig replicationConfig;
+
+    public Builder setMultipartUploadID(String uploadID) {
+      this.multipartUploadID = uploadID;
+      return this;
+    }
+
+    public Builder setMultipartNumber(int partNumber) {
+      this.multipartNumber = partNumber;
+      return this;
+    }
+
+    public Builder setHandler(OpenKeySession handler) {
+      this.openHandler = handler;
+      return this;
+    }
+
+    public Builder setXceiverClientManager(XceiverClientFactory manager) {
+      this.xceiverManager = manager;
+      return this;
+    }
+
+    public Builder setOmClient(OzoneManagerProtocol client) {
+      this.omClient = client;
+      return this;
+    }
+
+    public Builder setChunkSize(int size) {
+      this.chunkSize = size;
+      return this;
+    }
+
+    public Builder setRequestID(String id) {
+      this.requestID = id;
+      return this;
+    }
+
+    public Builder setIsMultipartKey(boolean isMultipart) {
+      this.isMultipartKey = isMultipart;
+      return this;
+    }
+
+    public Builder setConfig(OzoneClientConfig config) {
+      this.clientConfig = config;
+      return this;
+    }
+
+    public Builder enableUnsafeByteBufferConversion(boolean enabled) {
+      this.unsafeByteBufferConversion = enabled;
+      return this;
+    }
+
+
+    public Builder setReplicationConfig(ReplicationConfig replConfig) {
+      this.replicationConfig = replConfig;
+      return this;
+    }
+
+    public KeyDataStreamOutput build() {
+      return new KeyDataStreamOutput(
+          clientConfig,
+          openHandler,
+          xceiverManager,
+          omClient,
+          chunkSize,
+          requestID,
+          replicationConfig,
+          multipartUploadID,
+          multipartNumber,
+          isMultipartKey,
+          unsafeByteBufferConversion);
+    }
+
+  }
+
+  /**
+   * Verify that the output stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+              + blockDataStreamOutputEntryPool.getKeyName());
+    }
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
new file mode 100644
index 0000000..378b868
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+
+import java.io.IOException;
+
+/**
+ * OzoneDataStreamOutput is used to write data into Ozone.
+ * It uses SCM's {@link KeyDataStreamOutput} for writing the data.
+ */
+public class OzoneDataStreamOutput implements ByteBufStreamOutput {
+
+  private final ByteBufStreamOutput byteBufStreamOutput;
+
+  /**
+   * Constructs OzoneDataStreamOutput with KeyDataStreamOutput.
+   *
+   * @param byteBufStreamOutput
+   */
+  public OzoneDataStreamOutput(ByteBufStreamOutput byteBufStreamOutput) {
+    this.byteBufStreamOutput = byteBufStreamOutput;
+  }
+
+  @Override
+  public void write(ByteBuf b) throws IOException {
+    byteBufStreamOutput.write(b);
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    byteBufStreamOutput.flush();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    //commitKey can be done here, if needed.
+    byteBufStreamOutput.close();
+  }
+
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    if (byteBufStreamOutput instanceof KeyDataStreamOutput) {
+      return ((KeyDataStreamOutput)
+              byteBufStreamOutput).getCommitUploadPartInfo();
+    }
+    // Otherwise return null.
+    return null;
+  }
+
+  public ByteBufStreamOutput getByteBufStreamOutput() {
+    return byteBufStreamOutput;
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 2feb577..6de6d22 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.ozone.client.OzoneMultipartUploadList;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -292,6 +293,20 @@ public interface ClientProtocol {
       Map<String, String> metadata)
       throws IOException;
 
+  /**
+   * Writes a key in an existing bucket.
+   * @param volumeName Name of the Volume
+   * @param bucketName Name of the Bucket
+   * @param keyName Name of the Key
+   * @param size Size of the data
+   * @param metadata custom key value metadata
+   * @return {@link OzoneDataStreamOutput}
+   *
+   */
+  OzoneDataStreamOutput createStreamKey(String volumeName, String bucketName,
+      String keyName, long size, ReplicationConfig replicationConfig,
+      Map<String, String> metadata)
+      throws IOException;
 
   /**
    * Reads a key from an existing bucket.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 54a5f4d..43bb3ec 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -72,11 +72,13 @@ import org.apache.hadoop.ozone.client.OzoneMultipartUploadList;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.client.io.MultipartCryptoKeyInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneCryptoInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
@@ -818,6 +820,48 @@ public class RpcClient implements ClientProtocol {
     return createOutputStream(openKey, requestId, replicationConfig);
   }
 
+  @Override
+  public OzoneDataStreamOutput createStreamKey(
+      String volumeName, String bucketName, String keyName, long size,
+      ReplicationConfig replicationConfig,
+      Map<String, String> metadata)
+      throws IOException {
+    verifyVolumeName(volumeName);
+    verifyBucketName(bucketName);
+    if (checkKeyNameEnabled) {
+      HddsClientUtils.verifyKeyName(keyName);
+    }
+    HddsClientUtils.checkNotNull(keyName, replicationConfig);
+    String requestId = UUID.randomUUID().toString();
+
+    OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(size)
+        .setReplicationConfig(replicationConfig)
+        .addAllMetadata(metadata)
+        .setAcls(getAclList());
+
+    if (Boolean.parseBoolean(metadata.get(OzoneConsts.GDPR_FLAG))) {
+      try{
+        GDPRSymmetricKey gKey = new GDPRSymmetricKey(new SecureRandom());
+        builder.addAllMetadata(gKey.getKeyDetails());
+      } catch (Exception e) {
+        if (e instanceof InvalidKeyException &&
+            e.getMessage().contains("Illegal key size or default parameters")) {
+          LOG.error("Missing Unlimited Strength Policy jars. Please install " +
+              "Java Cryptography Extension (JCE) Unlimited Strength " +
+              "Jurisdiction Policy Files");
+        }
+        throw new IOException(e);
+      }
+    }
+
+    OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
+    return createDataStreamOutput(openKey, requestId, replicationConfig);
+  }
+
   private KeyProvider.KeyVersion getDEK(FileEncryptionInfo feInfo)
       throws IOException {
     // check crypto protocol version
@@ -1400,6 +1444,24 @@ public class RpcClient implements ClientProtocol {
           cryptoInputStreams);
     }
   }
+  private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey,
+      String requestId, ReplicationConfig replicationConfig)
+      throws IOException {
+    KeyDataStreamOutput keyOutputStream =
+        new KeyDataStreamOutput.Builder()
+            .setHandler(openKey)
+            .setXceiverClientManager(xceiverClientManager)
+            .setOmClient(ozoneManagerClient)
+            .setRequestID(requestId)
+            .setReplicationConfig(replicationConfig)
+            .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+            .setConfig(clientConfig)
+            .build();
+    keyOutputStream
+        .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
+            openKey.getOpenVersion());
+    return new OzoneDataStreamOutput(keyOutputStream);
+  }
 
   private OzoneOutputStream createOutputStream(OpenKeySession openKey,
       String requestId, ReplicationConfig replicationConfig)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
new file mode 100644
index 0000000..4d52d89
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import io.netty.buffer.Unpooled;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests BlockDataStreamOutput class.
+ */
+public class TestBlockDataStreamOutput {
+
+  /**
+    * Set a timeout for each test.
+    */
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    chunkSize = 100;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setChecksumType(ChecksumType.NONE);
+    clientConfig.setStreamBufferFlushDelay(false);
+    conf.setFromObject(clientConfig);
+
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testblockoutputstream";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  private String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testMultiChunkWrite() throws Exception {
+    // write data less than 1 chunk size use streaming.
+    String keyName1 = getKeyName();
+    OzoneDataStreamOutput key1 = createKey(
+        keyName1, ReplicationType.RATIS, 0);
+    int dataLength1 = chunkSize/2;
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength1)
+            .getBytes(UTF_8);
+    key1.write(Unpooled.copiedBuffer(data1));
+    // now close the stream, It will update the key length.
+    key1.close();
+    validateData(keyName1, data1);
+
+    // write data more than 1 chunk size use streaming.
+    String keyName2 = getKeyName();
+    OzoneDataStreamOutput key2 = createKey(
+        keyName2, ReplicationType.RATIS, 0);
+    int dataLength2 = chunkSize + 50;
+    byte[] data2 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength2)
+            .getBytes(UTF_8);
+    key2.write(Unpooled.copiedBuffer(data2));
+    // now close the stream, It will update the key length.
+    key2.close();
+    validateData(keyName2, data2);
+
+    // write data more than 1 block size use streaming.
+    String keyName3 = getKeyName();
+    OzoneDataStreamOutput key3 = createKey(
+        keyName3, ReplicationType.RATIS, 0);
+    int dataLength3 = blockSize + 50;
+    byte[] data3 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength3)
+            .getBytes(UTF_8);
+    key3.write(Unpooled.copiedBuffer(data3));
+    // now close the stream, It will update the key length.
+    key3.close();
+    validateData(keyName3, data3);
+  }
+
+  private OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return TestHelper.createStreamKey(
+        keyName, type, size, objectStore, volumeName, bucketName);
+  }
+  private void validateData(String keyName, byte[] data) throws Exception {
+    TestHelper
+        .validateData(keyName, data, objectStore, volumeName, bucketName);
+  }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 85d46ca..0e48dd9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.security.MessageDigest;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -40,6 +42,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -123,8 +126,23 @@ public final class TestHelper {
         type == ReplicationType.STAND_ALONE ?
             org.apache.hadoop.hdds.client.ReplicationFactor.ONE :
             org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
+    ReplicationConfig config =
+            ReplicationConfig.fromTypeAndFactor(type, factor);
+    return objectStore.getVolume(volumeName).getBucket(bucketName)
+        .createKey(keyName, size, config, new HashMap<>());
+  }
+
+  public static OzoneDataStreamOutput createStreamKey(String keyName,
+      ReplicationType type, long size, ObjectStore objectStore,
+      String volumeName, String bucketName) throws Exception {
+    org.apache.hadoop.hdds.client.ReplicationFactor factor =
+        type == ReplicationType.STAND_ALONE ?
+            org.apache.hadoop.hdds.client.ReplicationFactor.ONE :
+            org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
+    ReplicationConfig config =
+        ReplicationConfig.fromTypeAndFactor(type, factor);
     return objectStore.getVolume(volumeName).getBucket(bucketName)
-        .createKey(keyName, size, type, factor, new HashMap<>());
+        .createStreamKey(keyName, size, config, new HashMap<>());
   }
 
   public static OzoneOutputStream createKey(String keyName,
@@ -132,8 +150,10 @@ public final class TestHelper {
       org.apache.hadoop.hdds.client.ReplicationFactor factor, long size,
       ObjectStore objectStore, String volumeName, String bucketName)
       throws Exception {
+    ReplicationConfig config =
+            ReplicationConfig.fromTypeAndFactor(type, factor);
     return objectStore.getVolume(volumeName).getBucket(bucketName)
-        .createKey(keyName, size, type, factor, new HashMap<>());
+        .createKey(keyName, size, config, new HashMap<>());
   }
 
   public static void validateData(String keyName, byte[] data,
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
index c575b6e..56bc834 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/keys/PutKeyHandler.java
@@ -23,9 +23,14 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.Map;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfigValidator;
@@ -36,6 +41,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientException;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.shell.OzoneAddress;
 
 import org.apache.commons.codec.digest.DigestUtils;
@@ -113,10 +119,36 @@ public class PutKeyHandler extends KeyHandler {
 
     int chunkSize = (int) getConf().getStorageSize(OZONE_SCM_CHUNK_SIZE_KEY,
         OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES);
-    try (InputStream input = new FileInputStream(dataFile);
-        OutputStream output = bucket.createKey(keyName, dataFile.length(),
-            replicationConfig, keyMetadata)) {
-      IOUtils.copyBytes(input, output, chunkSize);
+
+    if (dataFile.length() <= chunkSize) {
+      if (isVerbose()) {
+        out().println("API: async");
+      }
+      try (InputStream input = new FileInputStream(dataFile);
+           OutputStream output = bucket.createKey(keyName, dataFile.length(),
+               replicationConfig, keyMetadata)) {
+        IOUtils.copyBytes(input, output, chunkSize);
+      }
+    } else {
+      if (isVerbose()) {
+        out().println("API: streaming");
+      }
+      try (RandomAccessFile raf = new RandomAccessFile(dataFile, "r");
+           OzoneDataStreamOutput out = bucket.createStreamKey(keyName,
+               dataFile.length(), replicationConfig, keyMetadata)) {
+        FileChannel ch = raf.getChannel();
+        long len = raf.length();
+        long off = 0;
+        while (len > 0) {
+          long writeLen = Math.min(len, chunkSize);
+          ByteBuffer segment =
+              ch.map(FileChannel.MapMode.READ_ONLY, off, writeLen);
+          ByteBuf buf = Unpooled.wrappedBuffer(segment);
+          out.write(buf);
+          off += writeLen;
+          len -= writeLen;
+        }
+      }
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 08/13: HDDS-5742. Avoid unnecessary Bytebuffer conversions (#2673)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit bdd2726151581ec847b53129d667529c16c6a1e0
Author: micah zhao <mi...@tencent.com>
AuthorDate: Thu Sep 23 11:26:43 2021 +0800

    HDDS-5742. Avoid unnecessary Bytebuffer conversions (#2673)
---
 .../org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java    | 3 +--
 .../src/main/java/org/apache/hadoop/ozone/common/Checksum.java       | 5 +++++
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index d0419fa..c69af90 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -485,8 +485,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
       throws IOException {
     final int effectiveChunkSize = buf.remaining();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
-    ChecksumData checksumData =
-        checksum.computeChecksum(buf.asReadOnlyBuffer());
+    ChecksumData checksumData = checksum.computeChecksum(buf);
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
         .setOffset(offset)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
index 76f84c4..d300b9e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -139,6 +139,11 @@ public class Checksum {
    */
   public ChecksumData computeChecksum(ByteBuffer data)
       throws OzoneChecksumException {
+    // If type is set to NONE, we do not need to compute the checksums. We also
+    // need to avoid unnecessary conversions.
+    if (checksumType == ChecksumType.NONE) {
+      return new ChecksumData(checksumType, bytesPerChecksum);
+    }
     if (!data.isReadOnly()) {
       data = data.asReadOnlyBuffer();
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 06/13: HDDS-5599. [Ozone-Streaming]drop BufferPool and ChunkBuffer to avoid buffer copying (#2557)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 51f88cbe11628a7d61bd0f047019604d765a9115
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Aug 25 23:39:10 2021 +0800

    HDDS-5599.  [Ozone-Streaming]drop BufferPool and ChunkBuffer to avoid buffer copying (#2557)
---
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 290 +++------------------
 .../hdds/scm/storage/StreamCommitWatcher.java      | 166 ++++++++++++
 .../client/io/BlockDataStreamOutputEntry.java      |  33 +--
 .../client/io/BlockDataStreamOutputEntryPool.java  |  20 --
 .../ozone/client/io/KeyDataStreamOutput.java       |  29 +--
 5 files changed, 211 insertions(+), 327 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index f658df1..39ec2f9 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -36,21 +36,18 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
-import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -93,7 +90,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
   private int chunkIndex;
   private final AtomicLong chunkOffset = new AtomicLong();
-  private final BufferPool bufferPool;
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
   // request will fail upfront.
@@ -106,28 +102,16 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
   // effective data write attempted so far for the block
   private long writtenDataLength;
 
-  // List containing buffers for which the putBlock call will
-  // update the length in the datanodes. This list will just maintain
-  // references to the buffers in the BufferPool which will be cleared
-  // when the watchForCommit acknowledges a putBlock logIndex has been
-  // committed on all datanodes. This list will be a  place holder for buffers
-  // which got written between successive putBlock calls.
-  private List<ChunkBuffer> bufferList;
-
   // This object will maintain the commitIndexes and byteBufferList in order
   // Also, corresponding to the logIndex, the corresponding list of buffers will
   // be released from the buffer pool.
-  private final CommitWatcher commitWatcher;
+  private final StreamCommitWatcher commitWatcher;
 
   private final List<DatanodeDetails> failedServers;
   private final Checksum checksum;
 
   //number of buffers used before doing a flush/putBlock.
   private int flushPeriod;
-  //bytes remaining to write in the current buffer.
-  private int currentBufferRemaining;
-  //current buffer allocated to write
-  private ChunkBuffer currentBuffer;
   private final Token<? extends TokenIdentifier> token;
   private final DataStreamOutput out;
   private CompletableFuture<DataStreamReply> dataStreamCloseReply;
@@ -141,13 +125,11 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
    * @param blockID              block ID
    * @param xceiverClientManager client manager that controls client
    * @param pipeline             pipeline where block will be written
-   * @param bufferPool           pool of buffers
    */
   public BlockDataStreamOutput(
       BlockID blockID,
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
-      BufferPool bufferPool,
       OzoneClientConfig config,
       Token<? extends TokenIdentifier> token
   ) throws IOException {
@@ -163,11 +145,8 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
         (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline);
     // Alternatively, stream setup can be delayed till the first chunk write.
     this.out = setupStream();
-    this.bufferPool = bufferPool;
     this.token = token;
 
-    //number of buffers used before doing a flush
-    refreshCurrentBuffer(bufferPool);
     flushPeriod = (int) (config.getStreamBufferFlushSize() / config
         .getStreamBufferSize());
 
@@ -178,8 +157,7 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
-    bufferList = null;
+    commitWatcher = new StreamCommitWatcher(xceiverClient);
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
     failedServers = new ArrayList<>(0);
@@ -209,20 +187,10 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
         .stream(message.getContent().asReadOnlyByteBuffer());
   }
 
-  private void refreshCurrentBuffer(BufferPool pool) {
-    currentBuffer = pool.getCurrentBuffer();
-    currentBufferRemaining =
-        currentBuffer != null ? currentBuffer.remaining() : 0;
-  }
-
   public BlockID getBlockID() {
     return blockID.get();
   }
 
-  public long getTotalAckDataLength() {
-    return commitWatcher.getTotalAckDataLength();
-  }
-
   public long getWrittenDataLength() {
     return writtenDataLength;
   }
@@ -236,82 +204,29 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
     return xceiverClient;
   }
 
-  @VisibleForTesting
-  public long getTotalDataFlushedLength() {
-    return totalDataFlushedLength;
-  }
-
-  @VisibleForTesting
-  public BufferPool getBufferPool() {
-    return bufferPool;
-  }
-
   public IOException getIoException() {
     return ioException.get();
   }
 
-  @VisibleForTesting
-  public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
-    return commitWatcher.getCommitIndex2flushedDataMap();
-  }
-
   @Override
-  public void write(ByteBuf b) throws IOException {
+  public void write(ByteBuf buf) throws IOException {
     checkOpen();
-    if (b == null) {
+    if (buf == null) {
       throw new NullPointerException();
     }
-    int off = b.readerIndex();
-    int len = b.readableBytes();
-
-    while (len > 0) {
-      allocateNewBufferIfNeeded();
-      final int writeLen = Math.min(currentBufferRemaining, len);
-      // TODO: avoid buffer copy here
-      currentBuffer.put(b.nioBuffer(off, writeLen));
-      currentBufferRemaining -= writeLen;
-      writeChunkIfNeeded();
-      off += writeLen;
-      len -= writeLen;
-      writtenDataLength += writeLen;
-      doFlushOrWatchIfNeeded();
-    }
-  }
-
-  private void writeChunkIfNeeded() throws IOException {
-    if (currentBufferRemaining == 0) {
-      writeChunk(currentBuffer);
-    }
-  }
-
-  private void doFlushOrWatchIfNeeded() throws IOException {
-    if (currentBufferRemaining == 0) {
-      if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
-        updateFlushLength();
-        executePutBlock(false, false);
-      }
-      // Data in the bufferPool can not exceed streamBufferMaxSize
-      if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
-        handleFullBuffer();
-      }
+    final int len = buf.readableBytes();
+    if (len == 0) {
+      return;
     }
-  }
+    writeChunkToContainer(buf);
 
-  private void allocateNewBufferIfNeeded() {
-    if (currentBufferRemaining == 0) {
-      currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
-      currentBufferRemaining = currentBuffer.remaining();
-    }
+    writtenDataLength += len;
   }
 
   private void updateFlushLength() {
     totalDataFlushedLength = writtenDataLength;
   }
 
-  private boolean isBufferPoolFull() {
-    return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
-  }
-
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
@@ -319,70 +234,9 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
    * @throws IOException if error occurred
    */
 
-  // In this case, the data is already cached in the currentBuffer.
+  // TODO: We need add new retry policy without depend on bufferPool.
   public void writeOnRetry(long len) throws IOException {
-    if (len == 0) {
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Retrying write length {} for blockID {}", len, blockID);
-    }
-    Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
-    int count = 0;
-    while (len > 0) {
-      ChunkBuffer buffer = bufferPool.getBuffer(count);
-      long writeLen = Math.min(buffer.position(), len);
-      if (!buffer.hasRemaining()) {
-        writeChunk(buffer);
-      }
-      len -= writeLen;
-      count++;
-      writtenDataLength += writeLen;
-      // we should not call isBufferFull/shouldFlush here.
-      // The buffer might already be full as whole data is already cached in
-      // the buffer. We should just validate
-      // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
-      // call for handling full buffer/flush buffer condition.
-      if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
-        // reset the position to zero as now we will be reading the
-        // next buffer in the list
-        updateFlushLength();
-        executePutBlock(false, false);
-      }
-      if (writtenDataLength == config.getStreamBufferMaxSize()) {
-        handleFullBuffer();
-      }
-    }
-  }
 
-  /**
-   * This is a blocking call. It will wait for the flush till the commit index
-   * at the head of the commitIndex2flushedDataMap gets replicated to all or
-   * majority.
-   * @throws IOException
-   */
-  private void handleFullBuffer() throws IOException {
-    try {
-      checkOpen();
-      if (!commitWatcher.getFutureMap().isEmpty()) {
-        waitOnFlushFutures();
-      }
-    } catch (ExecutionException e) {
-      handleExecutionException(e);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-      handleInterruptedException(ex, true);
-    }
-    watchForCommit(true);
-  }
-
-
-  // It may happen that once the exception is encountered , we still might
-  // have successfully flushed up to a certain index. Make sure the buffers
-  // only contain data which have not been sufficiently replicated
-  private void adjustBuffersOnException() {
-    commitWatcher.releaseBuffersOnException();
-    refreshCurrentBuffer(bufferPool);
   }
 
   /**
@@ -397,7 +251,8 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
     checkOpen();
     try {
       XceiverClientReply reply = bufferFull ?
-          commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+          commitWatcher.streamWatchOnFirstIndex() :
+          commitWatcher.streamWatchOnLastIndex();
       if (reply != null) {
         List<DatanodeDetails> dnList = reply.getDatanodes();
         if (!dnList.isEmpty()) {
@@ -412,7 +267,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
       setIoException(ioe);
       throw getIoException();
     }
-    refreshCurrentBuffer(bufferPool);
 
   }
 
@@ -426,22 +280,7 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
       boolean force) throws IOException {
     checkOpen();
     long flushPos = totalDataFlushedLength;
-    final List<ChunkBuffer> byteBufferList;
-    if (!force) {
-      Preconditions.checkNotNull(bufferList);
-      byteBufferList = bufferList;
-      bufferList = null;
-      Preconditions.checkNotNull(byteBufferList);
-    } else {
-      byteBufferList = null;
-    }
-
-    try {
-      CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
-    } catch (Exception e) {
-      LOG.warn("Failed to write all chunks through stream: " + e);
-      throw new IOException(e);
-    }
+    flush();
     if (close) {
       dataStreamCloseReply = out.closeAsync();
     }
@@ -471,15 +310,12 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
           if (LOG.isDebugEnabled()) {
             LOG.debug(
                 "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                    + commitWatcher.getCommitInfoMapSize() + " flushLength "
-                    + flushPos + " numBuffers " + byteBufferList.size()
-                    + " blockID " + blockID + " bufferPool size" + bufferPool
-                    .getSize() + " currentBufferIndex " + bufferPool
-                    .getCurrentBufferIndex());
+                    + commitWatcher.getCommitInfoSetSize() + " flushLength "
+                    + flushPos + " blockID " + blockID);
           }
           // for standalone protocol, logIndex will always be 0.
-          commitWatcher
-              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
+          commitWatcher.updateCommitInfoSet(
+              asyncReply.getLogIndex());
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -503,36 +339,12 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
   @Override
   public void flush() throws IOException {
-    if (xceiverClientFactory != null && xceiverClient != null
-        && bufferPool != null && bufferPool.getSize() > 0
-        && (!config.isStreamBufferFlushDelay() ||
-            writtenDataLength - totalDataFlushedLength
-                >= config.getStreamBufferSize())) {
-      try {
-        handleFlush(false);
-      } catch (ExecutionException e) {
-        // just set the exception here as well in order to maintain sanctity of
-        // ioException field
-        handleExecutionException(e);
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        handleInterruptedException(ex, true);
-      }
-    }
-  }
-
-  private void writeChunk(ChunkBuffer buffer)
-      throws IOException {
-    // This data in the buffer will be pushed to datanode and a reference will
-    // be added to the bufferList. Once putBlock gets executed, this list will
-    // be marked null. Hence, during first writeChunk call after every putBlock
-    // call or during the first call to writeChunk here, the list will be null.
-
-    if (bufferList == null) {
-      bufferList = new ArrayList<>();
+    try {
+      CompletableFuture.allOf(futures.toArray(EMPTY_FUTURE_ARRAY)).get();
+    } catch (Exception e) {
+      LOG.warn("Failed to write all chunks through stream: " + e);
+      throw new IOException(e);
     }
-    bufferList.add(buffer);
-    writeChunkToContainer(buffer.duplicate(0, buffer.position()));
   }
 
   /**
@@ -543,11 +355,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
     checkOpen();
     // flush the last chunk data residing on the currentBuffer
     if (totalDataFlushedLength < writtenDataLength) {
-      refreshCurrentBuffer(bufferPool);
-      Preconditions.checkArgument(currentBuffer.position() > 0);
-      if (currentBuffer.hasRemaining()) {
-        writeChunk(currentBuffer);
-      }
       // This can be a partially filled chunk. Since we are flushing the buffer
       // here, we just limit this buffer to the current position. So that next
       // write will happen in new buffer
@@ -570,8 +377,7 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
   @Override
   public void close() throws IOException {
-    if (xceiverClientFactory != null && xceiverClient != null
-        && bufferPool != null && bufferPool.getSize() > 0) {
+    if (xceiverClientFactory != null && xceiverClient != null) {
       try {
         handleFlush(true);
         dataStreamCloseReply.get();
@@ -583,10 +389,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
       } finally {
         cleanup(false);
       }
-      // TODO: Turn the below buffer empty check on when Standalone pipeline
-      // is removed in the write path in tests
-      // Preconditions.checkArgument(buffer.position() == 0);
-      // bufferPool.checkBufferPoolEmpty();
 
     }
   }
@@ -638,10 +440,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
     xceiverClientFactory = null;
     xceiverClient = null;
     commitWatcher.cleanup();
-    if (bufferList !=  null) {
-      bufferList.clear();
-    }
-    bufferList = null;
     responseExecutor.shutdown();
   }
 
@@ -655,7 +453,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
     if (isClosed()) {
       throw new IOException("BlockDataStreamOutput has been closed.");
     } else if (getIoException() != null) {
-      adjustBuffersOnException();
       throw getIoException();
     }
   }
@@ -683,12 +480,11 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
    * @throws OzoneChecksumException if there is an error while computing
    * checksum
    */
-  private void writeChunkToContainer(ChunkBuffer chunk) throws IOException {
-    int effectiveChunkSize = chunk.remaining();
+  private void writeChunkToContainer(ByteBuf buf)
+      throws IOException {
+    ChecksumData checksumData = checksum.computeChecksum(buf.nioBuffer());
+    int effectiveChunkSize = buf.readableBytes();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
-    final ByteString data = chunk.toByteString(
-        bufferPool.byteStringConversion());
-    ChecksumData checksumData = checksum.computeChecksum(chunk);
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
         .setOffset(offset)
@@ -703,21 +499,22 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
 
     CompletableFuture<DataStreamReply> future =
         (needSync(offset + effectiveChunkSize) ?
-        out.writeAsync(data.asReadOnlyByteBuffer(), StandardWriteOption.SYNC) :
-        out.writeAsync(data.asReadOnlyByteBuffer()))
-        .whenCompleteAsync((r, e) -> {
-          if (e != null || !r.isSuccess()) {
-            if (e == null) {
-              e = new IOException("result is not success");
-            }
-            String msg = "Failed to write chunk " + chunkInfo.getChunkName() +
-                " " + "into block " + blockID;
-            LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
-            CompletionException ce = new CompletionException(msg, e);
-            setIoException(ce);
-            throw ce;
-          }
-        }, responseExecutor);
+            out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) :
+            out.writeAsync(buf.nioBuffer()))
+            .whenCompleteAsync((r, e) -> {
+              if (e != null || !r.isSuccess()) {
+                if (e == null) {
+                  e = new IOException("result is not success");
+                }
+                String msg =
+                    "Failed to write chunk " + chunkInfo.getChunkName() +
+                        " " + "into block " + blockID;
+                LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
+                CompletionException ce = new CompletionException(msg, e);
+                setIoException(ce);
+                throw ce;
+              }
+            }, responseExecutor);
 
     futures.add(future);
     containerBlockData.addChunks(chunkInfo);
@@ -754,7 +551,6 @@ public class BlockDataStreamOutput implements ByteBufStreamOutput {
    */
   private void handleExecutionException(Exception ex) throws IOException {
     setIoException(ex);
-    adjustBuffersOnException();
     throw getIoException();
   }
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
new file mode 100644
index 0000000..c187ffe
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * This class maintains the map of the commitIndexes to be watched for
+ * successful replication in the datanodes in a given pipeline. It also releases
+ * the buffers associated with the user data back to {@Link BufferPool} once
+ * minimum replication criteria is achieved during an ozone key write.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class executes watchForCommit on ratis pipeline and releases
+ * buffers once data successfully gets replicated.
+ */
+public class StreamCommitWatcher {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamCommitWatcher.class);
+
+  private Set<Long> commitIndexSet;
+
+  // future Map to hold up all putBlock futures
+  private ConcurrentHashMap<Long,
+      CompletableFuture<ContainerCommandResponseProto>>
+      futureMap;
+
+  private XceiverClientSpi xceiverClient;
+
+  public StreamCommitWatcher(XceiverClientSpi xceiverClient) {
+    this.xceiverClient = xceiverClient;
+    commitIndexSet = new ConcurrentSkipListSet();
+    futureMap = new ConcurrentHashMap<>();
+  }
+
+  public void updateCommitInfoSet(long index) {
+    commitIndexSet.add(index);
+  }
+
+  int getCommitInfoSetSize() {
+    return commitIndexSet.size();
+  }
+
+  /**
+   * Calls watch for commit for the first index in commitIndex2flushedDataMap to
+   * the Ratis client.
+   * @return {@link XceiverClientReply} reply from raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
+    if (!commitIndexSet.isEmpty()) {
+      // wait for the  first commit index in the commitIndex2flushedDataMap
+      // to get committed to all or majority of nodes in case timeout
+      // happens.
+      long index =
+          commitIndexSet.stream().mapToLong(v -> v).min()
+              .getAsLong();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("waiting for first index {} to catch up", index);
+      }
+      return streamWatchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Calls watch for commit for the last index in commitIndex2flushedDataMap to
+   * the Ratis client.
+   * @return {@link XceiverClientReply} reply from raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply streamWatchOnLastIndex()
+      throws IOException {
+    if (!commitIndexSet.isEmpty()) {
+      // wait for the  commit index in the commitIndex2flushedDataMap
+      // to get committed to all or majority of nodes in case timeout
+      // happens.
+      long index =
+          commitIndexSet.stream().mapToLong(v -> v).max()
+              .getAsLong();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("waiting for last flush Index {} to catch up", index);
+      }
+      return streamWatchForCommit(index);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * calls watchForCommit API of the Ratis Client. This method is for streaming
+   * and no longer requires releaseBuffers
+   * @param commitIndex log index to watch for
+   * @return minimum commit index replicated to all nodes
+   * @throws IOException IOException in case watch gets timed out
+   */
+  public XceiverClientReply streamWatchForCommit(long commitIndex)
+      throws IOException {
+    try {
+      XceiverClientReply reply =
+          xceiverClient.watchForCommit(commitIndex);
+      return reply;
+    } catch (InterruptedException e) {
+      // Re-interrupt the thread while catching InterruptedException
+      Thread.currentThread().interrupt();
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
+    } catch (TimeoutException | ExecutionException e) {
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
+    }
+  }
+
+  private IOException getIOExceptionForWatchForCommit(long commitIndex,
+                                                       Exception e) {
+    LOG.warn("watchForCommit failed for index {}", commitIndex, e);
+    IOException ioException = new IOException(
+        "Unexpected Storage Container Exception: " + e.toString(), e);
+    return ioException;
+  }
+
+  public ConcurrentMap<Long,
+        CompletableFuture<
+            ContainerCommandResponseProto>> getFutureMap() {
+    return futureMap;
+  }
+
+  public void cleanup() {
+    if (commitIndexSet != null) {
+      commitIndexSet.clear();
+    }
+    if (futureMap != null) {
+      futureMap.clear();
+    }
+    commitIndexSet = null;
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
index 6954742..98907bf 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ByteBufStreamOutput;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -52,15 +51,12 @@ public final class BlockDataStreamOutputEntry
   private long currentPosition;
   private final Token<OzoneBlockTokenIdentifier> token;
 
-  private BufferPool bufferPool;
-
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   private BlockDataStreamOutputEntry(
       BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager,
       Pipeline pipeline,
       long length,
-      BufferPool bufferPool,
       Token<OzoneBlockTokenIdentifier> token,
       OzoneClientConfig config
   ) {
@@ -73,7 +69,6 @@ public final class BlockDataStreamOutputEntry
     this.token = token;
     this.length = length;
     this.currentPosition = 0;
-    this.bufferPool = bufferPool;
   }
 
   long getLength() {
@@ -98,7 +93,7 @@ public final class BlockDataStreamOutputEntry
     if (this.byteBufStreamOutput == null) {
       this.byteBufStreamOutput =
           new BlockDataStreamOutput(blockID, xceiverClientManager,
-              pipeline, bufferPool, config, token);
+              pipeline, config, token);
     }
   }
 
@@ -135,20 +130,6 @@ public final class BlockDataStreamOutputEntry
     return false;
   }
 
-  long getTotalAckDataLength() {
-    if (byteBufStreamOutput != null) {
-      BlockDataStreamOutput out =
-          (BlockDataStreamOutput) this.byteBufStreamOutput;
-      blockID = out.getBlockID();
-      return out.getTotalAckDataLength();
-    } else {
-      // For a pre allocated block for which no write has been initiated,
-      // the ByteBufStreamOutput will be null here.
-      // In such cases, the default blockCommitSequenceId will be 0
-      return 0;
-    }
-  }
-
   Collection<DatanodeDetails> getFailedServers() {
     if (byteBufStreamOutput != null) {
       BlockDataStreamOutput out =
@@ -198,7 +179,6 @@ public final class BlockDataStreamOutputEntry
     private XceiverClientFactory xceiverClientManager;
     private Pipeline pipeline;
     private long length;
-    private BufferPool bufferPool;
     private Token<OzoneBlockTokenIdentifier> token;
     private OzoneClientConfig config;
 
@@ -230,12 +210,6 @@ public final class BlockDataStreamOutputEntry
       return this;
     }
 
-
-    public Builder setBufferPool(BufferPool pool) {
-      this.bufferPool = pool;
-      return this;
-    }
-
     public Builder setConfig(OzoneClientConfig clientConfig) {
       this.config = clientConfig;
       return this;
@@ -252,7 +226,6 @@ public final class BlockDataStreamOutputEntry
           xceiverClientManager,
           pipeline,
           length,
-          bufferPool,
           token, config);
     }
   }
@@ -282,10 +255,6 @@ public final class BlockDataStreamOutputEntry
     return currentPosition;
   }
 
-  public BufferPool getBufferPool() {
-    return bufferPool;
-  }
-
   public void setCurrentPosition(long curPosition) {
     this.currentPosition = curPosition;
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 94c505f..4bc55de 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -22,12 +22,10 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -58,7 +56,6 @@ public class BlockDataStreamOutputEntryPool {
   private final OmKeyArgs keyArgs;
   private final XceiverClientFactory xceiverClientFactory;
   private final String requestID;
-  private final BufferPool bufferPool;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private final long openID;
   private final ExcludeList excludeList;
@@ -86,13 +83,6 @@ public class BlockDataStreamOutputEntryPool {
     this.requestID = requestId;
     this.openID = openID;
     this.excludeList = new ExcludeList();
-
-    this.bufferPool =
-        new BufferPool(config.getStreamBufferSize(),
-            (int) (config.getStreamBufferMaxSize() / config
-                .getStreamBufferSize()),
-            ByteStringConversion
-                .createByteBufferConversion(unsafeByteBufferConversion));
   }
 
   /**
@@ -114,8 +104,6 @@ public class BlockDataStreamOutputEntryPool {
     config.setStreamBufferFlushDelay(false);
     requestID = null;
     int chunkSize = 0;
-    bufferPool = new BufferPool(chunkSize, 1);
-
     currentStreamIndex = 0;
     openID = -1;
     excludeList = new ExcludeList();
@@ -154,7 +142,6 @@ public class BlockDataStreamOutputEntryPool {
             .setPipeline(subKeyInfo.getPipeline())
             .setConfig(config)
             .setLength(subKeyInfo.getLength())
-            .setBufferPool(bufferPool)
             .setToken(subKeyInfo.getToken());
     streamEntries.add(builder.build());
   }
@@ -293,17 +280,10 @@ public class BlockDataStreamOutputEntryPool {
     return streamEntries.get(currentStreamIndex);
   }
 
-  long computeBufferData() {
-    return bufferPool.computeBufferData();
-  }
-
   void cleanup() {
     if (excludeList != null) {
       excludeList.clear();
     }
-    if (bufferPool != null) {
-      bufferPool.clearBufferPool();
-    }
 
     if (streamEntries != null) {
       streamEntries.clear();
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index a9be116..c37f9cd 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -279,27 +279,7 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
     }
     Pipeline pipeline = streamEntry.getPipeline();
     PipelineID pipelineId = pipeline.getId();
-    long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
-    //set the correct length for the current stream
-    streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
-    long bufferedDataLen = blockDataStreamOutputEntryPool.computeBufferData();
-    if (containerExclusionException) {
-      LOG.debug(
-          "Encountered exception {}. The last committed block length is {}, "
-              + "uncommitted data length is {} retry count {}", exception,
-          totalSuccessfulFlushedData, bufferedDataLen, retryCount);
-    } else {
-      LOG.warn(
-          "Encountered exception {} on the pipeline {}. "
-              + "The last committed block length is {}, "
-              + "uncommitted data length is {} retry count {}", exception,
-          pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount);
-    }
-    Preconditions.checkArgument(
-        bufferedDataLen <= config.getStreamBufferMaxSize());
-    Preconditions.checkArgument(
-        offset - blockDataStreamOutputEntryPool.getKeyLength() ==
-        bufferedDataLen);
+
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
@@ -337,13 +317,6 @@ public class KeyDataStreamOutput implements ByteBufStreamOutput {
       blockDataStreamOutputEntryPool
           .discardPreallocatedBlocks(-1, pipelineId);
     }
-    if (bufferedDataLen > 0) {
-      // If the data is still cached in the underlying stream, we need to
-      // allocate new block and write this data in the datanode.
-      handleRetry(exception, bufferedDataLen);
-      // reset the retryCount after handling the exception
-      retryCount = 0;
-    }
   }
 
   private void markStreamClosed() {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 03/13: HDDS-5481. Fix stream() and link() method in ContainerStateMachine. (#2451)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 0cc389af40096be656f945cd4d9cd7ba361f3f32
Author: Kaijie Chen <ch...@kaijie.org>
AuthorDate: Thu Jul 22 19:46:31 2021 +0800

    HDDS-5481. Fix stream() and link() method in ContainerStateMachine. (#2451)
---
 .../common/transport/server/ratis/ContainerStateMachine.java          | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 5f4bac0..dda1fb3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -506,8 +506,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     return CompletableFuture.supplyAsync(() -> {
       try {
         ContainerCommandRequestProto requestProto =
-            getContainerCommandRequestProto(gid,
-                request.getMessage().getContent());
+            message2ContainerCommandRequestProto(request.getMessage());
         DispatcherContext context =
             new DispatcherContext.Builder()
                 .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
@@ -524,6 +523,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     }, executor);
   }
 
+  @Override
   public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
     return CompletableFuture.supplyAsync(() -> {
       if (stream == null) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 12/13: HDDS-5895. [Ozone-Streaming] Make raft.server.data-stream.client.pool.size configurable (#2766)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 9037764bb7eaf4e3a681c6fb05316a66a6ed77b4
Author: micah zhao <mi...@tencent.com>
AuthorDate: Tue Oct 26 15:10:58 2021 +0800

    HDDS-5895. [Ozone-Streaming] Make raft.server.data-stream.client.pool.size configurable (#2766)
---
 .../transport/server/ratis/XceiverServerRatis.java      |  5 +++++
 .../hadoop/hdds/conf/DatanodeRatisServerConfig.java     | 17 +++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 35d3627..9f7b03f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -244,6 +244,11 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             .getStreamWriteThreads();
     RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties,
         dataStreamWriteRequestThreadPoolSize);
+    int dataStreamClientPoolSize =
+        conf.getObject(DatanodeRatisServerConfig.class)
+            .getClientPoolSize();
+    RaftServerConfigKeys.DataStream.setClientPoolSize(properties,
+        dataStreamClientPoolSize);
   }
 
   @SuppressWarnings("checkstyle:methodlength")
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
index 205d92e..3132928 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
@@ -158,6 +158,23 @@ public class DatanodeRatisServerConfig {
     this.streamWriteThreads = streamWriteThreads;
   }
 
+  @Config(key = "datastream.client.pool.size",
+      defaultValue = "10",
+      type = ConfigType.INT,
+      tags = {OZONE, DATANODE, RATIS, DATASTREAM},
+      description = "Maximum number of client proxy in NettyServerStreamRpc " +
+          "for datastream write."
+  )
+  private int clientPoolSize;
+
+  public int getClientPoolSize() {
+    return clientPoolSize;
+  }
+
+  public void setClientPoolSize(int clientPoolSize) {
+    this.clientPoolSize = clientPoolSize;
+  }
+
   @Config(key = "delete.ratis.log.directory",
           defaultValue = "true",
           type = ConfigType.BOOLEAN,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 13/13: HDDS-5763. Provide an Executor for each LocalStream in ContainerStateMachine (#2782)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 1d9f870ea3950cbfa153d8fac0799da7a6aec8a2
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Nov 1 23:39:53 2021 +0800

    HDDS-5763. Provide an Executor for each LocalStream in ContainerStateMachine (#2782)
---
 .../transport/server/ratis/ContainerStateMachine.java   |  7 +++++--
 .../common/transport/server/ratis/LocalStream.java      | 10 +++++++++-
 .../transport/server/ratis/XceiverServerRatis.java      |  5 -----
 .../hadoop/hdds/conf/DatanodeRatisServerConfig.java     | 17 -----------------
 4 files changed, 14 insertions(+), 25 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index dda1fb3..9e4a9e9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -515,8 +515,11 @@ public class ContainerStateMachine extends BaseStateMachine {
 
         ContainerCommandResponseProto response = runCommand(
             requestProto, context);
-        String path = response.getMessage();
-        return new LocalStream(new StreamDataChannel(Paths.get(path)));
+        final StreamDataChannel channel = new StreamDataChannel(
+            Paths.get(response.getMessage()));
+        final ExecutorService chunkExecutor = requestProto.hasWriteChunk() ?
+            getChunkExecutor(requestProto.getWriteChunk()) : null;
+        return new LocalStream(channel, chunkExecutor);
       } catch (IOException e) {
         throw new CompletionException("Failed to create data stream", e);
       }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
index baae013..780f874 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
@@ -23,12 +23,15 @@ import org.apache.ratis.statemachine.StateMachine;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
 
 class LocalStream implements StateMachine.DataStream {
   private final StateMachine.DataChannel dataChannel;
+  private final Executor executor;
 
-  LocalStream(StateMachine.DataChannel dataChannel) {
+  LocalStream(StateMachine.DataChannel dataChannel, Executor executor) {
     this.dataChannel = dataChannel;
+    this.executor = executor;
   }
 
   @Override
@@ -47,4 +50,9 @@ class LocalStream implements StateMachine.DataStream {
       }
     });
   }
+
+  @Override
+  public Executor getExecutor() {
+    return executor;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 9f7b03f..42f4026 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -239,11 +239,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             .getStreamRequestThreads();
     RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties,
         dataStreamAsyncRequestThreadPoolSize);
-    int dataStreamWriteRequestThreadPoolSize =
-        conf.getObject(DatanodeRatisServerConfig.class)
-            .getStreamWriteThreads();
-    RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties,
-        dataStreamWriteRequestThreadPoolSize);
     int dataStreamClientPoolSize =
         conf.getObject(DatanodeRatisServerConfig.class)
             .getClientPoolSize();
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
index 3132928..058932e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
@@ -141,23 +141,6 @@ public class DatanodeRatisServerConfig {
     this.streamRequestThreads = streamRequestThreads;
   }
 
-  @Config(key = "datastream.write.threads",
-      defaultValue = "20",
-      type = ConfigType.INT,
-      tags = {OZONE, DATANODE, RATIS, DATASTREAM},
-      description = "Maximum number of threads in the thread pool for " +
-          "datastream write."
-  )
-  private int streamWriteThreads;
-
-  public int getStreamWriteThreads() {
-    return streamWriteThreads;
-  }
-
-  public void setStreamWriteThreads(int streamWriteThreads) {
-    this.streamWriteThreads = streamWriteThreads;
-  }
-
   @Config(key = "datastream.client.pool.size",
       defaultValue = "10",
       type = ConfigType.INT,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org