You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/03/25 10:20:36 UTC

[hadoop] branch ozone-0.4 updated: HDDS-1317. KeyOutputStream#write throws ArrayIndexOutOfBoundsException when running RandomWrite MR examples. Contributed by Shashikant Banerjee.

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/ozone-0.4 by this push:
     new eed623a  HDDS-1317. KeyOutputStream#write throws ArrayIndexOutOfBoundsException when running RandomWrite MR examples. Contributed by Shashikant Banerjee.
eed623a is described below

commit eed623ad618d06784858f793d72ecc01126753ef
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Mon Mar 25 15:41:20 2019 +0530

    HDDS-1317. KeyOutputStream#write throws ArrayIndexOutOfBoundsException when running RandomWrite MR examples. Contributed by Shashikant Banerjee.
---
 .../hadoop/hdds/scm/XceiverClientMetrics.java      |  20 +
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |  90 +--
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 199 ++++--
 .../apache/hadoop/hdds/scm/storage/BufferPool.java |  16 +-
 .../org/apache/hadoop/hdds/client/BlockID.java     |   4 +-
 .../ozone/client/io/BlockOutputStreamEntry.java    |   6 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  59 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   7 +
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  12 +-
 .../ozone/client/rpc/TestBlockOutputStream.java    | 690 +++++++++++++++++++++
 .../rpc/TestBlockOutputStreamWithFailures.java     | 546 ++++++++++++++++
 .../rpc/TestCloseContainerHandlingByClient.java    |  11 +-
 .../ozone/container/ContainerTestHelper.java       |  17 +
 .../commandhandler/TestBlockDeletion.java          |   2 +-
 14 files changed, 1543 insertions(+), 136 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
