Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/04/17 08:50:52 UTC

[hadoop] branch trunk updated: Revert "Merge pull request #729 from bshashikant/HDDS-1373"

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e5a918c  Revert "Merge pull request #729 from bshashikant/HDDS-1373"
e5a918c is described below

commit e5a918cda3e22ec5d49be2003e9b273ffc2dfd45
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Wed Apr 17 14:17:54 2019 +0530

    Revert "Merge pull request #729 from bshashikant/HDDS-1373"
    
    This reverts commit 4ff4314e40a4449f6d68f1a2c0f713baeb70fb10, reversing
    changes made to 04c0437d13cbe8474224735cc6c41d0f6ea917f9.
---
 .../scm/container/common/helpers/ExcludeList.java  |   6 -
 .../hadoop/ozone/client/io/KeyOutputStream.java    | 198 ++++----
 .../ozone/client/rpc/TestBlockOutputStream.java    |  16 +-
 .../rpc/TestBlockOutputStreamWithFailures.java     |  27 +-
 .../rpc/TestOzoneClientRetriesOnException.java     |  46 +-
 .../ozone/client/rpc/TestWatchForCommit.java       | 512 ---------------------
 .../ozone/container/ContainerTestHelper.java       |   4 +-
 7 files changed, 114 insertions(+), 695 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
index eb215d6..94a4b94 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
@@ -102,10 +102,4 @@ public class ExcludeList {
     });
     return excludeList;
   }
-
-  public void clear() {
-    datanodes.clear();
-    containerIds.clear();
-    pipelineIds.clear();
-  }
 }
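
The clear() method removed above was the reset hook used by KeyOutputStream's
cleanup(), which this commit also removes (see the next file in this mail). A
minimal sketch of resetting exclusion state without it, assuming ExcludeList
keeps a no-arg constructor alongside the addDatanodes() call that appears
later in this diff:

    // Sketch only, not part of this commit: with clear() gone, a caller
    // resets exclusion state by dropping the old instance.
    ExcludeList excludeList = new ExcludeList();   // assumed no-arg ctor
    excludeList.addDatanodes(failedServers);       // gathered during the write
    excludeList = new ExcludeList();               // fresh instance, not clear()
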
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index c1f195f..0d9529f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -295,66 +295,60 @@ public class KeyOutputStream extends OutputStream {
       throws IOException {
     int succeededAllocates = 0;
     while (len > 0) {
-      try {
-        if (streamEntries.size() <= currentStreamIndex) {
-          Preconditions.checkNotNull(omClient);
-          // allocate a new block; if an exception happens, log an error and
-          // throw the exception to the caller directly, and the write fails.
-          try {
-            allocateNewBlock(currentStreamIndex);
-            succeededAllocates += 1;
-          } catch (IOException ioe) {
-            LOG.error("Try to allocate more blocks for write failed, already "
-                + "allocated " + succeededAllocates
-                + " blocks for this write.");
-            throw ioe;
-          }
-        }
-        // in theory, this condition should never be violated due to the
-        // check above; still, do a sanity check.
-        Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
-        BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
-
-        // length(len) will be in int range if the call is happening through
-        // write API of blockOutputStream. Length can be in long range if it comes
-        // via Exception path.
-        int writeLen = Math.min((int) len, (int) current.getRemaining());
-        long currentPos = current.getWrittenDataLength();
+      if (streamEntries.size() <= currentStreamIndex) {
+        Preconditions.checkNotNull(omClient);
+        // allocate a new block; if an exception happens, log an error and
+        // throw the exception to the caller directly, and the write fails.
         try {
-          if (retry) {
-            current.writeOnRetry(len);
-          } else {
-            current.write(b, off, writeLen);
-            offset += writeLen;
-          }
+          allocateNewBlock(currentStreamIndex);
+          succeededAllocates += 1;
         } catch (IOException ioe) {
-          // for the current iteration, totalDataWritten - currentPos gives the
-          // amount of data already written to the buffer
-
-          // In the retryPath, the total data to be written will always be equal
-          // to or less than the max length of the buffer allocated.
-          // The len specified here is the combined sum of the data length of
-          // the buffers
-          Preconditions.checkState(!retry || len <= streamBufferMaxSize);
-          int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
-          writeLen = retry ? (int) len : dataWritten;
-          // In retry path, the data written is already accounted in offset.
-          if (!retry) {
-            offset += writeLen;
-          }
-          LOG.debug("writeLen {}, total len {}", writeLen, len);
-          handleException(current, currentStreamIndex, ioe);
+          LOG.error("Try to allocate more blocks for write failed, already "
+              + "allocated " + succeededAllocates + " blocks for this write.");
+          throw ioe;
         }
-        if (current.getRemaining() <= 0) {
-          // since the current block is already written, close the stream.
-          handleFlushOrClose(StreamAction.FULL);
+      }
+      // in theory, this condition should never be violated due to the
+      // check above; still, do a sanity check.
+      Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+      BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
+
+      // length(len) will be in int range if the call is happening through
+      // write API of blockOutputStream. Length can be in long range if it comes
+      // via Exception path.
+      int writeLen = Math.min((int)len, (int) current.getRemaining());
+      long currentPos = current.getWrittenDataLength();
+      try {
+        if (retry) {
+          current.writeOnRetry(len);
+        } else {
+          current.write(b, off, writeLen);
+          offset += writeLen;
+        }
+      } catch (IOException ioe) {
+        // for the current iteration, totalDataWritten - currentPos gives the
+        // amount of data already written to the buffer
+
+        // In the retryPath, the total data to be written will always be equal
+        // to or less than the max length of the buffer allocated.
+        // The len specified here is the combined sum of the data length of
+        // the buffers
+        Preconditions.checkState(!retry || len <= streamBufferMaxSize);
+        int dataWritten  = (int) (current.getWrittenDataLength() - currentPos);
+        writeLen = retry ? (int) len : dataWritten;
+        // In retry path, the data written is already accounted in offset.
+        if (!retry) {
+          offset += writeLen;
         }
-        len -= writeLen;
-        off += writeLen;
-      } catch (Exception e) {
-        markStreamClosed();
-        throw e;
+        LOG.debug("writeLen {}, total len {}", writeLen, len);
+        handleException(current, currentStreamIndex, ioe);
+      }
+      if (current.getRemaining() <= 0) {
+        // since the current block is already written, close the stream.
+        handleFlushOrClose(StreamAction.FULL);
       }
+      len -= writeLen;
+      off += writeLen;
     }
   }
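
The restructuring above removes the outer try/catch that wrapped the whole
write loop and called markStreamClosed() on any failure; after the revert,
exceptions from block allocation and writes propagate to the caller without
marking the stream closed. A self-contained sketch of the wrap-and-mark
pattern being reverted away (simplified; not the Ozone source):

    class WriteLoopSketch {
      private boolean closed = false;

      void handleWrite(Runnable writeLoop) {
        try {
          writeLoop.run();
        } catch (RuntimeException e) {
          // the removed code released the exclude list, buffer pool and
          // stream entries here before flagging the stream closed
          markStreamClosed();
          throw e;
        }
      }

      private void markStreamClosed() {
        closed = true;
      }
    }
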
 
@@ -371,7 +365,7 @@ public class KeyOutputStream extends OutputStream {
     // pre allocated blocks available.
 
     // This will be called only to discard the next subsequent unused blocks
-    // in the streamEntryList.
+    // in the sreamEntryList.
     if (streamIndex < streamEntries.size()) {
       ListIterator<BlockOutputStreamEntry> streamEntryIterator =
           streamEntries.listIterator(streamIndex);
@@ -404,20 +398,6 @@ public class KeyOutputStream extends OutputStream {
       }
     }
   }
-
-  private void cleanup() {
-    if (excludeList != null) {
-      excludeList.clear();
-      excludeList = null;
-    }
-    if (bufferPool != null) {
-      bufferPool.clearBufferPool();
-    }
-
-    if (streamEntries != null) {
-      streamEntries.clear();
-    }
-  }
   /**
    * It performs following actions :
    * a. Updates the committed length at datanode for the current stream in
@@ -438,7 +418,8 @@ public class KeyOutputStream extends OutputStream {
       closedContainerException = checkIfContainerIsClosed(t);
     }
     PipelineID pipelineId = null;
-    long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
+    long totalSuccessfulFlushedData =
+        streamEntry.getTotalAckDataLength();
     //set the correct length for the current stream
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     long bufferedDataLen = computeBufferData();
@@ -469,8 +450,8 @@ public class KeyOutputStream extends OutputStream {
     if (closedContainerException) {
       // discard subsequent pre allocated blocks from the streamEntries list
       // from the closed container
-      discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null,
-          streamIndex + 1);
+      discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
+          null, streamIndex + 1);
     } else {
       // In case there is timeoutException or Watch for commit happening over
       // majority or the client connection failure to the leader in the
@@ -494,11 +475,6 @@ public class KeyOutputStream extends OutputStream {
     }
   }
 
-  private void markStreamClosed() {
-    cleanup();
-    closed = true;
-  }
-
   private void handleRetry(IOException exception, long len) throws IOException {
     RetryPolicy.RetryAction action;
     try {
@@ -610,46 +586,40 @@ public class KeyOutputStream extends OutputStream {
       return;
     }
     while (true) {
-      try {
-        int size = streamEntries.size();
-        int streamIndex =
-            currentStreamIndex >= size ? size - 1 : currentStreamIndex;
-        BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
-        if (entry != null) {
-          try {
-            Collection<DatanodeDetails> failedServers =
-                entry.getFailedServers();
-            // failed servers can be null in case there is no data written in the
-            // stream
-            if (failedServers != null && !failedServers.isEmpty()) {
-              excludeList.addDatanodes(failedServers);
-            }
-            switch (op) {
-            case CLOSE:
+      int size = streamEntries.size();
+      int streamIndex =
+          currentStreamIndex >= size ? size - 1 : currentStreamIndex;
+      BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
+      if (entry != null) {
+        try {
+          Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+          // failed servers can be null in case there is no data written in the
+          // stream
+          if (failedServers != null && !failedServers.isEmpty()) {
+            excludeList.addDatanodes(failedServers);
+          }
+          switch (op) {
+          case CLOSE:
+            entry.close();
+            break;
+          case FULL:
+            if (entry.getRemaining() == 0) {
               entry.close();
-              break;
-            case FULL:
-              if (entry.getRemaining() == 0) {
-                entry.close();
-                currentStreamIndex++;
-              }
-              break;
-            case FLUSH:
-              entry.flush();
-              break;
-            default:
-              throw new IOException("Invalid Operation");
+              currentStreamIndex++;
             }
-          } catch (IOException ioe) {
-            handleException(entry, streamIndex, ioe);
-            continue;
+            break;
+          case FLUSH:
+            entry.flush();
+            break;
+          default:
+            throw new IOException("Invalid Operation");
           }
+        } catch (IOException ioe) {
+          handleException(entry, streamIndex, ioe);
+          continue;
         }
-        break;
-      } catch (Exception e) {
-        markStreamClosed();
-        throw e;
       }
+      break;
     }
   }
 
@@ -688,7 +658,7 @@ public class KeyOutputStream extends OutputStream {
     } catch (IOException ioe) {
       throw ioe;
     } finally {
-      cleanup();
+      bufferPool.clearBufferPool();
     }
   }
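
With cleanup() gone, the finally block in close() above clears only the
buffer pool, so the streamEntries list survives a close(). That is why the
test expectations in the following files flip from 0 back to 1 or 2 stream
entries after key.close(). The resulting shape, as a sketch (assuming close()
drives the StreamAction.CLOSE case of the switch shown earlier):

    try {
      handleFlushOrClose(StreamAction.CLOSE);
      // ... commit the key length ...
    } finally {
      bufferPool.clearBufferPool();   // streamEntries is left intact
    }
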
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 399b977..32bef12 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -189,7 +189,6 @@ public class TestBlockOutputStream {
     // flush ensures watchForCommit updates the total length acknowledged
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
 
-    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     // now close the stream. It will update the ack length after watchForCommit
     key.close();
 
@@ -209,7 +208,7 @@ public class TestBlockOutputStream {
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -264,7 +263,6 @@ public class TestBlockOutputStream {
     // Now do a flush. This will flush the data and update the flush length and
     // the map.
     key.flush();
-    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     // flush is a sync call, all pending operations will complete
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -304,7 +302,7 @@ public class TestBlockOutputStream {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 3,
         metrics.getTotalOpCount());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -399,7 +397,6 @@ public class TestBlockOutputStream {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 3,
         metrics.getTotalOpCount());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -457,7 +454,6 @@ public class TestBlockOutputStream {
         blockOutputStream.getCommitIndex2flushedDataMap().size());
 
     Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
-    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     key.close();
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -475,7 +471,7 @@ public class TestBlockOutputStream {
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -540,7 +536,6 @@ public class TestBlockOutputStream {
     // Now do a flush. This will flush the data and update the flush length and
     // the map.
     key.flush();
-    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -575,7 +570,7 @@ public class TestBlockOutputStream {
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -643,7 +638,6 @@ public class TestBlockOutputStream {
     // Now do a flush. This will flush the data and update the flush length and
     // the map.
     key.flush();
-    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -679,7 +673,7 @@ public class TestBlockOutputStream {
         metrics.getTotalOpCount());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 89a2af9..f228dad 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -234,7 +234,6 @@ public class TestBlockOutputStreamWithFailures {
     // and one flush for partial chunk
     key.flush();
 
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
 
@@ -250,7 +249,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -373,7 +372,6 @@ public class TestBlockOutputStreamWithFailures {
     key.flush();
     Assert.assertEquals(2, raftClient.getCommitInfoMap().size());
 
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     // now close the stream. It will update the ack length after watchForCommit
     key.close();
     Assert
@@ -384,7 +382,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -517,14 +515,13 @@ public class TestBlockOutputStreamWithFailures {
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // now close the stream. It will update the ack length after watchForCommit
-
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     key.close();
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -541,7 +538,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -640,7 +637,6 @@ public class TestBlockOutputStreamWithFailures {
     // and one flush for partial chunk
     key.flush();
 
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
     // Make sure the retryCount is reset after the exception is handled
@@ -656,6 +652,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -666,7 +663,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 9,
         metrics.getTotalOpCount());
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
@@ -777,6 +774,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -787,7 +785,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 9,
         metrics.getTotalOpCount());
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
@@ -913,7 +911,6 @@ public class TestBlockOutputStreamWithFailures {
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // commitInfoMap will remain intact as there is no server failure
     Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     // now close the stream. It will update the ack length after watchForCommit
     key.close();
     // make sure the bufferPool is empty
@@ -922,7 +919,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -1049,7 +1046,6 @@ public class TestBlockOutputStreamWithFailures {
     Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     // now close the stream. It will update the ack length after watchForCommit
     key.close();
     Assert
@@ -1058,7 +1054,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -1075,7 +1071,6 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 22,
         metrics.getTotalOpCount());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
@@ -1203,7 +1198,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 5cb6dbc..381cf14 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -19,12 +19,10 @@ package org.apache.hadoop.ozone.client.rpc;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -68,7 +66,6 @@ public class TestOzoneClientRetriesOnException {
   private String volumeName;
   private String bucketName;
   private String keyString;
-  private XceiverClientManager xceiverClientManager;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -87,6 +84,8 @@ public class TestOzoneClientRetriesOnException {
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
+    conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s");
     conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
@@ -101,7 +100,6 @@ public class TestOzoneClientRetriesOnException {
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
     objectStore = client.getObjectStore();
-    xceiverClientManager = new XceiverClientManager(conf);
     keyString = UUID.randomUUID().toString();
     volumeName = "testblockoutputstreamwithretries";
     bucketName = volumeName;
@@ -154,9 +152,8 @@ public class TestOzoneClientRetriesOnException {
         .getIoException()) instanceof GroupMismatchException);
     Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
         .contains(pipeline.getId()));
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
     key.close();
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
     validateData(keyName, data1);
   }
 
@@ -174,8 +171,13 @@ public class TestOzoneClientRetriesOnException {
     byte[] data1 =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
+    key.write(data1);
+
+    OutputStream stream = entries.get(0).getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    List<PipelineID> pipelineList = new ArrayList<>();
     long containerID;
-    List<Long> containerList = new ArrayList<>();
     for (BlockOutputStreamEntry entry : entries) {
       containerID = entry.getBlockID().getContainerID();
       ContainerInfo container =
@@ -184,40 +186,18 @@ public class TestOzoneClientRetriesOnException {
       Pipeline pipeline =
           cluster.getStorageContainerManager().getPipelineManager()
               .getPipeline(container.getPipelineID());
-      XceiverClientSpi xceiverClient =
-          xceiverClientManager.acquireClient(pipeline);
-      if (!containerList.contains(containerID)) {
-        xceiverClient.sendCommand(ContainerTestHelper
-            .getCreateContainerRequest(containerID, pipeline));
-      }
-      xceiverClientManager.releaseClient(xceiverClient, false);
+      pipelineList.add(pipeline.getId());
     }
-    key.write(data1);
-    OutputStream stream = entries.get(0).getOutputStream();
-    Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
-    ContainerTestHelper.waitForContainerClose(key, cluster);
+    ContainerTestHelper.waitForPipelineClose(key, cluster, false);
     try {
       key.write(data1);
-      Assert.fail("Expected exception not thrown");
     } catch (IOException ioe) {
       Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
-          .getIoException()) instanceof ContainerNotOpenException);
+          .getIoException()) instanceof GroupMismatchException);
       Assert.assertTrue(ioe.getMessage().contains(
           "Retry request failed. retries get failed due to exceeded maximum "
               + "allowed retries number: 3"));
     }
-    try {
-      key.flush();
-      Assert.fail("Expected exception not thrown");
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getMessage().contains("Stream is closed"));
-    }
-    try {
-      key.close();
-    } catch (IOException ioe) {
-      Assert.fail("Expected should not be thrown");
-    }
   }
 
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
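
The retry ceiling asserted in the tests above is shaped by two layers of
configuration set in this file's setup, restated here as a sketch for
reference (values from the hunk near the top of this diff):

    // ratis-level request retries, retried quickly:
    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
    conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s");
    // ozone-client-level retries; exhausting these yields the asserted
    // "exceeded maximum allowed retries number: 3" message:
    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
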
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
deleted file mode 100644
index 823ee48..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.client.rpc;
-
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.client.ReplicationType;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.*;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.io.KeyOutputStream;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.ratis.protocol.RaftRetryFailureException;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-
-/**
- * This class verifies the watchForCommit Handling by client.
- */
-public class TestWatchForCommit {
-
-  private MiniOzoneCluster cluster;
-  private OzoneConfiguration conf;
-  private OzoneClient client;
-  private ObjectStore objectStore;
-  private String volumeName;
-  private String bucketName;
-  private String keyString;
-  private int chunkSize;
-  private int flushSize;
-  private int maxFlushSize;
-  private int blockSize;
-  private StorageContainerLocationProtocolClientSideTranslatorPB
-      storageContainerLocationClient;
-  private static String containerOwner = "OZONE";
-
-  /**
-   * Create a MiniDFSCluster for testing.
-   * <p>
-   * Ozone is made active by setting OZONE_ENABLED = true
-   *
-   * @throws IOException
-   */
-  private void startCluster(OzoneConfiguration conf) throws Exception {
-    chunkSize = 100;
-    flushSize = 2 * chunkSize;
-    maxFlushSize = 2 * flushSize;
-    blockSize = 2 * maxFlushSize;
-
-    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
-        1, TimeUnit.SECONDS);
-
-    conf.setQuietMode(false);
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(7)
-        .setBlockSize(blockSize)
-        .setChunkSize(chunkSize)
-        .setStreamBufferFlushSize(flushSize)
-        .setStreamBufferMaxSize(maxFlushSize)
-        .setStreamBufferSizeUnit(StorageUnit.BYTES)
-        .build();
-    cluster.waitForClusterToBeReady();
-    //the easiest way to create an open container is creating a key
-    client = OzoneClientFactory.getClient(conf);
-    objectStore = client.getObjectStore();
-    keyString = UUID.randomUUID().toString();
-    volumeName = "watchforcommithandlingtest";
-    bucketName = volumeName;
-    objectStore.createVolume(volumeName);
-    objectStore.getVolume(volumeName).createBucket(bucketName);
-    storageContainerLocationClient = cluster
-        .getStorageContainerLocationClient();
-  }
-
-
-  /**
-   * Shutdown MiniDFSCluster.
-   */
-  private void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  private String getKeyName() {
-    return UUID.randomUUID().toString();
-  }
-
-  @Test
-  public void testWatchForCommitWithKeyWrite() throws Exception {
-    // in this case, the watch request should fail with
-    // RaftRetryFailureException and will be captured in keyOutputStream,
-    // and the failover will happen to a different block
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 10,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
-    startCluster(conf);
-    XceiverClientMetrics metrics =
-        XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.PutBlock);
-    long totalOpCount = metrics.getTotalOpCount();
-    String keyName = getKeyName();
-    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
-    int dataLength = maxFlushSize + 50;
-    // write data more than 1 chunk
-    byte[] data1 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
-            .getBytes(UTF_8);
-    key.write(data1);
-    // since it's hitting the full buffer condition, it will call watchForCommit
-    // and complete at least putBlock for the first flushSize worth of data
-    Assert.assertTrue(
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
-            <= pendingWriteChunkCount + 2);
-    Assert.assertTrue(
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
-            <= pendingPutBlockCount + 1);
-    Assert.assertEquals(writeChunkCount + 4,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 2,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6,
-        metrics.getTotalOpCount());
-    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
-
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
-        .getOutputStream();
-    Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
-
-    // we have just written data more than flushSize (2 chunks); at this time
-    // the buffer pool will have 3 buffers allocated, each of chunk size
-
-    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
-    // writtenDataLength as well flushedDataLength will be updated here
-    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-    Assert.assertEquals(maxFlushSize,
-        blockOutputStream.getTotalDataFlushedLength());
-
-    // since data equal to maxBufferSize is written, this will be a blocking
-    // call and hence will wait for at least flushSize worth of data to get
-    // acked by all servers right here
-    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
-
-    // watchForCommit will clean up at least one entry from the map where each
-    // entry corresponds to flushSize worth of data
-    Assert.assertTrue(
-        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
-
-    // Now do a flush. This will flush the data and update the flush length and
-    // the map.
-    key.flush();
-
-    Assert.assertEquals(pendingWriteChunkCount,
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount,
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(writeChunkCount + 5,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 3,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8,
-        metrics.getTotalOpCount());
-
-    // Since the data in the buffer is already flushed, flush here will have
-    // no impact on the counters and data structures
-
-    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
-    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-    Assert.assertEquals(dataLength,
-        blockOutputStream.getTotalDataFlushedLength());
-    // flush will make sure one more entry gets updated in the map
-    Assert.assertTrue(
-        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
-
-    XceiverClientRatis raftClient =
-        (XceiverClientRatis) blockOutputStream.getXceiverClient();
-    Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
-    Pipeline pipeline = raftClient.getPipeline();
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
-    // again write data with more than max buffer limit. This will call
-    // watchForCommit again. Since the commit will happen 2 way, the
-    // commitInfoMap will get updated for servers which are alive
-
-    // 4 writeChunks = maxFlushSize + 2 putBlocks  will be discarded here
-    // once exception is hit
-    key.write(data1);
-
-    // As a part of handling the exception, 4 failed writeChunks  will be
-    // rewritten plus one partial chunk plus two putBlocks for flushSize
-    // and one flush for partial chunk
-    key.flush();
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
-        .getIoException()) instanceof RaftRetryFailureException);
-    // Make sure the retryCount is reset after the exception is handled
-    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
-    // now close the stream. It will update the ack length after watchForCommit
-    key.close();
-    Assert
-        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
-    Assert.assertEquals(pendingWriteChunkCount,
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount,
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(writeChunkCount + 14,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 8,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22,
-        metrics.getTotalOpCount());
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    // make sure the bufferPool is empty
-    Assert
-        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
-    validateData(keyName, data1);
-    shutdown();
-  }
-
-  @Test
-  public void testWatchForCommitWithSmallerTimeoutValue() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
-    startCluster(conf);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-    ContainerWithPipeline container1 =
-        storageContainerLocationClient.allocateContainer(
-            HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
-            containerOwner);
-    XceiverClientSpi client = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, client.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        client.getPipeline());
-    Pipeline pipeline = client.getPipeline();
-    XceiverClientReply reply =
-        client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
-            container1.getContainerInfo().getContainerID(),
-            client.getPipeline()));
-    reply.getResponse().get();
-    long index = reply.getLogIndex();
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
-    try {
-      // just watch for a log index which is not updated in the commitInfo Map
-      client.watchForCommit(index + 1, 3000);
-      Assert.fail("expected exception not thrown");
-    } catch (Exception e) {
-      Assert.assertTrue(
-          HddsClientUtils.checkForException(e) instanceof TimeoutException);
-    }
-    // After releasing the client, this connection should be closed
-    // and any container operations should fail
-    clientManager.releaseClient(client, false);
-    shutdown();
-  }
-
-  @Test
-  public void testWatchForCommitForRetryfailure() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3);
-    startCluster(conf);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-    ContainerWithPipeline container1 =
-        storageContainerLocationClient.allocateContainer(
-            HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
-            containerOwner);
-    XceiverClientSpi client = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, client.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        client.getPipeline());
-    Pipeline pipeline = client.getPipeline();
-    XceiverClientReply reply =
-        client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
-            container1.getContainerInfo().getContainerID(),
-            client.getPipeline()));
-    reply.getResponse().get();
-    long index = reply.getLogIndex();
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
-    // again write data with more than max buffer limit. This wi
-    try {
-      // just watch for a log index which is not updated in the commitInfo Map
-      client.watchForCommit(index + 1, 20000);
-      Assert.fail("expected exception not thrown");
-    } catch (Exception e) {
-      Assert.assertTrue(HddsClientUtils
-          .checkForException(e) instanceof RaftRetryFailureException);
-    }
-    clientManager.releaseClient(client, false);
-    shutdown();
-  }
-
-  @Test
-  public void test2WayCommitForRetryfailure() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 3);
-    startCluster(conf);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-
-    ContainerWithPipeline container1 =
-        storageContainerLocationClient.allocateContainer(
-            HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
-            containerOwner);
-    XceiverClientSpi client = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, client.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        client.getPipeline());
-    Pipeline pipeline = client.getPipeline();
-    XceiverClientRatis ratisClient = (XceiverClientRatis) client;
-    XceiverClientReply reply =
-        client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
-            container1.getContainerInfo().getContainerID(),
-            client.getPipeline()));
-    reply.getResponse().get();
-    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    reply = client.sendCommandAsync(ContainerTestHelper
-        .getCloseContainer(pipeline,
-            container1.getContainerInfo().getContainerID()));
-    reply.getResponse().get();
-    client.watchForCommit(reply.getLogIndex(), 20000);
-
-    // commitInfo Map will be reduced to 2 here
-    Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
-    clientManager.releaseClient(client, false);
-    Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
-    Assert.assertTrue(
-        logCapturer.getOutput().contains("RaftRetryFailureException"));
-    Assert
-        .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
-    logCapturer.stopCapturing();
-    shutdown();
-  }
-
-  @Test
-  public void test2WayCommitForTimeoutException() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
-    startCluster(conf);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-
-    ContainerWithPipeline container1 =
-        storageContainerLocationClient.allocateContainer(
-            HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
-            containerOwner);
-    XceiverClientSpi client = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, client.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        client.getPipeline());
-    Pipeline pipeline = client.getPipeline();
-    XceiverClientRatis ratisClient = (XceiverClientRatis) client;
-    XceiverClientReply reply =
-        client.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(
-            container1.getContainerInfo().getContainerID(),
-            client.getPipeline()));
-    reply.getResponse().get();
-    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    reply = client.sendCommandAsync(ContainerTestHelper
-        .getCloseContainer(pipeline,
-            container1.getContainerInfo().getContainerID()));
-    reply.getResponse().get();
-    client.watchForCommit(reply.getLogIndex(), 3000);
-
-    // commitInfo Map will be reduced to 2 here
-    Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
-    clientManager.releaseClient(client, false);
-    Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
-    Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException"));
-    Assert
-        .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
-    logCapturer.stopCapturing();
-    shutdown();
-  }
-
-  @Test
-  public void testWatchForCommitForGroupMismatchException() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
-
-    // mark the node stale early so that pipeline gets destroyed quickly
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
-    startCluster(conf);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-
-    ContainerWithPipeline container1 =
-        storageContainerLocationClient.allocateContainer(
-            HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
-            containerOwner);
-    XceiverClientSpi client = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, client.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        client.getPipeline());
-    Pipeline pipeline = client.getPipeline();
-    XceiverClientRatis ratisClient = (XceiverClientRatis) client;
-    long containerId = container1.getContainerInfo().getContainerID();
-    XceiverClientReply reply = client.sendCommandAsync(ContainerTestHelper
-        .getCreateContainerRequest(containerId, client.getPipeline()));
-    reply.getResponse().get();
-    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
-    List<Pipeline> pipelineList = new ArrayList<>();
-    pipelineList.add(pipeline);
-    ContainerTestHelper.waitForPipelineClose(pipelineList, cluster);
-    try {
-      // just watch for a log index which is not updated in the commitInfo Map
-      //client.watchForCommit(reply.getLogIndex() + 1, 20000);
-      reply = client.sendCommandAsync(ContainerTestHelper
-          .getCreateContainerRequest(containerId, client.getPipeline()));
-      reply.getResponse().get();
-      Assert.fail("Expected exception not thrown");
-    } catch(Exception e) {
-      Assert.assertTrue(HddsClientUtils
-          .checkForException(e) instanceof GroupMismatchException);
-    }
-    clientManager.releaseClient(client, false);
-    shutdown();
-  }
-
-  private OzoneOutputStream createKey(String keyName, ReplicationType type,
-      long size) throws Exception {
-    return ContainerTestHelper
-        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
-  }
-
-  private void validateData(String keyName, byte[] data) throws Exception {
-    ContainerTestHelper
-        .validateData(keyName, data, objectStore, volumeName, bucketName);
-  }
-}
-
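
The deleted suite above exercised the client-side watchForCommit path
directly. Its core call pattern, as a sketch built from names in the removed
file (not a drop-in test):

    XceiverClientReply reply = client.sendCommandAsync(ContainerTestHelper
        .getCreateContainerRequest(containerID, client.getPipeline()));
    reply.getResponse().get();               // wait for the request itself
    client.watchForCommit(reply.getLogIndex(), 20000); // then wait up to 20 s
                                                       // for replication
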
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index a1fd17c..93807b4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -727,9 +727,7 @@ public final class ContainerTestHelper {
         keyOutputStream.getLocationInfoList();
     List<Long> containerIdList = new ArrayList<>();
     for (OmKeyLocationInfo info : locationInfoList) {
-      long id = info.getContainerID();
-      if (!containerIdList.contains(id))
-      containerIdList.add(id);
+      containerIdList.add(info.getContainerID());
     }
     Assert.assertTrue(!containerIdList.isEmpty());
     waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
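
The restored loop above collects every container ID, duplicates included,
where the reverted code skipped duplicates with a contains() scan. If
de-duplication were still wanted here, a set-based variant avoids that scan
(sketch, not part of this commit; assumes java.util.Set and
java.util.LinkedHashSet imports):

    Set<Long> ids = new LinkedHashSet<>();
    for (OmKeyLocationInfo info : locationInfoList) {
      ids.add(info.getContainerID());
    }
    waitForContainerClose(cluster, ids.toArray(new Long[0]));
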

