Posted to commits@ozone.apache.org by sh...@apache.org on 2020/06/03 04:26:54 UTC

[hadoop-ozone] branch master updated: HDDS-3477. Disable partial chunk write during flush() call in ozone client by default. (#957)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e4f23ee  HDDS-3477. Disable partial chunk write during flush() call in ozone client by default. (#957)
e4f23ee is described below

commit e4f23ee243a12354e326a959f40aab38e48dbce0
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Jun 3 12:26:40 2020 +0800

    HDDS-3477. Disable partial chunk write during flush() call in ozone client by default. (#957)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   4 +-
 .../common/src/main/resources/ozone-default.xml    |  10 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |   2 +-
 .../ozone/client/rpc/TestBlockOutputStream.java    |   2 +
 ...m.java => TestBlockOutputStreamFlushDelay.java} |  62 +++---
 .../rpc/TestBlockOutputStreamWithFailures.java     |   2 +
 ...stBlockOutputStreamWithFailuresFlushDelay.java} |  33 ++-
 .../client/rpc/TestContainerStateMachine.java      |   2 +
 ...va => TestContainerStateMachineFlushDelay.java} | 107 +++-------
 .../client/rpc/TestFailureHandlingByClient.java    |   3 +-
 .../rpc/TestFailureHandlingByClientFlushDelay.java | 235 +++++++++++++++++++++
 .../rpc/TestOzoneClientRetriesOnException.java     |   2 +
 ...stOzoneClientRetriesOnExceptionFlushDelay.java} | 148 ++++++-------
 13 files changed, 413 insertions(+), 199 deletions(-)
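
With this change the Ozone client no longer writes partial chunks on flush()
by default: ozone.client.stream.buffer.flush.delay now defaults to true. A
minimal sketch of opting back out in client code, using only the config key
and setBoolean() call this patch touches (the wrapper class is hypothetical):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.OzoneConfigKeys;

    /** Hypothetical helper; only the key and default come from the patch. */
    public class FlushDelayOptOut {
      public static OzoneConfiguration clientConf() {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Restore the pre-HDDS-3477 behavior: sync partial chunks on flush().
        conf.setBoolean(
            OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
        return conf;
      }
    }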

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 841ffc5..281f185 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -157,8 +157,8 @@ public final class OzoneConfigKeys {
    * */
   public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY =
       "ozone.client.stream.buffer.flush.delay";
-  public static final boolean OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT =
-      false;
+  public static final boolean OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT =
+      true;
 
   // This defines the overall connection limit for the connection pool used in
   // RestClient.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 26bf2ba..93292d8 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -390,11 +390,13 @@
   </property>
   <property>
     <name>ozone.client.stream.buffer.flush.delay</name>
-    <value>false</value>
+    <value>true</value>
     <tag>OZONE, CLIENT</tag>
-    <description>If set true, when call flush() and determine whether the
-      data in the current buffer is greater than ozone.client.stream.buffer.size.
-      if greater than then send buffer to the datanode.
+    <description>
+      Defaults to true. When enabled, a flush() call checks whether the data
+      in the current buffer is greater than ozone.client.stream.buffer.size
+      and only sends the buffer to the datanode if it is. Set this
+      configuration to false to turn the delay off.
     </description>
   </property>
   <property>
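
The description above amounts to a single decision on flush(); a hedged
standalone sketch (hypothetical names, since the real logic lives in the
client's BlockOutputStream, which this patch does not modify):

    /** Illustration of the flush.delay semantics described above. */
    public class FlushDelaySketch {
      static boolean shouldSyncOnFlush(boolean flushDelay, long bufferedBytes,
          long streamBufferSize) {
        // With the delay enabled, flush() only sends data once the buffer
        // holds more than ozone.client.stream.buffer.size bytes.
        return !flushDelay || bufferedBytes > streamBufferSize;
      }
    }
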
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 57dfb0d..3c333bd 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -202,7 +202,7 @@ public class RpcClient implements ClientProtocol {
             StorageUnit.BYTES);
     streamBufferFlushDelay = conf.getBoolean(
         OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY,
-        OzoneConfigKeys.OOZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT);
+        OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT);
     streamBufferMaxSize = (long) conf
         .getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
             OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 338e286..44b47dc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -48,6 +48,7 @@ import org.junit.rules.Timeout;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
 
 /**
  * Tests BlockOutputStream class.
@@ -90,6 +91,7 @@ public class TestBlockOutputStream {
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
+    conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
         .setTotalPipelineNumLimit(10)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
similarity index 95%
copy from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
copy to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
index 338e286..93a3ad6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
@@ -50,13 +50,13 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
- * Tests BlockOutputStream class.
+ * Tests BlockOutputStream class with flush delay enabled.
  */
-public class TestBlockOutputStream {
+public class TestBlockOutputStreamFlushDelay {
 
   /**
-    * Set a timeout for each test.
-    */
+   * Set a timeout for each test.
+   */
   @Rule
   public Timeout timeout = new Timeout(300000);
   private static MiniOzoneCluster cluster;
@@ -140,6 +140,7 @@ public class TestBlockOutputStream {
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     int dataLength = 50;
+    int totalWriteDataLength = dataLength * 2;
     byte[] data1 =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
@@ -174,7 +175,9 @@ public class TestBlockOutputStream {
     // commitIndex2FlushedData Map will be empty here
     Assert.assertTrue(
         blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
-
+    // Write enough data that the total is greater than or equal to one
+    // chunk size, to make sure flush will sync the data.
+    key.write(data1);
     // Now do a flush. This will flush the data and update the flush length and
     // the map.
     key.flush();
@@ -190,14 +193,16 @@ public class TestBlockOutputStream {
     Assert.assertEquals(1, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(0,
         blockOutputStream.getBufferPool().getBuffer(0).position());
-    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-    Assert.assertEquals(dataLength,
+    Assert.assertEquals(totalWriteDataLength,
+        blockOutputStream.getWrittenDataLength());
+    Assert.assertEquals(totalWriteDataLength,
         blockOutputStream.getTotalDataFlushedLength());
     Assert.assertEquals(0,
         blockOutputStream.getCommitIndex2flushedDataMap().size());
 
     // flush ensures watchForCommit updates the total length acknowledged
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(totalWriteDataLength,
+        blockOutputStream.getTotalAckDataLength());
 
     Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     // now close the stream, It will update the ack length after watchForCommit
@@ -217,7 +222,8 @@ public class TestBlockOutputStream {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(totalWriteDataLength,
+        blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
@@ -271,31 +277,31 @@ public class TestBlockOutputStream {
     Assert.assertEquals(0,
         blockOutputStream.getCommitIndex2flushedDataMap().size());
 
-    // Now do a flush. This will flush the data and update the flush length and
-    // the map.
+    // Now do a flush.
     key.flush();
     Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
-    // flush is a sync call, all pending operations will complete
-    Assert.assertEquals(pendingWriteChunkCount, metrics
+    // The previously written data is equal to flushSize, so no flush action
+    // is triggered when flush() executes.
+    Assert.assertEquals(pendingWriteChunkCount + 2, metrics
         .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount, metrics
+    Assert.assertEquals(pendingPutBlockCount + 1, metrics
         .getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
 
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
-
     Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
-    Assert
-        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
+    // No action is triggered when flush() executes, so the state of the
+    // BlockOutputStream will not be updated.
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
     Assert.assertEquals(dataLength,
         blockOutputStream.getTotalDataFlushedLength());
     Assert.assertEquals(0,
         blockOutputStream.getCommitIndex2flushedDataMap().size());
+    Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
 
-    // flush ensures watchForCommit updates the total length acknowledged
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
@@ -650,8 +656,7 @@ public class TestBlockOutputStream {
     Assert.assertTrue(
         blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
 
-    // Now do a flush. This will flush the data and update the flush length and
-    // the map.
+    // Now do a flush.
     key.flush();
     Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount, metrics
@@ -664,10 +669,10 @@ public class TestBlockOutputStream {
 
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-    Assert.assertEquals(dataLength,
+    // dataLength = maxFlushSize + 50; the final 50 bytes are smaller than
+    // the chunkSize (100) and will not be synced when flush is called.
+    Assert.assertEquals(dataLength - 50,
         blockOutputStream.getTotalDataFlushedLength());
-    // flush will make sure one more entry gets updated in the map
     Assert.assertTrue(
         blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
 
@@ -683,9 +688,10 @@ public class TestBlockOutputStream {
         .getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(writeChunkCount + 5,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 4,
+    // The previous flush did not trigger any action.
+    Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 9,
+    Assert.assertEquals(totalOpCount + 8,
         metrics.getTotalOpCount());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
@@ -694,7 +700,7 @@ public class TestBlockOutputStream {
   }
 
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
-      long size) throws Exception {
+                                      long size) throws Exception {
     return TestHelper
         .createKey(keyName, type, size, objectStore, volumeName, bucketName);
   }
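
Taken together, the reworked assertions verify a simple client-visible
pattern: flush() with less than one chunk buffered is a no-op, flush() with
at least one chunk buffered syncs everything written so far, and close()
always syncs the remainder. A hedged usage sketch, assuming the 100-byte
chunk size these tests configure:

    import java.io.IOException;
    import org.apache.hadoop.ozone.client.io.OzoneOutputStream;

    /** Sketch of the behavior asserted above; not part of the patch. */
    class FlushDelayUsage {
      static void writeWithFlushDelay(OzoneOutputStream out)
          throws IOException {
        byte[] half = new byte[50];
        out.write(half);
        out.flush();  // 50 bytes buffered (< chunk size): nothing synced yet
        out.write(half);
        out.flush();  // 100 bytes buffered (>= chunk size): data is synced
        out.close();  // close syncs any remainder regardless of the delay
      }
    }
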
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 7e327bc..e377615 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -56,6 +56,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
 
 /**
  * Tests failure detection and handling in BlockOutputStream Class.
@@ -119,6 +120,7 @@ public class TestBlockOutputStreamWithFailures {
                     "watch.request.timeout",
             3, TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 15);
+    conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
         .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
         .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
similarity index 98%
copy from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
copy to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
index 7e327bc..6a849ac 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
@@ -38,29 +39,23 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.TestHelper;
 import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.ratis.protocol.RaftRetryFailureException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.Rule;
-import org.junit.rules.Timeout;
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
 
 /**
- * Tests failure detection and handling in BlockOutputStream Class.
+ * Tests failure detection and handling in BlockOutputStream class with
+ * flush delay enabled.
  */
-public class TestBlockOutputStreamWithFailures {
+public class TestBlockOutputStreamWithFailuresFlushDelay {
 
   /**
     * Set a timeout for each test.
@@ -156,7 +151,7 @@ public class TestBlockOutputStreamWithFailures {
       throws Exception {
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
-    int dataLength = maxFlushSize + 50;
+    int dataLength = maxFlushSize + chunkSize;
     // write data more than 1 chunk
     byte[] data1 =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -245,7 +240,7 @@ public class TestBlockOutputStreamWithFailures {
   public void testWatchForCommitDatanodeFailure() throws Exception {
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
-    int dataLength = maxFlushSize + 50;
+    int dataLength = maxFlushSize + chunkSize;
     // write data more than 1 chunk
     byte[] data1 =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -332,7 +327,7 @@ public class TestBlockOutputStreamWithFailures {
   public void test2DatanodesFailure() throws Exception {
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
-    int dataLength = maxFlushSize + 50;
+    int dataLength = maxFlushSize + chunkSize;
     // write data more than 1 chunk
     byte[] data1 =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -437,7 +432,7 @@ public class TestBlockOutputStreamWithFailures {
   public void testFailureWithPrimeSizedData() throws Exception {
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
-    int dataLength = maxFlushSize + 69;
+    int dataLength = maxFlushSize + chunkSize;
     // write data more than 1 chunk
     byte[] data1 =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -573,7 +568,7 @@ public class TestBlockOutputStreamWithFailures {
     String keyName = getKeyName();
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
-    int dataLength = maxFlushSize + 50;
+    int dataLength = maxFlushSize + chunkSize;
     // write data more than 1 chunk
     byte[] data1 =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -664,7 +659,7 @@ public class TestBlockOutputStreamWithFailures {
     String keyName = getKeyName();
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
-    int dataLength = maxFlushSize + 50;
+    int dataLength = maxFlushSize + chunkSize;
     // write data more than 1 chunk
     byte[] data1 =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
@@ -758,7 +753,7 @@ public class TestBlockOutputStreamWithFailures {
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.RATIS, 3 * blockSize,
             ReplicationFactor.ONE);
-    int dataLength = maxFlushSize + 50;
+    int dataLength = maxFlushSize + chunkSize;
     // write data more than 1 chunk
     byte[] data1 =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 4207006..1972dac 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -55,6 +55,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
 
 /**
  * Tests the containerStateMachine failure handling.
@@ -98,6 +99,7 @@ public class TestContainerStateMachine {
     conf.setQuietMode(false);
     OzoneManager.setTestSecureOmFlag(true);
     conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
+    conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
     //  conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
similarity index 61%
copy from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
copy to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
index 4207006..1208652 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
+import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
@@ -30,36 +31,29 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.TestHelper;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.RatisServerConfiguration;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
+import org.junit.rules.Timeout;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.Rule;
-import org.junit.rules.Timeout;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.HddsConfigKeys.*;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
- * Tests the containerStateMachine failure handling.
+ * Tests the containerStateMachine failure handling with flush delay enabled.
  */
-public class TestContainerStateMachine {
+public class TestContainerStateMachineFlushDelay {
 
   /**
     * Set a timeout for each test.
@@ -74,6 +68,11 @@ public class TestContainerStateMachine {
   private String volumeName;
   private String bucketName;
   private String path;
+  private static int chunkSize;
+  private static int flushSize;
+  private static int maxFlushSize;
+  private static int blockSize;
+  private static String keyString;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -82,8 +81,13 @@ public class TestContainerStateMachine {
    */
   @Before
   public void setup() throws Exception {
+    chunkSize = 100;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    keyString = UUID.randomUUID().toString();
     path = GenericTestUtils
-        .getTempPath(TestContainerStateMachine.class.getSimpleName());
+        .getTempPath(TestContainerStateMachineFlushDelay.class.getSimpleName());
     File baseDir = new File(path);
     baseDir.mkdirs();
 
@@ -101,6 +105,11 @@ public class TestContainerStateMachine {
     //  conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
+            .setBlockSize(blockSize)
+            .setChunkSize(chunkSize)
+            .setStreamBufferFlushSize(flushSize)
+            .setStreamBufferMaxSize(maxFlushSize)
+            .setStreamBufferSizeUnit(StorageUnit.BYTES)
             .setHbInterval(200)
             .setCertificateClient(new CertificateClientTestImpl(conf))
             .build();
@@ -131,8 +140,14 @@ public class TestContainerStateMachine {
         objectStore.getVolume(volumeName).getBucket(bucketName)
             .createKey("ratis", 1024, ReplicationType.RATIS,
                 ReplicationFactor.ONE, new HashMap<>());
+    // ozone.client.stream.buffer.flush.delay is now enabled by default.
+    // Here we write data (length 110) greater than the chunk size
+    // (length 100) to make sure flush will sync the data.
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, 110)
+        .getBytes(UTF_8);
     // First write and flush creates a container in the datanode
-    key.write("ratis".getBytes());
+    key.write(data);
     key.flush();
     key.write("ratis".getBytes());
 
@@ -162,58 +177,4 @@ public class TestContainerStateMachine {
             == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
   }
 
-  @Test
-  public void testRatisSnapshotRetention() throws Exception {
-
-    ContainerStateMachine stateMachine =
-        (ContainerStateMachine) TestHelper.getStateMachine(cluster);
-    SimpleStateMachineStorage storage =
-        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
-    Assert.assertNull(storage.findLatestSnapshot());
-
-    // Write 10 keys. Num snapshots should be equal to config value.
-    for (int i = 1; i <= 10; i++) {
-      OzoneOutputStream key =
-          objectStore.getVolume(volumeName).getBucket(bucketName)
-              .createKey(("ratis" + i), 1024, ReplicationType.RATIS,
-                  ReplicationFactor.ONE, new HashMap<>());
-      // First write and flush creates a container in the datanode
-      key.write(("ratis" + i).getBytes());
-      key.flush();
-      key.write(("ratis" + i).getBytes());
-      key.close();
-    }
-
-    RatisServerConfiguration ratisServerConfiguration =
-        conf.getObject(RatisServerConfiguration.class);
-
-    stateMachine =
-        (ContainerStateMachine) TestHelper.getStateMachine(cluster);
-    storage = (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
-    Path parentPath = storage.findLatestSnapshot().getFile().getPath();
-    int numSnapshots = parentPath.getParent().toFile().listFiles().length;
-    Assert.assertTrue(Math.abs(ratisServerConfiguration
-        .getNumSnapshotsRetained() - numSnapshots) <= 1);
-
-    // Write 10 more keys. Num Snapshots should remain the same.
-    for (int i = 11; i <= 20; i++) {
-      OzoneOutputStream key =
-          objectStore.getVolume(volumeName).getBucket(bucketName)
-              .createKey(("ratis" + i), 1024, ReplicationType.RATIS,
-                  ReplicationFactor.ONE, new HashMap<>());
-      // First write and flush creates a container in the datanode
-      key.write(("ratis" + i).getBytes());
-      key.flush();
-      key.write(("ratis" + i).getBytes());
-      key.close();
-    }
-    stateMachine =
-        (ContainerStateMachine) TestHelper.getStateMachine(cluster);
-    storage = (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
-    parentPath = storage.findLatestSnapshot().getFile().getPath();
-    numSnapshots = parentPath.getParent().toFile().listFiles().length;
-    Assert.assertTrue(Math.abs(ratisServerConfiguration
-        .getNumSnapshotsRetained() - numSnapshots) <= 1);
-  }
-
 }
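
The sizes wired up in setup() give this test a small, predictable buffer
geometry, and the 110-byte first write is chosen to cross one chunk. A
standalone check of that arithmetic (values copied from the hunk above; the
class itself is hypothetical):

    /** Buffer geometry from setup(); a 110-byte write crosses one chunk. */
    public class BufferGeometryCheck {
      public static void main(String[] args) {
        int chunkSize = 100;
        int flushSize = 2 * chunkSize;     // 200
        int maxFlushSize = 2 * flushSize;  // 400
        int blockSize = 2 * maxFlushSize;  // 800
        // 110 >= chunkSize, so the first flush() syncs data even with
        // ozone.client.stream.buffer.flush.delay enabled.
        System.out.println(110 >= chunkSize && blockSize == 800);
      }
    }
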
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index d172dd5..2edf369 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -123,7 +123,8 @@ public class TestFailureHandlingByClient {
             RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
                     "watch.request.timeout",
             3, TimeUnit.SECONDS);
-
+    conf.setBoolean(
+        OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
     conf.setQuietMode(false);
     conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
         StaticMapping.class, DNSToSwitchMapping.class);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java
new file mode 100644
index 0000000..95e275b
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClientFlushDelay.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.junit.*;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests exception handling by the Ozone client with flush delay enabled.
+ */
+public class TestFailureHandlingByClientFlushDelay {
+
+  /**
+   * Set a timeout for each test.
+   */
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  private MiniOzoneCluster cluster;
+  private OzoneConfiguration conf;
+  private OzoneClient client;
+  private ObjectStore objectStore;
+  private int chunkSize;
+  private int flushSize;
+  private int maxFlushSize;
+  private int blockSize;
+  private String volumeName;
+  private String bucketName;
+  private String keyString;
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   * @throws IOException
+   */
+  private void init() throws Exception {
+    conf = new OzoneConfiguration();
+    chunkSize = 100;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 4 * chunkSize;
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
+        1, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        1, TimeUnit.SECONDS);
+    conf.setBoolean(
+        OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 2);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+                    DatanodeRatisServerConfig.RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+            3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+                    DatanodeRatisServerConfig.
+                            RATIS_SERVER_WATCH_REQUEST_TIMEOUT_KEY,
+            3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
+                    "rpc.request.timeout",
+            3, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+            RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
+                    "watch.request.timeout",
+            3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        StaticMapping.class, DNSToSwitchMapping.class);
+    StaticMapping.addNodeToRack(NetUtils.normalizeHostNames(
+        Collections.singleton(HddsUtils.getHostName(conf))).get(0),
+        "/rack1");
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(10)
+        .setTotalPipelineNumLimit(15)
+        .setChunkSize(chunkSize)
+        .setBlockSize(blockSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "datanodefailurehandlingtest";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  private void startCluster() throws Exception {
+    init();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testPipelineExclusionWithPipelineFailure() throws Exception {
+    startCluster();
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, blockSize);
+    String data = ContainerTestHelper
+        .getFixedLengthString(keyString,  chunkSize);
+
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) key.getOutputStream();
+    List<BlockOutputStreamEntry> streamEntryList =
+        keyOutputStream.getStreamEntries();
+
+    // Assert that 1 block will be preallocated
+    Assert.assertEquals(1, streamEntryList.size());
+    key.write(data.getBytes());
+    key.flush();
+    long containerId = streamEntryList.get(0).getBlockID().getContainerID();
+    BlockID blockId = streamEntryList.get(0).getBlockID();
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerId));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List<DatanodeDetails> datanodes = pipeline.getNodes();
+
+    // Two nodes are down, so the next write will hit AlreadyClosedException
+    // and the pipeline will be added to the exclude list.
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+    cluster.shutdownHddsDatanode(datanodes.get(1));
+
+    key.write(data.getBytes());
+    key.flush();
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getContainerIds().isEmpty());
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getDatanodes().isEmpty());
+    Assert.assertTrue(
+        keyOutputStream.getExcludeList().getDatanodes().isEmpty());
+    key.write(data.getBytes());
+    // The close will just write to the buffer
+    key.close();
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .setRefreshPipeline(true)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
+    // Make sure a new block is written
+    Assert.assertNotEquals(
+        keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly().get(0)
+            .getBlockID(), blockId);
+    Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.concat(data).concat(data).getBytes());
+  }
+
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return TestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+  }
+
+  private void validateData(String keyName, byte[] data) throws Exception {
+    TestHelper
+        .validateData(keyName, data, objectStore, volumeName, bucketName);
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 8f0ce00..d93d817 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -57,6 +57,7 @@ import org.junit.Rule;
 import org.junit.rules.Timeout;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
 
 /**
  * Tests failure detection and handling in BlockOutputStream Class.
@@ -100,6 +101,7 @@ public class TestOzoneClientRetriesOnException {
     conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
     conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3);
     conf.setQuietMode(false);
+    conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
         .setTotalPipelineNumLimit(10)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
similarity index 54%
rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java
rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
index cf719b2..b9029ea 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2BlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
@@ -14,12 +14,19 @@
  * License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.hadoop.ozone.client.rpc;
 
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -30,44 +37,42 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.TestHelper;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.apache.ratis.protocol.GroupMismatchException;
+import org.junit.*;
+import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.Rule;
-import org.junit.rules.Timeout;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
 
 /**
- * Tests BlockOutputStream class.
+ * Tests failure detection and handling in BlockOutputStream class with
+ * flush delay enabled.
  */
-public class Test2BlockOutputStream {
+public class TestOzoneClientRetriesOnExceptionFlushDelay {
 
   /**
-    * Set a timeout for each test.
-    */
+   * Set a timeout for each test.
+   */
   @Rule
   public Timeout timeout = new Timeout(300000);
+
   private static MiniOzoneCluster cluster;
-  private static OzoneConfiguration conf = new OzoneConfiguration();
-  private static OzoneClient client;
-  private static ObjectStore objectStore;
-  private static int chunkSize;
-  private static int flushSize;
-  private static int maxFlushSize;
-  private static int blockSize;
-  private static String volumeName;
-  private static String bucketName;
-  private static String keyString;
+  private OzoneConfiguration conf = new OzoneConfiguration();
+  private OzoneClient client;
+  private ObjectStore objectStore;
+  private int chunkSize;
+  private int flushSize;
+  private int maxFlushSize;
+  private int blockSize;
+  private String volumeName;
+  private String bucketName;
+  private String keyString;
+  private XceiverClientManager xceiverClientManager;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -76,19 +81,18 @@ public class Test2BlockOutputStream {
    *
    * @throws IOException
    */
-  @BeforeClass
-  public static void init() throws Exception {
+  @Before
+  public void init() throws Exception {
     chunkSize = 100;
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
-    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000,
+        TimeUnit.MILLISECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3);
     conf.setQuietMode(false);
-    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
-        StorageUnit.MB);
-    conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, true);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
         .setTotalPipelineNumLimit(10)
@@ -102,8 +106,9 @@ public class Test2BlockOutputStream {
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getRpcClient(conf);
     objectStore = client.getObjectStore();
+    xceiverClientManager = new XceiverClientManager(conf);
     keyString = UUID.randomUUID().toString();
-    volumeName = "testblockoutputstream";
+    volumeName = "testblockoutputstreamwithretries";
     bucketName = volumeName;
     objectStore.createVolume(volumeName);
     objectStore.getVolume(volumeName).createBucket(bucketName);
@@ -116,66 +121,67 @@ public class Test2BlockOutputStream {
   /**
    * Shutdown MiniDFSCluster.
    */
-  @AfterClass
-  public static void shutdown() {
+  @After
+  public void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
 
   @Test
-  public void testFlushChunkDelay() throws Exception {
-    String keyName1 = getKeyName();
-    OzoneOutputStream key1 = createKey(keyName1, ReplicationType.RATIS, 0);
-
+  public void testGroupMismatchExceptionHandling() throws Exception {
+    String keyName = getKeyName();
+    // Make sure flush will sync the data.
+    int dataLength = maxFlushSize + chunkSize;
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS,
+        dataLength);
+    // write data more than 1 chunk
     byte[] data1 =
-        ContainerTestHelper.getFixedLengthString(keyString, 10)
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
-    key1.write(data1);
-    key1.flush();
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key1.getOutputStream();
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
+    long containerID =
+        keyOutputStream.getStreamEntries().get(0).
+            getBlockID().getContainerID();
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerID));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    XceiverClientSpi xceiverClient =
+        xceiverClientManager.acquireClient(pipeline);
+    xceiverClient.sendCommand(ContainerTestHelper
+        .getCreateContainerRequest(containerID, pipeline));
+    xceiverClientManager.releaseClient(xceiverClient, false);
+    key.write(data1);
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
-
-    // we have just written data(length 10) less than chunk Size,
-    // at this time we call flush will not  sync data.
-    Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
-    key1.close();
-    validateData(keyName1, data1);
-
-    String keyName2 = getKeyName();
-    OzoneOutputStream key2 = createKey(keyName2, ReplicationType.RATIS, 0);
-    byte[] data2 =
-            ContainerTestHelper.getFixedLengthString(keyString, 110)
-                    .getBytes(UTF_8);
-    key2.write(data2);
-    key2.flush();
-    keyOutputStream = (KeyOutputStream)key2.getOutputStream();
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    stream = keyOutputStream.getStreamEntries().get(0)
-            .getOutputStream();
-    Assert.assertTrue(stream instanceof BlockOutputStream);
-    blockOutputStream = (BlockOutputStream) stream;
-
-    // we have just written data(length 110) greater than chunk Size,
-    // at this time we call flush will sync data.
-    Assert.assertEquals(data2.length,
-            blockOutputStream.getTotalDataFlushedLength());
-    key2.close();
-    validateData(keyName2, data2);
+    TestHelper.waitForPipelineClose(key, cluster, false);
+    key.flush();
+    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+        .getIoException()) instanceof GroupMismatchException);
+    Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
+        .contains(pipeline.getId()));
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
+    key.close();
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
+    validateData(keyName, data1);
   }
 
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
-      long size) throws Exception {
+                                      long size) throws Exception {
     return TestHelper
-        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+        .createKey(keyName, type, ReplicationFactor.ONE,
+            size, objectStore, volumeName, bucketName);
   }
+
   private void validateData(String keyName, byte[] data) throws Exception {
     TestHelper
         .validateData(keyName, data, objectStore, volumeName, bucketName);
   }
-
 }