index a430400..6c40921 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.metrics2.MetricsSystem;
@@ -37,7 +38,9 @@ public class XceiverClientMetrics {
       .getSimpleName();
 
   private @Metric MutableCounterLong pendingOps;
+  private @Metric MutableCounterLong totalOps;
   private MutableCounterLong[] pendingOpsArray;
+  private MutableCounterLong[] opsArray;
   private MutableRate[] containerOpsLatency;
   private MetricsRegistry registry;
 
@@ -46,12 +49,17 @@ public class XceiverClientMetrics {
     this.registry = new MetricsRegistry(SOURCE_NAME);
 
     this.pendingOpsArray = new MutableCounterLong[numEnumEntries];
+    this.opsArray = new MutableCounterLong[numEnumEntries];
     this.containerOpsLatency = new MutableRate[numEnumEntries];
     for (int i = 0; i < numEnumEntries; i++) {
       pendingOpsArray[i] = registry.newCounter(
           "numPending" + ContainerProtos.Type.forNumber(i + 1),
           "number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops",
           (long) 0);
+      opsArray[i] = registry
+          .newCounter("opCount" + ContainerProtos.Type.forNumber(i + 1),
+              "number of" + ContainerProtos.Type.forNumber(i + 1) + " ops",
+              (long) 0);
 
       containerOpsLatency[i] = registry.newRate(
           ContainerProtos.Type.forNumber(i + 1) + "Latency",
@@ -68,6 +76,8 @@ public class XceiverClientMetrics {
 
   public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) {
     pendingOps.incr();
+    totalOps.incr();
+    opsArray[type.ordinal()].incr();
     pendingOpsArray[type.ordinal()].incr();
   }
 
@@ -85,6 +95,16 @@ public class XceiverClientMetrics {
     return pendingOpsArray[type.ordinal()].value();
   }
 
+  @VisibleForTesting
+  public long getTotalOpCount() {
+    return totalOps.value();
+  }
+
+  @VisibleForTesting
+  public long getContainerOpCountMetrics(ContainerProtos.Type type) {
+    return opsArray[type.ordinal()].value();
+  }
+
   public void unRegister() {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     ms.unregisterSource(SOURCE_NAME);
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 65241bf..a2e65e2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 
 import io.opentracing.Scope;
 import io.opentracing.util.GlobalTracer;
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.RaftRetryFailureException;
@@ -101,6 +103,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   // create a separate RaftClient for watchForCommit API
   private RaftClient watchClient;
 
+  private XceiverClientMetrics metrics;
+
   /**
    * Constructs a client.
    */
@@ -116,6 +120,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     watchClient = null;
     this.tlsConfig = tlsConfig;
     this.clientRequestTimeout = timeout;
+    metrics = XceiverClientManager.getXceiverClientMetrics();
   }
 
   private void updateCommitInfosMap(
@@ -199,6 +204,12 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return Objects.requireNonNull(client.get(), "client is null");
   }
 
+
+  @VisibleForTesting
+  public ConcurrentHashMap<UUID, Long> getCommitInfoMap() {
+    return commitInfoMap;
+  }
+
   private CompletableFuture<RaftClientReply> sendRequestAsync(
       ContainerCommandRequestProto request) {
     try (Scope scope = GlobalTracer.get()
@@ -301,47 +312,52 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   public XceiverClientReply sendCommandAsync(
       ContainerCommandRequestProto request) {
     XceiverClientReply asyncReply = new XceiverClientReply(null);
+    long requestTime = Time.monotonicNowNanos();
     CompletableFuture<RaftClientReply> raftClientReply =
         sendRequestAsync(request);
+    metrics.incrPendingContainerOpsMetrics(request.getCmdType());
     CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
-        raftClientReply.whenComplete((reply, e) -> LOG.debug(
-            "received reply {} for request: cmdType={} containerID={}"
-                + " pipelineID={} traceID={} exception: {}", reply,
-            request.getCmdType(), request.getContainerID(),
-            request.getPipelineID(), request.getTraceID(), e))
-            .thenApply(reply -> {
-              try {
-                // we need to handle RaftRetryFailure Exception
-                RaftRetryFailureException raftRetryFailureException =
-                    reply.getRetryFailureException();
-                if (raftRetryFailureException != null) {
-                  // in case of raft retry failure, the raft client is
-                  // not able to connect to the leader hence the pipeline
-                  // can not be used but this instance of RaftClient will close
-                  // and refreshed again. In case the client cannot connect to
-                   // leader, getClient call will fail.
+        raftClientReply.whenComplete((reply, e) -> {
+          LOG.debug("received reply {} for request: cmdType={} containerID={}"
+                  + " pipelineID={} traceID={} exception: {}", reply,
+              request.getCmdType(), request.getContainerID(),
+              request.getPipelineID(), request.getTraceID(), e);
+          metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+          metrics.addContainerOpsLatency(request.getCmdType(),
+              Time.monotonicNowNanos() - requestTime);
+        }).thenApply(reply -> {
+          try {
+            // we need to handle RaftRetryFailure Exception
+            RaftRetryFailureException raftRetryFailureException =
+                reply.getRetryFailureException();
+            if (raftRetryFailureException != null) {
+              // in case of raft retry failure, the raft client is
+              // not able to connect to the leader hence the pipeline
+              // can not be used but this instance of RaftClient will close
+              // and refreshed again. In case the client cannot connect to
+              // leader, getClient call will fail.
 
-                  // No need to set the failed Server ID here. Ozone client
-                  // will directly exclude this pipeline in next allocate block
-                  // to SCM as in this case, it is the raft client which is not
-                  // able to connect to leader in the pipeline, though the
-                  // pipeline can still be functional.
-                  throw new CompletionException(raftRetryFailureException);
-                }
-                ContainerCommandResponseProto response =
-                    ContainerCommandResponseProto
-                        .parseFrom(reply.getMessage().getContent());
-                UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
-                if (response.getResult() == ContainerProtos.Result.SUCCESS) {
-                  updateCommitInfosMap(reply.getCommitInfos());
-                }
-                asyncReply.setLogIndex(reply.getLogIndex());
-                addDatanodetoReply(serverId, asyncReply);
-                return response;
-              } catch (InvalidProtocolBufferException e) {
-                throw new CompletionException(e);
-              }
-            });
+              // No need to set the failed Server ID here. Ozone client
+              // will directly exclude this pipeline in next allocate block
+              // to SCM as in this case, it is the raft client which is not
+              // able to connect to leader in the pipeline, though the
+              // pipeline can still be functional.
+              throw new CompletionException(raftRetryFailureException);
+            }
+            ContainerCommandResponseProto response =
+                ContainerCommandResponseProto
+                    .parseFrom(reply.getMessage().getContent());
+            UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
+            if (response.getResult() == ContainerProtos.Result.SUCCESS) {
+              updateCommitInfosMap(reply.getCommitInfos());
+            }
+            asyncReply.setLogIndex(reply.getLogIndex());
+            addDatanodetoReply(serverId, asyncReply);
+            return response;
+          } catch (InvalidProtocolBufferException e) {
+            throw new CompletionException(e);
+          }
+        });
     asyncReply.setResponse(containerCommandResponse);
     return asyncReply;
   }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 13913ee..cfbb6ae 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -17,6 +17,7 @@
  */
 
 package org.apache.hadoop.hdds.scm.storage;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -45,7 +46,15 @@ import java.util.Collections;
 import java.util.UUID;
 import java.util.List;
 import java.util.ArrayList;
-import java.util.concurrent.*;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
@@ -104,15 +113,25 @@ public class BlockOutputStream extends OutputStream {
   // by all servers
   private long totalAckDataLength;
 
+  // List containing buffers for which the putBlock call will
+  // update the length in the datanodes. This list will just maintain
+  // references to the buffers in the BufferPool which will be cleared
+  // when the watchForCommit acknowledges a putBlock logIndex has been
+  // committed on all datanodes. This list will be a  place holder for buffers
+  // which got written between successive putBlock calls.
+  private List<ByteBuffer> bufferList;
+
   // future Map to hold up all putBlock futures
   private ConcurrentHashMap<Long,
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
       futureMap;
-  // map containing mapping for putBlock logIndex to to flushedDataLength Map.
 
   // The map should maintain the keys (logIndexes) in order so that while
   // removing we always end up updating incremented data flushed length.
-  private ConcurrentSkipListMap<Long, Long> commitIndex2flushedDataMap;
+  // Also, corresponding to the logIndex, the corresponding list of buffers will
+  // be released from the buffer pool.
+  private ConcurrentSkipListMap<Long, List<ByteBuffer>>
+      commitIndex2flushedDataMap;
 
   private List<DatanodeDetails> failedServers;
 
@@ -167,13 +186,15 @@ public class BlockOutputStream extends OutputStream {
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
     failedServers = Collections.emptyList();
+    bufferList = null;
   }
 
+
   public BlockID getBlockID() {
     return blockID;
   }
 
-  public long getTotalSuccessfulFlushedData() {
+  public long getTotalAckDataLength() {
     return totalAckDataLength;
   }
 
@@ -185,6 +206,31 @@ public class BlockOutputStream extends OutputStream {
     return failedServers;
   }
 
+  @VisibleForTesting
+  public XceiverClientSpi getXceiverClient() {
+    return xceiverClient;
+  }
+
+  @VisibleForTesting
+  public long getTotalDataFlushedLength() {
+    return totalDataFlushedLength;
+  }
+
+  @VisibleForTesting
+  public BufferPool getBufferPool() {
+    return bufferPool;
+  }
+
+  @VisibleForTesting
+  public IOException getIoException() {
+    return ioException;
+  }
+
+  @VisibleForTesting
+  public Map<Long, List<ByteBuffer>> getCommitIndex2flushedDataMap() {
+    return commitIndex2flushedDataMap;
+  }
+
   @Override
   public void write(int b) throws IOException {
     checkOpen();
@@ -206,9 +252,9 @@ public class BlockOutputStream extends OutputStream {
     if (len == 0) {
       return;
     }
+
     while (len > 0) {
       int writeLen;
-
       // Allocate a buffer if needed. The buffer will be allocated only
       // once as needed and will be reused again for multiple blockOutputStream
       // entries.
@@ -224,8 +270,8 @@ public class BlockOutputStream extends OutputStream {
       len -= writeLen;
       writtenDataLength += writeLen;
       if (shouldFlush()) {
-        totalDataFlushedLength += streamBufferFlushSize;
-        handlePartialFlush();
+        updateFlushLength();
+        executePutBlock();
       }
       // Data in the bufferPool can not exceed streamBufferMaxSize
       if (isBufferPoolFull()) {
@@ -235,7 +281,11 @@ public class BlockOutputStream extends OutputStream {
   }
 
   private boolean shouldFlush() {
-    return writtenDataLength % streamBufferFlushSize == 0;
+    return bufferPool.computeBufferData() % streamBufferFlushSize == 0;
+  }
+
+  private void updateFlushLength() {
+    totalDataFlushedLength += writtenDataLength - totalDataFlushedLength;
   }
 
   private boolean isBufferPoolFull() {
@@ -264,17 +314,17 @@ public class BlockOutputStream extends OutputStream {
       len -= writeLen;
       count++;
       writtenDataLength += writeLen;
-      if (shouldFlush()) {
+      // we should not call isBufferFull/shouldFlush here.
+      // The buffer might already be full as whole data is already cached in
+      // the buffer. We should just validate
+      // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
+      // call for handling full buffer/flush buffer condition.
+      if (writtenDataLength % streamBufferFlushSize == 0) {
         // reset the position to zero as now we will be reading the
         // next buffer in the list
-        totalDataFlushedLength += streamBufferFlushSize;
-        handlePartialFlush();
+        updateFlushLength();
+        executePutBlock();
       }
-
-      // we should not call isBufferFull here. The buffer might already be full
-      // as whole data is already cached in the buffer. We should just validate
-      // if we wrote data of size streamBufferMaxSize to call for handling
-      // full buffer condition.
       if (writtenDataLength == streamBufferMaxSize) {
         handleFullBuffer();
       }
@@ -289,25 +339,22 @@ public class BlockOutputStream extends OutputStream {
     Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
     for (long index : indexes) {
       Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
-      long length = commitIndex2flushedDataMap.remove(index);
-
-      // totalAckDataLength replicated yet should always be less than equal to
-      // the current length being returned from commitIndex2flushedDataMap.
-      // The below precondition would ensure commitIndex2flushedDataMap entries
-      // are removed in order of the insertion to the map.
-      Preconditions.checkArgument(totalAckDataLength < length);
-      totalAckDataLength = length;
+      List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
+      long length = buffers.stream().mapToLong(value -> {
+        int pos = value.position();
+        Preconditions.checkArgument(pos <= chunkSize);
+        return pos;
+      }).sum();
+      // totalAckDataLength replicated yet should always be incremented
+      // with the current length being returned from commitIndex2flushedDataMap.
+      totalAckDataLength += length;
       LOG.debug("Total data successfully replicated: " + totalAckDataLength);
       futureMap.remove(totalAckDataLength);
       // Flush has been committed to required servers successful.
-      // just release the current buffer from the buffer pool.
-
-      // every entry removed from the putBlock future Map signifies
-      // streamBufferFlushSize/chunkSize no of chunks successfully committed.
-      // Release the buffers from the buffer pool to be reused again.
-      int chunkCount = (int) (streamBufferFlushSize / chunkSize);
-      for (int i = 0; i < chunkCount; i++) {
-        bufferPool.releaseBuffer();
+      // just release the current buffer from the buffer pool corresponding
+      // to the buffers that have been committed with the putBlock call.
+      for (ByteBuffer byteBuffer : buffers) {
+        bufferPool.releaseBuffer(byteBuffer);
       }
     }
   }
@@ -325,9 +372,10 @@ public class BlockOutputStream extends OutputStream {
         waitOnFlushFutures();
       }
     } catch (InterruptedException | ExecutionException e) {
-      adjustBuffersOnException();
-      throw new IOException(
+      ioException = new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
+      adjustBuffersOnException();
+      throw ioException;
     }
     if (!commitIndex2flushedDataMap.isEmpty()) {
       watchForCommit(
@@ -389,10 +437,14 @@ public class BlockOutputStream extends OutputStream {
   }
 
   private CompletableFuture<ContainerProtos.
-      ContainerCommandResponseProto> handlePartialFlush()
+      ContainerCommandResponseProto> executePutBlock()
       throws IOException {
     checkOpen();
     long flushPos = totalDataFlushedLength;
+    Preconditions.checkNotNull(bufferList);
+    List<ByteBuffer> byteBufferList = bufferList;
+    bufferList = null;
+    Preconditions.checkNotNull(byteBufferList);
     String requestId =
         traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID;
     CompletableFuture<ContainerProtos.
@@ -410,17 +462,22 @@ public class BlockOutputStream extends OutputStream {
         }
         // if the ioException is not set, putBlock is successful
         if (ioException == null) {
-          LOG.debug(
-              "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                  + commitIndex2flushedDataMap.size());
           BlockID responseBlockID = BlockID.getFromProtobuf(
               e.getPutBlock().getCommittedBlockLength().getBlockID());
           Preconditions.checkState(blockID.getContainerBlockID()
               .equals(responseBlockID.getContainerBlockID()));
           // updates the bcsId of the block
           blockID = responseBlockID;
+          LOG.debug(
+              "Adding index " + asyncReply.getLogIndex() + " commitMap size "
+                  + commitIndex2flushedDataMap.size() + " flushLength "
+                  + flushPos + " numBuffers " + byteBufferList.size()
+                  + " blockID " + blockID + " bufferPool size" + bufferPool
+                  .getSize() + " currentBufferIndex " + bufferPool
+                  .getCurrentBufferIndex());
           // for standalone protocol, logIndex will always be 0.
-          commitIndex2flushedDataMap.put(asyncReply.getLogIndex(), flushPos);
+          commitIndex2flushedDataMap
+              .put(asyncReply.getLogIndex(), byteBufferList);
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -446,9 +503,12 @@ public class BlockOutputStream extends OutputStream {
       try {
         handleFlush();
       } catch (InterruptedException | ExecutionException e) {
-        adjustBuffersOnException();
-        throw new IOException(
+        // just set the exception here as well in order to maintain sanctity of
+        // ioException field
+        ioException = new IOException(
             "Unexpected Storage Container Exception: " + e.toString(), e);
+        adjustBuffersOnException();
+        throw ioException;
       }
     }
   }
@@ -456,6 +516,15 @@ public class BlockOutputStream extends OutputStream {
 
   private void writeChunk(ByteBuffer buffer)
       throws IOException {
+    // This data in the buffer will be pushed to datanode and a reference will
+    // be added to the bufferList. Once putBlock gets executed, this list will
+    // be marked null. Hence, during first writeChunk call after every putBlock
+    // call or during the first call to writeChunk here, the list will be null.
+
+    if (bufferList == null) {
+      bufferList = new ArrayList<>();
+    }
+    bufferList.add(buffer);
     // Please note : We are not flipping the slice when we write since
     // the slices are pointing the currentBuffer start and end as needed for
     // the chunk write. Also please note, Duplicate does not create a
@@ -472,20 +541,36 @@ public class BlockOutputStream extends OutputStream {
     checkOpen();
     // flush the last chunk data residing on the currentBuffer
     if (totalDataFlushedLength < writtenDataLength) {
-      ByteBuffer currentBuffer = bufferPool.getBuffer();
-      int pos = currentBuffer.position();
-      writeChunk(currentBuffer);
-      totalDataFlushedLength += pos;
-      handlePartialFlush();
+      ByteBuffer currentBuffer = bufferPool.getCurrentBuffer();
+      Preconditions.checkArgument(currentBuffer.position() > 0);
+      if (currentBuffer.position() != chunkSize) {
+        writeChunk(currentBuffer);
+      }
+      // This can be a partially filled chunk. Since we are flushing the buffer
+      // here, we just limit this buffer to the current position. So that next
+      // write will happen in new buffer
+      updateFlushLength();
+      executePutBlock();
     }
     waitOnFlushFutures();
+    if (!commitIndex2flushedDataMap.isEmpty()) {
+      // wait for the last commit index in the commitIndex2flushedDataMap
+      // to get committed to all or majority of nodes in case timeout
+      // happens.
+      long lastIndex =
+          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
+              .max().getAsLong();
+      LOG.debug(
+          "waiting for last flush Index " + lastIndex + " to catch up");
+      watchForCommit(lastIndex);
+    }
+
     // just check again if the exception is hit while waiting for the
     // futures to ensure flush has indeed succeeded
 
     // irrespective of whether the commitIndex2flushedDataMap is empty
     // or not, ensure there is no exception set
     checkOpen();
-
   }
 
   @Override
@@ -494,21 +579,11 @@ public class BlockOutputStream extends OutputStream {
         && bufferPool != null && bufferPool.getSize() > 0) {
       try {
         handleFlush();
-        if (!commitIndex2flushedDataMap.isEmpty()) {
-          // wait for the last commit index in the commitIndex2flushedDataMap
-          // to get committed to all or majority of nodes in case timeout
-          // happens.
-          long lastIndex =
-              commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
-                  .max().getAsLong();
-          LOG.debug(
-              "waiting for last flush Index " + lastIndex + " to catch up");
-          watchForCommit(lastIndex);
-        }
       } catch (InterruptedException | ExecutionException e) {
-        adjustBuffersOnException();
-        throw new IOException(
+        ioException = new IOException(
             "Unexpected Storage Container Exception: " + e.toString(), e);
+        adjustBuffersOnException();
+        throw ioException;
       } finally {
         cleanup(false);
       }
@@ -564,6 +639,10 @@ public class BlockOutputStream extends OutputStream {
       futureMap.clear();
     }
     futureMap = null;
+    if (bufferList !=  null) {
+      bufferList.clear();
+    }
+    bufferList = null;
     if (commitIndex2flushedDataMap != null) {
       commitIndex2flushedDataMap.clear();
     }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
index 541e6bd..17788c7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -41,7 +41,7 @@ public class BufferPool {
     currentBufferIndex = -1;
   }
 
-  public ByteBuffer getBuffer() {
+  public ByteBuffer getCurrentBuffer() {
     return currentBufferIndex == -1 ? null : bufferList.get(currentBufferIndex);
   }
 
@@ -56,7 +56,7 @@ public class BufferPool {
    *
    */
   public ByteBuffer allocateBufferIfNeeded() {
-    ByteBuffer buffer = getBuffer();
+    ByteBuffer buffer = getCurrentBuffer();
     if (buffer != null && buffer.hasRemaining()) {
       return buffer;
     }
@@ -74,11 +74,14 @@ public class BufferPool {
     return buffer;
   }
 
-  public void releaseBuffer() {
+  public void releaseBuffer(ByteBuffer byteBuffer) {
     // always remove from head of the list and append at last
     ByteBuffer buffer = bufferList.remove(0);
+    // Ensure the buffer to be removed is always at the head of the list.
+    Preconditions.checkArgument(buffer.equals(byteBuffer));
     buffer.clear();
     bufferList.add(buffer);
+    Preconditions.checkArgument(currentBufferIndex >= 0);
     currentBufferIndex--;
   }
 
@@ -90,6 +93,7 @@ public class BufferPool {
   public void checkBufferPoolEmpty() {
     Preconditions.checkArgument(computeBufferData() == 0);
   }
+
   public long computeBufferData() {
     return bufferList.stream().mapToInt(value -> value.position())
         .sum();
@@ -99,8 +103,12 @@ public class BufferPool {
     return bufferList.size();
   }
 
-  ByteBuffer getBuffer(int index) {
+  public ByteBuffer getBuffer(int index) {
     return bufferList.get(index);
   }
 
+  int getCurrentBufferIndex() {
+    return currentBufferIndex;
+  }
+
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
index a49f8ae..07aa536 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java
@@ -74,8 +74,8 @@ public class BlockID {
 
   @Override
   public String toString() {
-    return new StringBuffer().append(getContainerBlockID().toString())
-        .append(" bcId: ")
+    return new StringBuilder().append(getContainerBlockID().toString())
+        .append(" bcsId: ")
         .append(blockCommitSequenceId)
         .toString();
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index b6de8ab..fb700da 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
 import java.io.IOException;
 import java.io.OutputStream;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -148,11 +149,11 @@ public final class BlockOutputStreamEntry extends OutputStream {
     }
   }
 
-  long getTotalSuccessfulFlushedData() throws IOException {
+  long getTotalAckDataLength() {
     if (outputStream != null) {
       BlockOutputStream out = (BlockOutputStream) this.outputStream;
       blockID = out.getBlockID();
-      return out.getTotalSuccessfulFlushedData();
+      return out.getTotalAckDataLength();
     } else {
       // For a pre allocated block for which no write has been initiated,
       // the OutputStream will be null here.
@@ -295,6 +296,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
     }
   }
 
+  @VisibleForTesting
   public OutputStream getOutputStream() {
     return outputStream;
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 3bd572d..78f03d2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -64,6 +64,13 @@ import java.util.concurrent.TimeoutException;
  */
 public class KeyOutputStream extends OutputStream {
 
+  /**
+   * Action requested from handleFlushOrClose for the latest stream entry.
+   */
+  enum StreamAction {
+    /** Flush the latest stream entry. */
+    FLUSH,
+    /** Close the latest stream entry. */
+    CLOSE,
+    /**
+     * The current block ran out of space: close it and advance to the next
+     * stream entry, but only if it is still completely full.
+     */
+    FULL
+  }
+
   public static final Logger LOG =
       LoggerFactory.getLogger(KeyOutputStream.class);
 
@@ -326,8 +333,7 @@ public class KeyOutputStream extends OutputStream {
       }
       if (current.getRemaining() <= 0) {
         // since the current block is already written close the stream.
-        handleFlushOrClose(true);
-        currentStreamIndex += 1;
+        handleFlushOrClose(StreamAction.FULL);
       }
       len -= writeLen;
       off += writeLen;
@@ -393,19 +399,21 @@ public class KeyOutputStream extends OutputStream {
     boolean retryFailure = checkForRetryFailure(exception);
     boolean closedContainerException = false;
     if (!retryFailure) {
-      closedContainerException = checkIfContainerIsClosed(exception);
+      closedContainerException = checkIfContainerIsClosed(t);
     }
     PipelineID pipelineId = null;
     long totalSuccessfulFlushedData =
-        streamEntry.getTotalSuccessfulFlushedData();
+        streamEntry.getTotalAckDataLength();
     //set the correct length for the current stream
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     long bufferedDataLen = computeBufferData();
-    LOG.warn("Encountered exception {}", exception);
-    LOG.info(
-        "The last committed block length is {}, uncommitted data length is {}",
+    LOG.warn("Encountered exception {}. The last committed block length is {}, "
+            + "uncommitted data length is {}", exception,
         totalSuccessfulFlushedData, bufferedDataLen);
     Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
+    Preconditions.checkArgument(
+        streamEntry.getWrittenDataLength() - totalSuccessfulFlushedData
+            == bufferedDataLen);
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
@@ -498,7 +506,7 @@ public class KeyOutputStream extends OutputStream {
     return t instanceof ContainerNotOpenException;
   }
 
-  private Throwable checkForException(IOException ioe) throws IOException {
+  public Throwable checkForException(IOException ioe) throws IOException {
     Throwable t = ioe.getCause();
     while (t != null) {
       for (Class<? extends Exception> cls : OzoneClientUtils
@@ -513,7 +521,7 @@ public class KeyOutputStream extends OutputStream {
   }
 
+  /**
+   * Returns the key length as the sum of getCurrentPosition() over all
+   * stream entries.
+   */
   private long getKeyLength() {
-    return streamEntries.parallelStream().mapToLong(e -> e.getCurrentPosition())
+    return streamEntries.stream().mapToLong(e -> e.getCurrentPosition())
         .sum();
   }
 
@@ -535,16 +543,24 @@ public class KeyOutputStream extends OutputStream {
+  /**
+   * Flushes the latest stream entry via handleFlushOrClose(FLUSH).
+   *
+   * @throws IOException if the flush fails
+   */
   @Override
   public void flush() throws IOException {
     checkNotClosed();
-    handleFlushOrClose(false);
+    handleFlushOrClose(StreamAction.FLUSH);
   }
 
   /**
-   * Close or Flush the latest outputStream.
-   * @param close Flag which decides whether to call close or flush on the
+   * Close or Flush the latest outputStream depending upon the action.
+   * This function gets called when while write is going on, the current stream
+   * gets full or explicit flush or close request is made by client. when the
+   * stream gets full and we try to close the stream , we might end up hitting
+   * an exception in the exception handling path, we write the data residing in
+   * in the buffer pool to a new Block. In cases, as such, when the data gets
+   * written to new stream , it will be at max half full. In such cases, we
+   * should just write the data and not close the stream as the block won't be
+   * completely full.
+   * @param op Flag which decides whether to call close or flush on the
    *              outputStream.
    * @throws IOException In case, flush or close fails with exception.
    */
-  private void handleFlushOrClose(boolean close) throws IOException {
+  private void handleFlushOrClose(StreamAction op) throws IOException {
     if (streamEntries.size() == 0) {
       return;
     }
@@ -561,10 +577,21 @@ public class KeyOutputStream extends OutputStream {
           if (failedServers != null && !failedServers.isEmpty()) {
             excludeList.addDatanodes(failedServers);
           }
-          if (close) {
+          switch (op) {
+          case CLOSE:
             entry.close();
-          } else {
+            break;
+          case FULL:
+            if (entry.getRemaining() == 0) {
+              entry.close();
+              currentStreamIndex++;
+            }
+            break;
+          case FLUSH:
             entry.flush();
+            break;
+          default:
+            throw new IOException("Invalid Operation");
           }
         } catch (IOException ioe) {
           handleException(entry, streamIndex, ioe);
@@ -587,7 +614,7 @@ public class KeyOutputStream extends OutputStream {
     }
     closed = true;
     try {
-      handleFlushOrClose(true);
+      handleFlushOrClose(StreamAction.CLOSE);
       if (keyArgs != null) {
         // in test, this could be null
         removeEmptyBlocks();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 7b65c46..e94f0ac 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -249,6 +250,7 @@ public interface MiniOzoneCluster {
     protected Optional<Long> streamBufferFlushSize = Optional.empty();
     protected Optional<Long> streamBufferMaxSize = Optional.empty();
     protected Optional<Long> blockSize = Optional.empty();
+    protected Optional<StorageUnit> streamBufferSizeUnit = Optional.empty();
     // Use relative smaller number of handlers for testing
     protected int numOfOmHandlers = 20;
     protected int numOfScmHandlers = 20;
@@ -434,6 +436,11 @@ public interface MiniOzoneCluster {
       return this;
     }
 
+    /**
+     * Sets the storage unit in which chunk size, stream buffer flush size,
+     * stream buffer max size and block size are interpreted. When unset,
+     * MiniOzoneClusterImpl falls back to MB.
+     *
+     * @param unit storage unit for the size settings
+     * @return this builder
+     */
+    public Builder setStreamBufferSizeUnit(StorageUnit unit) {
+      streamBufferSizeUnit = Optional.of(unit);
+      return this;
+    }
+
     public Builder setOMServiceId(String serviceId) {
       this.omServiceId = serviceId;
       return this;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 521a4f1..e746f33 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -446,14 +446,18 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       if (!blockSize.isPresent()) {
         blockSize = Optional.of(2 * streamBufferMaxSize.get());
       }
+
+      if (!streamBufferSizeUnit.isPresent()) {
+        streamBufferSizeUnit = Optional.of(StorageUnit.MB);
+      }
       conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
-          chunkSize.get(), StorageUnit.MB);
+          chunkSize.get(), streamBufferSizeUnit.get());
       conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
-          streamBufferFlushSize.get(), StorageUnit.MB);
+          streamBufferFlushSize.get(), streamBufferSizeUnit.get());
       conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
-          streamBufferMaxSize.get(), StorageUnit.MB);
+          streamBufferMaxSize.get(), streamBufferSizeUnit.get());
       conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
-          StorageUnit.MB);
+          streamBufferSizeUnit.get());
       configureTrace();
     }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
new file mode 100644
index 0000000..32bef12
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -0,0 +1,690 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests BlockOutputStream class.
+ */
+public class TestBlockOutputStream {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  // Buffer/flush geometry in bytes (the cluster is built with
+  // StorageUnit.BYTES); all four are assigned in init().
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  // Shared volume/bucket that every test creates its keys in.
+  private static String volumeName;
+  private static String bucketName;
+  // Random seed string used to build fixed-length test payloads.
+  private static String keyString;
+
+  /**
+   * Create a MiniOzoneCluster for testing. Chunk, flush, max-buffer and
+   * block sizes are configured in BYTES so that small writes cross the
+   * buffer and flush boundaries exercised by the tests.
+   *
+   * @throws Exception if cluster, client, volume or bucket setup fails
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    chunkSize = 100;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+    conf.setQuietMode(false);
+    // OZONE_SCM_BLOCK_SIZE is set by the builder from setBlockSize(), in
+    // StorageUnit.BYTES, so no separate MB-based value is configured here.
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    // Shared client and volume/bucket used by every test to create keys.
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testblockoutputstream";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /** Returns a random UUID string to use as a unique key name. */
+  private String getKeyName() {
+    UUID randomId = UUID.randomUUID();
+    return randomId.toString();
+  }
+
+  /**
+   * Close the shared client and shut down the MiniOzoneCluster. The cluster
+   * is shut down even if closing the client fails.
+   */
+  @AfterClass
+  public static void shutdown() throws IOException {
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Writes less than one chunk and checks that the data stays in the buffer
+   * pool until an explicit flush, after which it is written and acknowledged
+   * with one WriteChunk and one PutBlock.
+   */
+  @Test
+  public void testBufferCaching() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    int dataLength = 50;
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // We have written less than a chunk, so the data just sits in the
+    // buffer, with only one buffer allocated in the buffer pool.
+    Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize());
+    // Only writtenDataLength is updated at this point.
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    // Nothing has been flushed or acknowledged yet, and no container
+    // operation is pending.
+    Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+    // The commitIndex2FlushedData map is empty here.
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
+
+    // Now do a flush. This writes the buffered data and updates the flushed
+    // length.
+    key.flush();
+
+    // flush is a sync call, so all pending operations have completed.
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+    // The pool still holds a single buffer, whose position is back at 0,
+    // and everything written so far has been flushed.
+    Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(0,
+        blockOutputStream.getBufferPool().getBuffer(0).position());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertEquals(0,
+        blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+    // flush ensures watchForCommit updates the total length acknowledged.
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+
+    // Now close the stream; the ack length is updated after watchForCommit.
+    key.close();
+
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 1,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 1,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 2,
+        metrics.getTotalOpCount());
+
+    // Make sure the bufferPool is empty.
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    validateData(keyName, data1);
+  }
+
+  /**
+   * Writes exactly flushSize (two chunks) so that the write itself triggers
+   * two WriteChunks and a PutBlock, then checks that the explicit flush and
+   * close issue no further container operations.
+   */
+  @Test
+  public void testFlushChunk() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    int dataLength = flushSize;
+    // write data equal to 2 chunks
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+    Assert.assertEquals(pendingWriteChunkCount + 2,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount + 1,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // We have written data equal to flushSize = 2 chunks, so the buffer pool
+    // has 2 buffers of chunk size allocated.
+    Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+    // Both writtenDataLength and flushedDataLength are updated here.
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
+
+    Assert.assertEquals(0,
+        blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+    // Now do a flush. flush is a sync call, so all pending operations
+    // complete.
+    key.flush();
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+    // Since the data in the buffer was already flushed, this flush has no
+    // impact on the counters and data structures.
+    Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertEquals(0,
+        blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+    // flush ensures watchForCommit updates the total length acknowledged.
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    // Now close the stream; the ack length is updated after watchForCommit.
+    key.close();
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    // Make sure the bufferPool is empty.
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 1,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 3,
+        metrics.getTotalOpCount());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    validateData(keyName, data1);
+  }
+
+  /**
+   * Writes more than one chunk but less than flushSize, so only a single
+   * WriteChunk is issued by the write itself; the explicit flush then writes
+   * the remaining data with one more WriteChunk and a PutBlock.
+   */
+  @Test
+  public void testMultiChunkWrite() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    int dataLength = chunkSize + 50;
+    // write data more than 1 chunk but less than flushSize
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+    Assert.assertEquals(pendingWriteChunkCount + 1,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // We have written more than one chunk but less than flushSize, so the
+    // buffer pool has 2 buffers of chunk size allocated.
+    Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+    // Only writtenDataLength is updated here.
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    // Since data written is still less than flushSize, the flushed length
+    // is still 0.
+    Assert.assertEquals(0,
+        blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
+
+    Assert.assertEquals(0,
+        blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+    // Now do a flush. This writes the remaining data and updates the flushed
+    // length.
+    key.flush();
+    Assert.assertEquals(writeChunkCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 1,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+    Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertEquals(0,
+        blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+    // flush ensures watchForCommit updates the total length acknowledged.
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+
+    // Now close the stream; the ack length is updated after watchForCommit.
+    key.close();
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    // Make sure the bufferPool is empty.
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 1,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 3,
+        metrics.getTotalOpCount());
+    validateData(keyName, data1);
+  }
+
+  /**
+   * Writes flushSize + 50 bytes: the first flushSize bytes are flushed by the
+   * write itself, and close writes the remainder with one more WriteChunk
+   * and PutBlock before everything is acknowledged.
+   */
+  @Test
+  public void testMultiChunkWrite2() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    int dataLength = flushSize + 50;
+    // write data more than flushSize (2 chunks)
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+    Assert.assertEquals(pendingWriteChunkCount + 2,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount + 1,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // We have written more than flushSize (2 chunks), so the buffer pool has
+    // 3 buffers of chunk size allocated.
+    Assert.assertEquals(3, blockOutputStream.getBufferPool().getSize());
+    // writtenDataLength covers all the data; only flushSize has been flushed.
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(flushSize,
+        blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
+
+    Assert.assertEquals(0,
+        blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+    // Closing writes the remaining dataLength - flushSize bytes and waits
+    // until all of the data is acknowledged.
+    key.close();
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 5,
+        metrics.getTotalOpCount());
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    // Make sure the bufferPool is empty.
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    validateData(keyName, data1);
+  }
+
+  /**
+   * Writes exactly {@code maxFlushSize} bytes, which fills every buffer in
+   * the pool with a single write, and checks the write/flush/ack bookkeeping
+   * of {@link BlockOutputStream} after the write, after flush and after close.
+   */
+  @Test
+  public void testFullBufferCondition() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    // write exactly maxFlushSize worth of data, which fills the buffer pool
+    int dataLength = maxFlushSize;
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+
+    // since it is hitting the full buffer condition, it will call
+    // watchForCommit and complete at least the putBlock for the first
+    // flushSize worth of data
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+            <= pendingWriteChunkCount + 2);
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+            <= pendingPutBlockCount + 1);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    // writtenDataLength as well as flushedDataLength will be updated here
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(maxFlushSize,
+        blockOutputStream.getTotalDataFlushedLength());
+
+    // since data equal to maxBufferSize is written, this will be a blocking
+    // call and hence will wait for at least flushSize worth of data to get
+    // ack'd by all servers right here
+    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+    // watchForCommit will clean up at least one entry from the map where each
+    // entry corresponds to flushSize worth of data
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+    // Now do a flush. This will flush the data and update the flush length and
+    // the map.
+    key.flush();
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+    // The write already flushed everything it buffered (flushed length was
+    // maxFlushSize == dataLength), so this flush does not change the buffer
+    // pool or the flushed length
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+    // now close the stream, it will update the ack length after watchForCommit
+    key.close();
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 4,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    // make sure the bufferPool is empty
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    validateData(keyName, data1);
+  }
+
+  /**
+   * Writes 50 bytes more than {@code maxFlushSize} in a single call, so the
+   * buffer pool fills up part-way through the write, and checks the
+   * write/flush/ack bookkeeping of {@link BlockOutputStream} after the write,
+   * after flush and after close.
+   */
+  @Test
+  public void testWriteWithExceedingMaxBufferLimit() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    // write 50 bytes more than maxFlushSize
+    int dataLength = maxFlushSize + 50;
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    // since it is hitting the full buffer condition, it will call
+    // watchForCommit and complete at least the putBlock for the first
+    // flushSize worth of data
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+            <= pendingWriteChunkCount + 2);
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+            <= pendingPutBlockCount + 1);
+    Assert.assertEquals(writeChunkCount + 4,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    // writtenDataLength as well as flushedDataLength will be updated here
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(maxFlushSize,
+        blockOutputStream.getTotalDataFlushedLength());
+
+    // since data equal to maxBufferSize is written, this will be a blocking
+    // call and hence will wait for at least flushSize worth of data to get
+    // ack'd by all servers right here
+    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+    // watchForCommit will clean up at least one entry from the map where each
+    // entry corresponds to flushSize worth of data
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+    // Now do a flush. This will flush the trailing partial chunk and update
+    // the flush length and the map.
+    key.flush();
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+    // The buffer pool size is unchanged, but the flushed length now also
+    // covers the 50 bytes that were only buffered before the flush
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    // flush will make sure one more entry gets updated in the map
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+    // now close the stream, it will update the ack length after watchForCommit
+    key.close();
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    // make sure the bufferPool is empty
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 5,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    validateData(keyName, data1);
+  }
+
+  /**
+   * Opens an output stream for a new key by delegating to
+   * {@code ContainerTestHelper.createKey} with this test's object store,
+   * volume and bucket.
+   */
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return ContainerTestHelper.createKey(keyName, type, size, objectStore,
+        volumeName, bucketName);
+  }
+
+  /**
+   * Delegates to {@code ContainerTestHelper.validateData} with this test's
+   * object store, volume and bucket; presumably it compares the stored key
+   * contents with {@code data}.
+   */
+  private void validateData(String keyName, byte[] data) throws Exception {
+    ContainerTestHelper.validateData(keyName, data, objectStore, volumeName,
+        bucketName);
+  }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
new file mode 100644
index 0000000..671f16c
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.ratis.protocol.AlreadyClosedException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests failure detection and handling in BlockOutputStream Class.
+ */
+public class TestBlockOutputStreamWithFailures {
+
+  private static MiniOzoneCluster cluster;
+  private OzoneConfiguration conf = new OzoneConfiguration();
+  private OzoneClient client;
+  private ObjectStore objectStore;
+  private int chunkSize;
+  private int flushSize;
+  private int maxFlushSize;
+  private int blockSize;
+  private String volumeName;
+  private String bucketName;
+  private String keyString;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   * @throws IOException
+   */
+  @Before
+  public void init() throws Exception {
+    chunkSize = 100;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testblockoutputstream";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  private String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testWatchForCommitWithCloseContainerException() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    int dataLength = maxFlushSize + 50;
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+
+    // since its hitting the full bufferCondition, it will call watchForCommit
+    // and completes atleast putBlock for first flushSize worth of data
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+            <= pendingWriteChunkCount + 2);
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+            <= pendingPutBlockCount + 1);
+    Assert.assertEquals(writeChunkCount + 4,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
+
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // we have just written data more than flush Size(2 chunks), at this time
+    // buffer pool will have 4 buffers allocated worth of chunk size
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    // writtenDataLength as well flushedDataLength will be updated here
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(maxFlushSize,
+        blockOutputStream.getTotalDataFlushedLength());
+
+    // since data equals to maxBufferSize is written, this will be a blocking
+    // call and hence will wait for atleast flushSize worth of data to get
+    // ack'd by all servers right here
+    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+    // watchForCommit will clean up atleast one entry from the map where each
+    // entry corresponds to flushSize worth of data
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+    // Now do a flush. This will flush the data and update the flush length and
+    // the map.
+    key.flush();
+
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 5,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
+
+    // flush is a sync call, all pending operations will complete
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+    // Since the data in the buffer is already flushed, flush here will have
+    // no impact on the counters and data structures
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    // flush will make sure one more entry gets updated in the map
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+    XceiverClientRatis raftClient =
+        (XceiverClientRatis) blockOutputStream.getXceiverClient();
+    Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+    // Close the containers on the Datanode and write more data
+    ContainerTestHelper.waitForContainerClose(key, cluster);
+    // 4 writeChunks = maxFlushSize + 2 putBlocks  will be discarded here
+    // once exception is hit
+    key.write(data1);
+
+    // As a part of handling the exception, 4 failed writeChunks  will be
+    // rewritten plus one partial chunk plus two putBlocks for flushSize
+    // and one flush for partial chunk
+    key.flush();
+
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+        .getIoException()) instanceof ContainerNotOpenException);
+
+    // commitInfoMap will remain intact as there is no server failure
+    Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+    // now close the stream, It will update the ack length after watchForCommit
+    key.close();
+    // make sure the bufferPool is empty
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 14,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 8,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 22,
+        metrics.getTotalOpCount());
+    // Written the same data twice
+    String dataString = new String(data1, UTF_8);
+    validateData(keyName, dataString.concat(dataString).getBytes());
+  }
+
+  @Test
+  public void testWatchForCommitDatanodeFailure() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    int dataLength = maxFlushSize + 50;
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+    // since its hitting the full bufferCondition, it will call watchForCommit
+    // and completes at least putBlock for first flushSize worth of data
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+            <= pendingWriteChunkCount + 2);
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+            <= pendingPutBlockCount + 1);
+    Assert.assertEquals(writeChunkCount + 4,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // we have just written data more than flush Size(2 chunks), at this time
+    // buffer pool will have 3 buffers allocated worth of chunk size
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    // writtenDataLength as well flushedDataLength will be updated here
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    // since data written is still less than flushLength, flushLength will
+    // still be 0.
+    Assert.assertEquals(maxFlushSize,
+        blockOutputStream.getTotalDataFlushedLength());
+
+    // since data equals to maxBufferSize is written, this will be a blocking
+    // call and hence will wait for atleast flushSize worth of data to get
+    // ack'd by all servers right here
+    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+    // watchForCommit will clean up atleast flushSize worth of data buffer
+    // where each entry corresponds to flushSize worth of data
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+    // Now do a flush. This will flush the data and update the flush length and
+    // the map.
+    key.flush();
+
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 5,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
+
+    // Since the data in the buffer is already flushed, flush here will have
+    // no impact on the counters and data structures
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    //  flush will make sure one more entry gets updated in the map
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+    XceiverClientRatis raftClient =
+        (XceiverClientRatis) blockOutputStream.getXceiverClient();
+    Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+    Pipeline pipeline = raftClient.getPipeline();
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+
+    // again write data with more than max buffer limit. This will call
+    // watchForCommit again. Since the commit will happen 2 way, the
+    // commitInfoMap will get updated for servers which are alive
+    key.write(data1);
+
+    key.flush();
+    Assert.assertEquals(2, raftClient.getCommitInfoMap().size());
+
+    // now close the stream, It will update the ack length after watchForCommit
+    key.close();
+    Assert
+        .assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
+    // make sure the bufferPool is empty
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+    // in total, there are 8 full write chunks + 2 partial chunks written
+    Assert.assertEquals(writeChunkCount + 10,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    // 4 flushes at flushSize boundaries + 2 flush for partial chunks
+    Assert.assertEquals(putBlockCount + 6,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 16,
+        metrics.getTotalOpCount());
+    // Written the same data twice
+    String dataString = new String(data1, UTF_8);
+    validateData(keyName, dataString.concat(dataString).getBytes());
+  }
+
+  @Test
+  public void test2DatanodesFailure() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    int dataLength = maxFlushSize + 50;
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+    // since its hitting the full bufferCondition, it will call watchForCommit
+    // and completes atleast putBlock for first flushSize worth of data
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+            <= pendingWriteChunkCount + 2);
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+            <= pendingPutBlockCount + 1);
+    Assert.assertEquals(writeChunkCount + 4,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // we have just written data more than flush Size(2 chunks), at this time
+    // buffer pool will have 3 buffers allocated worth of chunk size
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    // writtenDataLength as well flushedDataLength will be updated here
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(maxFlushSize,
+        blockOutputStream.getTotalDataFlushedLength());
+
+    // since data equals to maxBufferSize is written, this will be a blocking
+    // call and hence will wait for atleast flushSize worth of data to get
+    // acked by all servers right here
+    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+    // watchForCommit will clean up atleast one entry from the map where each
+    // entry corresponds to flushSize worth of data
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+    // Now do a flush. This will flush the data and update the flush length and
+    // the map.
+    key.flush();
+
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 5,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
+
+    // Since the data in the buffer is already flushed, flush here will have
+    // no impact on the counters and data structures
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    // flush will make sure one more entry gets updated in the map
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+    XceiverClientRatis raftClient =
+        (XceiverClientRatis) blockOutputStream.getXceiverClient();
+    Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+    Pipeline pipeline = raftClient.getPipeline();
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
+    // again write data with more than max buffer limit. This will call
+    // watchForCommit again. Since the commit will happen 2 way, the
+    // commitInfoMap will get updated for servers which are alive
+
+    // 4 writeChunks = maxFlushSize + 2 putBlocks  will be discarded here
+    // once exception is hit
+    key.write(data1);
+
+    // As a part of handling the exception, 4 failed writeChunks  will be
+    // rewritten plus one partial chunk plus two putBlocks for flushSize
+    // and one flush for partial chunk
+    key.flush();
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+        .getIoException()) instanceof AlreadyClosedException);
+    // now close the stream, It will update the ack length after watchForCommit
+    key.close();
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 14,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 8,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 22,
+        metrics.getTotalOpCount());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    // make sure the bufferPool is empty
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    validateData(keyName, data1);
+  }
+
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return ContainerTestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+  }
+  private void validateData(String keyName, byte[] data) throws Exception {
+    ContainerTestHelper
+        .validateData(keyName, data, objectStore, volumeName, bucketName);
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index 3124e77..8be1ccc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -338,17 +338,8 @@ public class TestCloseContainerHandlingByClient {
 
   private void waitForContainerClose(OzoneOutputStream outputStream)
       throws Exception {
-    KeyOutputStream keyOutputStream =
-        (KeyOutputStream) outputStream.getOutputStream();
-    List<OmKeyLocationInfo> locationInfoList =
-        keyOutputStream.getLocationInfoList();
-    List<Long> containerIdList = new ArrayList<>();
-    for (OmKeyLocationInfo info : locationInfoList) {
-      containerIdList.add(info.getContainerID());
-    }
-    Assert.assertTrue(!containerIdList.isEmpty());
     ContainerTestHelper
-        .waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
+        .waitForContainerClose(outputStream, cluster);
   }
 
   @Ignore // test needs to be fixed after close container is handled for
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 0b97a00..f08a07f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -65,6 +66,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.base.Preconditions;
@@ -719,6 +721,21 @@ public final class ContainerTestHelper {
     return String.format("%1$" + length + "s", string);
   }
 
+  /** Waits on the containers listed in the key's block location info. */
+  public static void waitForContainerClose(OzoneOutputStream outputStream,
+      MiniOzoneCluster cluster) throws Exception {
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) outputStream.getOutputStream();
+    Long[] containerIds = keyOutputStream.getLocationInfoList().stream()
+        .map(OmKeyLocationInfo::getContainerID)
+        .toArray(Long[]::new);
+    // Fail with a readable message when no block has been allocated, rather
+    // than handing an empty id list to the varargs overload below.
+    Assert.assertTrue("No containers allocated for the key",
+        containerIds.length > 0);
+    waitForContainerClose(cluster, containerIds);
+  }
+
   public static void waitForContainerClose(MiniOzoneCluster cluster,
       Long... containerIdList)
       throws ContainerNotFoundException, PipelineNotFoundException,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index bb4e676..14db90d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -134,7 +134,7 @@ public class TestBlockDeletion {
     String keyName = UUID.randomUUID().toString();
 
     OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length,
-        ReplicationType.STAND_ALONE, ReplicationFactor.ONE, new HashMap<>());
+        ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>());
     for (int i = 0; i < 100; i++) {
       out.write(value.getBytes());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org