You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/04/01 10:50:33 UTC

[hadoop] branch ozone-0.4 updated: HDDS-1312. Add more unit tests to verify BlockOutputStream functionalities. Contributed by Shashikant Banerjee.

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/ozone-0.4 by this push:
     new bb20c80  HDDS-1312. Add more unit tests to verify BlockOutputStream functionalities. Contributed by Shashikant Banerjee.
bb20c80 is described below

commit bb20c80955d1fd69a522653d7edbdf86f5eb3e7d
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Mon Apr 1 16:09:58 2019 +0530

    HDDS-1312. Add more unit tests to verify BlockOutputStream functionalities. Contributed by Shashikant Banerjee.
    
    (cherry picked from commit ef5de292436db56547f05f42a745be32e2b38546)
---
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  72 ++-
 .../rpc/TestBlockOutputStreamWithFailures.java     | 696 ++++++++++++++++++++-
 .../ozone/container/ContainerTestHelper.java       |   9 +
 3 files changed, 750 insertions(+), 27 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 78f03d2..c04105c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -96,6 +96,7 @@ public class KeyOutputStream extends OutputStream {
   private ExcludeList excludeList;
   private final RetryPolicy retryPolicy;
   private int retryCount;
+  private long offset;
   /**
    * A constructor for testing purpose only.
    */
@@ -121,6 +122,7 @@ public class KeyOutputStream extends OutputStream {
         .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
     this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
     retryCount = 0;
+    offset = 0;
   }
 
   @VisibleForTesting
@@ -149,6 +151,10 @@ public class KeyOutputStream extends OutputStream {
     return locationInfoList;
   }
 
+  @VisibleForTesting
+  public int getRetryCount() {
+    return retryCount;
+  }
 
   @SuppressWarnings("parameternumber")
   public KeyOutputStream(OpenKeySession handler,
@@ -316,6 +322,7 @@ public class KeyOutputStream extends OutputStream {
           current.writeOnRetry(len);
         } else {
           current.write(b, off, writeLen);
+          offset += writeLen;
         }
       } catch (IOException ioe) {
         // for the current iteration, totalDataWritten - currentPos gives the
@@ -326,8 +333,12 @@ public class KeyOutputStream extends OutputStream {
         // The len specified here is the combined sum of the data length of
         // the buffers
         Preconditions.checkState(!retry || len <= streamBufferMaxSize);
-        writeLen = retry ? (int) len :
-            (int) (current.getWrittenDataLength() - currentPos);
+        int dataWritten  = (int) (current.getWrittenDataLength() - currentPos);
+        writeLen = retry ? (int) len : dataWritten;
+        // In retry path, the data written is already accounted in offset.
+        if (!retry) {
+          offset += writeLen;
+        }
         LOG.debug("writeLen {}, total len {}", writeLen, len);
         handleException(current, currentStreamIndex, ioe);
       }
@@ -345,20 +356,24 @@ public class KeyOutputStream extends OutputStream {
    * from the streamEntries list for the container which is closed.
    * @param containerID id of the closed container
    * @param pipelineId id of the associated pipeline
+   * @param streamIndex index of the stream
    */
   private void discardPreallocatedBlocks(long containerID,
-      PipelineID pipelineId) {
-    // currentStreamIndex < streamEntries.size() signifies that, there are still
+      PipelineID pipelineId, int streamIndex) {
+    // streamIndex < streamEntries.size() signifies that, there are still
     // pre allocated blocks available.
-    if (currentStreamIndex < streamEntries.size()) {
+
+    // This will be called only to discard the next subsequent unused blocks
+    // in the streamEntryList.
+    if (streamIndex < streamEntries.size()) {
       ListIterator<BlockOutputStreamEntry> streamEntryIterator =
-          streamEntries.listIterator(currentStreamIndex);
+          streamEntries.listIterator(streamIndex);
       while (streamEntryIterator.hasNext()) {
         BlockOutputStreamEntry streamEntry = streamEntryIterator.next();
+        Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0);
         if (((pipelineId != null && streamEntry.getPipeline().getId()
             .equals(pipelineId)) || (containerID != -1
-            && streamEntry.getBlockID().getContainerID() == containerID))
-            && streamEntry.getCurrentPosition() == 0) {
+            && streamEntry.getBlockID().getContainerID() == containerID))) {
           streamEntryIterator.remove();
         }
       }
@@ -396,7 +411,7 @@ public class KeyOutputStream extends OutputStream {
   private void handleException(BlockOutputStreamEntry streamEntry,
       int streamIndex, IOException exception) throws IOException {
     Throwable t = checkForException(exception);
-    boolean retryFailure = checkForRetryFailure(exception);
+    boolean retryFailure = checkForRetryFailure(t);
     boolean closedContainerException = false;
     if (!retryFailure) {
       closedContainerException = checkIfContainerIsClosed(t);
@@ -411,16 +426,14 @@ public class KeyOutputStream extends OutputStream {
             + "uncommitted data length is {}", exception,
         totalSuccessfulFlushedData, bufferedDataLen);
     Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
-    Preconditions.checkArgument(
-        streamEntry.getWrittenDataLength() - totalSuccessfulFlushedData
-            == bufferedDataLen);
+    Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen);
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
     if (!failedServers.isEmpty()) {
       excludeList.addDatanodes(failedServers);
     }
-    if (checkIfContainerIsClosed(t)) {
+    if (closedContainerException) {
       excludeList.addConatinerId(ContainerID.valueof(containerId));
     } else if (retryFailure || t instanceof TimeoutException) {
       pipelineId = streamEntry.getPipeline().getId();
@@ -428,22 +441,15 @@ public class KeyOutputStream extends OutputStream {
     }
     // just clean up the current stream.
     streamEntry.cleanup(retryFailure);
-    if (bufferedDataLen > 0) {
-      // If the data is still cached in the underlying stream, we need to
-      // allocate new block and write this data in the datanode.
-      currentStreamIndex += 1;
-      handleRetry(exception, bufferedDataLen);
-    }
-    if (totalSuccessfulFlushedData == 0) {
-      streamEntries.remove(streamIndex);
-      currentStreamIndex -= 1;
-    }
 
+    // discard all subsequent blocks in the containers and pipelines which
+    // are in the exclude list so that, the very next retry should never
+    // write data on the closed container/pipeline
     if (closedContainerException) {
       // discard subsequent pre allocated blocks from the streamEntries list
       // from the closed container
       discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
-          null);
+          null, streamIndex + 1);
     } else {
       // In case there is timeoutException or Watch for commit happening over
       // majority or the client connection failure to the leader in the
@@ -451,7 +457,19 @@ public class KeyOutputStream extends OutputStream {
       // Next block allocation will happen with excluding this specific pipeline
       // This will ensure if 2 way commit happens , it cannot span over multiple
       // blocks
-      discardPreallocatedBlocks(-1, pipelineId);
+      discardPreallocatedBlocks(-1, pipelineId, streamIndex + 1);
+    }
+    if (bufferedDataLen > 0) {
+      // If the data is still cached in the underlying stream, we need to
+      // allocate new block and write this data in the datanode.
+      currentStreamIndex += 1;
+      handleRetry(exception, bufferedDataLen);
+      // reset the retryCount after handling the exception
+      retryCount = 0;
+    }
+    if (totalSuccessfulFlushedData == 0) {
+      streamEntries.remove(streamIndex);
+      currentStreamIndex -= 1;
     }
   }
 
@@ -618,7 +636,9 @@ public class KeyOutputStream extends OutputStream {
       if (keyArgs != null) {
         // in test, this could be null
         removeEmptyBlocks();
-        keyArgs.setDataSize(getKeyLength());
+        long length = getKeyLength();
+        Preconditions.checkArgument(offset == length);
+        keyArgs.setDataSize(length);
         keyArgs.setLocationInfoList(getLocationInfoList());
         // When the key is multipart upload part file upload, we should not
         // commit the key, as this is not an actual key, this is a just a
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 671f16c..54cdff0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.client.rpc;
 
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -235,6 +236,8 @@ public class TestBlockOutputStreamWithFailures {
     Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
 
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // commitInfoMap will remain intact as there is no server failure
     Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
     // now close the stream, It will update the ack length after watchForCommit
@@ -372,6 +375,8 @@ public class TestBlockOutputStreamWithFailures {
     key.close();
     Assert
         .assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
@@ -506,6 +511,8 @@ public class TestBlockOutputStreamWithFailures {
     key.flush();
     Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof AlreadyClosedException);
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
     Assert
@@ -534,11 +541,698 @@ public class TestBlockOutputStreamWithFailures {
     validateData(keyName, data1);
   }
 
+  @Test
+  public void testFailureWithPrimeSizedData() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    int dataLength = 167;
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+            == pendingWriteChunkCount + 1);
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+            == pendingPutBlockCount);
+    Assert.assertEquals(writeChunkCount + 1,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 1,
+        metrics.getTotalOpCount());
+
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+
+    Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(0,
+        blockOutputStream.getTotalDataFlushedLength());
+
+    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
+
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+    // Now do a flush. This will flush the data and update the flush length and
+    // the map.
+    key.flush();
+
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 1,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 3,
+        metrics.getTotalOpCount());
+
+    // Since the data in the buffer is already flushed, flush here will have
+    // no impact on the counters and data structures
+
+    Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    // flush will make sure one more entry gets updated in the map
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+    XceiverClientRatis raftClient =
+        (XceiverClientRatis) blockOutputStream.getXceiverClient();
+    Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+    // Close the containers on the Datanode and write more data
+    ContainerTestHelper.waitForContainerClose(key, cluster);
+    key.write(data1);
+
+    // As a part of handling the exception, 2 failed writeChunks  will be
+    // rewritten plus 1 putBlocks for flush
+    // and one flush for partial chunk
+    key.flush();
+
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+        .getIoException()) instanceof ContainerNotOpenException);
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
+
+    // commitInfoMap will remain intact as there is no server failure
+    Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+    // now close the stream, It will update the ack length after watchForCommit
+    key.close();
+    // make sure the bufferPool is empty
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 6,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 9,
+        metrics.getTotalOpCount());
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
+    // Written the same data twice
+    String dataString = new String(data1, UTF_8);
+    validateData(keyName, dataString.concat(dataString).getBytes());
+  }
+
+  @Test
+  public void testExceptionDuringClose() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    int dataLength = 167;
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+            == pendingWriteChunkCount + 1);
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+            == pendingPutBlockCount);
+    Assert.assertEquals(writeChunkCount + 1,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 1,
+        metrics.getTotalOpCount());
+
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+
+    Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(0,
+        blockOutputStream.getTotalDataFlushedLength());
+
+    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
+
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+    // Now do a flush. This will flush the data and update the flush length and
+    // the map.
+    key.flush();
+
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 1,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 3,
+        metrics.getTotalOpCount());
+
+    // Since the data in the buffer is already flushed, flush here will have
+    // no impact on the counters and data structures
+
+    Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    // flush will make sure one more entry gets updated in the map
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+    XceiverClientRatis raftClient =
+        (XceiverClientRatis) blockOutputStream.getXceiverClient();
+    Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+    // Close the containers on the Datanode and write more data
+    ContainerTestHelper.waitForContainerClose(key, cluster);
+    key.write(data1);
+
+    // commitInfoMap will remain intact as there is no server failure
+    Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+    // now close the stream, It will hit exception
+    key.close();
+
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+        .getIoException()) instanceof ContainerNotOpenException);
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
+    // make sure the bufferPool is empty
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 6,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 9,
+        metrics.getTotalOpCount());
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
+    // Written the same data twice
+    String dataString = new String(data1, UTF_8);
+    validateData(keyName, dataString.concat(dataString).getBytes());
+  }
+
+  @Test
+  public void testWatchForCommitWithSingleNodeRatis() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
+    int dataLength = maxFlushSize + 50;
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+
+    // since its hitting the full bufferCondition, it will call watchForCommit
+    // and completes atleast putBlock for first flushSize worth of data
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+            <= pendingWriteChunkCount + 2);
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+            <= pendingPutBlockCount + 1);
+    Assert.assertEquals(writeChunkCount + 4,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
+
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // we have just written data more than flush Size(2 chunks), at this time
+    // buffer pool will have 4 buffers allocated worth of chunk size
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    // writtenDataLength as well flushedDataLength will be updated here
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(maxFlushSize,
+        blockOutputStream.getTotalDataFlushedLength());
+
+    // since data equals to maxBufferSize is written, this will be a blocking
+    // call and hence will wait for atleast flushSize worth of data to get
+    // ack'd by all servers right here
+    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+    // watchForCommit will clean up atleast one entry from the map where each
+    // entry corresponds to flushSize worth of data
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+    // Now do a flush. This will flush the data and update the flush length and
+    // the map.
+    key.flush();
+
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 5,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
+
+    // flush is a sync call, all pending operations will complete
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+    // Since the data in the buffer is already flushed, flush here will have
+    // no impact on the counters and data structures
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    // flush will make sure one more entry gets updated in the map
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+    XceiverClientRatis raftClient =
+        (XceiverClientRatis) blockOutputStream.getXceiverClient();
+    Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+    // Close the containers on the Datanode and write more data
+    ContainerTestHelper.waitForContainerClose(key, cluster);
+    // 4 writeChunks = maxFlushSize + 2 putBlocks  will be discarded here
+    // once exception is hit
+    key.write(data1);
+
+    // As a part of handling the exception, 4 failed writeChunks  will be
+    // rewritten plus one partial chunk plus two putBlocks for flushSize
+    // and one flush for partial chunk
+    key.flush();
+
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+        .getIoException()) instanceof ContainerNotOpenException);
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
+    // commitInfoMap will remain intact as there is no server failure
+    Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+    // now close the stream, It will update the ack length after watchForCommit
+    key.close();
+    // make sure the bufferPool is empty
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 14,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 8,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 22,
+        metrics.getTotalOpCount());
+    // Written the same data twice
+    String dataString = new String(data1, UTF_8);
+    validateData(keyName, dataString.concat(dataString).getBytes());
+  }
+
+  @Test
+  public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
+    int dataLength = maxFlushSize + 50;
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+    // since its hitting the full bufferCondition, it will call watchForCommit
+    // and completes at least putBlock for first flushSize worth of data
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+            <= pendingWriteChunkCount + 2);
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+            <= pendingPutBlockCount + 1);
+    Assert.assertEquals(writeChunkCount + 4,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // we have just written data more than flush Size(2 chunks), at this time
+    // buffer pool will have 3 buffers allocated worth of chunk size
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    // writtenDataLength as well flushedDataLength will be updated here
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(maxFlushSize,
+        blockOutputStream.getTotalDataFlushedLength());
+
+    // since data equals to maxBufferSize is written, this will be a blocking
+    // call and hence will wait for atleast flushSize worth of data to get
+    // ack'd by all servers right here
+    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+    // watchForCommit will clean up atleast flushSize worth of data buffer
+    // where each entry corresponds to flushSize worth of data
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+    // Now do a flush. This will flush the data and update the flush length and
+    // the map.
+    key.flush();
+
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 5,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
+
+    // Since the data in the buffer is already flushed, flush here will have
+    // no impact on the counters and data structures
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    //  flush will make sure one more entry gets updated in the map
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+    XceiverClientRatis raftClient =
+        (XceiverClientRatis) blockOutputStream.getXceiverClient();
+    Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+    Pipeline pipeline = raftClient.getPipeline();
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+
+    // again write data with more than max buffer limit. This will call
+    // watchForCommit again. No write will happen in the current block and
+    // data will be rewritten to the next block.
+
+    key.write(data1);
+
+    key.flush();
+
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+        .getIoException()) instanceof AlreadyClosedException);
+    Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
+    // now close the stream, It will update the ack length after watchForCommit
+    key.close();
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    // make sure the bufferPool is empty
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+    // in total, there are 14 full write chunks, 5 before the failure injection,
+    // 4 chunks after which we detect the failure and then 5 again on the next
+    // block
+    Assert.assertEquals(writeChunkCount + 14,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    // 3 flushes at flushSize boundaries before failure injection + 2
+    // flush failed + 3 more flushes for the next block
+    Assert.assertEquals(putBlockCount + 8,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 22,
+        metrics.getTotalOpCount());
+    // Written the same data twice
+    String dataString = new String(data1, UTF_8);
+    cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
+    validateData(keyName, dataString.concat(dataString).getBytes());
+  }
+
+  @Test
+  public void testDatanodeFailureWithPreAllocation() throws Exception {
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, 3 * blockSize,
+            ReplicationFactor.ONE);
+    int dataLength = maxFlushSize + 50;
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+    // since its hitting the full bufferCondition, it will call watchForCommit
+    // and completes at least putBlock for first flushSize worth of data
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+            <= pendingWriteChunkCount + 2);
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+            <= pendingPutBlockCount + 1);
+    Assert.assertEquals(writeChunkCount + 4,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 3);
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // we have just written data more than flush Size(2 chunks), at this time
+    // buffer pool will have 3 buffers allocated worth of chunk size
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    // writtenDataLength as well flushedDataLength will be updated here
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(maxFlushSize,
+        blockOutputStream.getTotalDataFlushedLength());
+
+    // since data equals to maxBufferSize is written, this will be a blocking
+    // call and hence will wait for atleast flushSize worth of data to get
+    // ack'd by all servers right here
+    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+    // watchForCommit will clean up atleast flushSize worth of data buffer
+    // where each entry corresponds to flushSize worth of data
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+    // Now do a flush. This will flush the data and update the flush length and
+    // the map.
+    key.flush();
+
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 5,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
+
+    // Since the data in the buffer is already flushed, flush here will have
+    // no impact on the counters and data structures
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    //  flush will make sure one more entry gets updated in the map
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+
+    XceiverClientRatis raftClient =
+        (XceiverClientRatis) blockOutputStream.getXceiverClient();
+    Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+    Pipeline pipeline = raftClient.getPipeline();
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+
+    // again write data with more than max buffer limit. This will call
+    // watchForCommit again. No write will happen and
+
+    key.write(data1);
+
+    key.flush();
+
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+        .getIoException()) instanceof AlreadyClosedException);
+
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
+    Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+
+    // now close the stream, It will update the ack length after watchForCommit
+    key.close();
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    // make sure the bufferPool is empty
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+
+    // in total, there are 14 full write chunks, 5 before the failure injection,
+    // 4 chunks after which we detect the failure and then 5 again on the next
+    // block
+    Assert.assertEquals(writeChunkCount + 14,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+
+    // 3 flushes at flushSize boundaries before failure injection + 2
+    // flush failed + 3 more flushes for the next block
+    Assert.assertEquals(putBlockCount + 8,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 22,
+        metrics.getTotalOpCount());
+    // Written the same data twice
+    String dataString = new String(data1, UTF_8);
+    cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
+    validateData(keyName, dataString.concat(dataString).getBytes());
+  }
+
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
       long size) throws Exception {
+    return createKey(keyName, type, size, ReplicationFactor.THREE);
+  }
+
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size, ReplicationFactor factor) throws Exception {
     return ContainerTestHelper
-        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+        .createKey(keyName, type, factor, size, objectStore, volumeName,
+            bucketName);
   }
+
   private void validateData(String keyName, byte[] data) throws Exception {
     ContainerTestHelper
         .validateData(keyName, data, objectStore, volumeName, bucketName);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 7f2d93d..0b618a0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -700,6 +700,15 @@ public final class ContainerTestHelper {
         .createKey(keyName, size, type, factor, new HashMap<>());
   }
 
+  public static OzoneOutputStream createKey(String keyName,
+      ReplicationType type,
+      org.apache.hadoop.hdds.client.ReplicationFactor factor, long size,
+      ObjectStore objectStore, String volumeName, String bucketName)
+      throws Exception {
+    return objectStore.getVolume(volumeName).getBucket(bucketName)
+        .createKey(keyName, size, type, factor, new HashMap<>());
+  }
+
   public static void validateData(String keyName, byte[] data,
       ObjectStore objectStore, String volumeName, String bucketName)
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org