You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2019/04/02 08:32:54 UTC

[hadoop] branch trunk updated: HDDS-1337. Handle GroupMismatchException in OzoneClient. Contributed by Shashikant Banerjee.

This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d31c868  HDDS-1337. Handle GroupMismatchException in OzoneClient. Contributed by Shashikant Banerjee.
d31c868 is described below

commit d31c86892e0ceec5d642f76fc9123fac4fd80db8
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Tue Apr 2 16:27:11 2019 +0800

    HDDS-1337. Handle GroupMismatchException in OzoneClient. Contributed by Shashikant Banerjee.
---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  43 +++--
 .../transport/server/ratis/XceiverServerRatis.java |  11 ++
 hadoop-hdds/pom.xml                                |   2 +-
 .../hadoop/ozone/client/OzoneClientUtils.java      |   2 +
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  15 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   2 +
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |   3 +-
 .../rpc/TestBlockOutputStreamWithFailures.java     |  11 +-
 .../client/rpc/TestContainerStateMachine.java      |   8 +-
 .../rpc/TestContainerStateMachineFailures.java     |   8 +-
 .../rpc/TestOzoneClientRetriesOnException.java     | 213 +++++++++++++++++++++
 .../ozone/container/ContainerTestHelper.java       |  82 +++++++-
 hadoop-ozone/pom.xml                               |   2 +-
 13 files changed, 359 insertions(+), 43 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index cfbb6ae..a8ead77 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -55,6 +55,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
@@ -100,7 +101,7 @@ public class BlockOutputStream extends OutputStream {
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
   // request will fail upfront.
-  private IOException ioException;
+  private AtomicReference<IOException> ioException;
   private ExecutorService responseExecutor;
 
   // the effective length of data flushed so far
@@ -187,6 +188,7 @@ public class BlockOutputStream extends OutputStream {
     writtenDataLength = 0;
     failedServers = Collections.emptyList();
     bufferList = null;
+    ioException = new AtomicReference<>(null);
   }
 
 
@@ -221,9 +223,8 @@ public class BlockOutputStream extends OutputStream {
     return bufferPool;
   }
 
-  @VisibleForTesting
   public IOException getIoException() {
-    return ioException;
+    return ioException.get();
   }
 
   @VisibleForTesting
@@ -372,10 +373,9 @@ public class BlockOutputStream extends OutputStream {
         waitOnFlushFutures();
       }
     } catch (InterruptedException | ExecutionException e) {
-      ioException = new IOException(
-          "Unexpected Storage Container Exception: " + e.toString(), e);
+      setIoException(e);
       adjustBuffersOnException();
-      throw ioException;
+      throw getIoException();
     }
     if (!commitIndex2flushedDataMap.isEmpty()) {
       watchForCommit(
@@ -430,9 +430,9 @@ public class BlockOutputStream extends OutputStream {
       adjustBuffers(index);
     } catch (TimeoutException | InterruptedException | ExecutionException e) {
       LOG.warn("watchForCommit failed for index " + commitIndex, e);
+      setIoException(e);
       adjustBuffersOnException();
-      throw new IOException(
-          "Unexpected Storage Container Exception: " + e.toString(), e);
+      throw getIoException();
     }
   }
 
@@ -461,7 +461,7 @@ public class BlockOutputStream extends OutputStream {
           throw new CompletionException(sce);
         }
         // if the ioException is not set, putBlock is successful
-        if (ioException == null) {
+        if (getIoException() == null) {
           BlockID responseBlockID = BlockID.getFromProtobuf(
               e.getPutBlock().getCommittedBlockLength().getBlockID());
           Preconditions.checkState(blockID.getContainerBlockID()
@@ -505,10 +505,9 @@ public class BlockOutputStream extends OutputStream {
       } catch (InterruptedException | ExecutionException e) {
         // just set the exception here as well in order to maintain sanctity of
         // ioException field
-        ioException = new IOException(
-            "Unexpected Storage Container Exception: " + e.toString(), e);
+        setIoException(e);
         adjustBuffersOnException();
-        throw ioException;
+        throw getIoException();
       }
     }
   }
@@ -580,10 +579,9 @@ public class BlockOutputStream extends OutputStream {
       try {
         handleFlush();
       } catch (InterruptedException | ExecutionException e) {
-        ioException = new IOException(
-            "Unexpected Storage Container Exception: " + e.toString(), e);
+        setIoException(e);
         adjustBuffersOnException();
-        throw ioException;
+        throw getIoException();
       } finally {
         cleanup(false);
       }
@@ -611,8 +609,9 @@ public class BlockOutputStream extends OutputStream {
       // if the ioException is already set, it means a prev request has failed
       // just throw the exception. The current operation will fail with the
       // original error
-      if (ioException != null) {
-        throw ioException;
+      IOException exception = getIoException();
+      if (exception != null) {
+        throw exception;
       }
       ContainerProtocolCalls.validateContainerResponse(responseProto);
     } catch (StorageContainerException sce) {
@@ -622,10 +621,12 @@ public class BlockOutputStream extends OutputStream {
     }
   }
 
+
   private void setIoException(Exception e) {
-    if (ioException != null) {
-      ioException =  new IOException(
+    if (getIoException() == null) {
+      IOException exception =  new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
+      ioException.compareAndSet(null, exception);
     }
   }
 
@@ -659,9 +660,9 @@ public class BlockOutputStream extends OutputStream {
   private void checkOpen() throws IOException {
     if (xceiverClient == null) {
       throw new IOException("BlockOutputStream has been closed.");
-    } else if (ioException != null) {
+    } else if (getIoException() != null) {
       adjustBuffersOnException();
-      throw ioException;
+      throw getIoException();
     }
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 8f09ff2..a542191 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -572,6 +572,17 @@ public final class XceiverServerRatis extends XceiverServer {
     }
   }
 
+  @VisibleForTesting
+  public List<PipelineID> getPipelineIds() {
+    Iterable<RaftGroupId> gids = server.getGroupIds();
+    List<PipelineID> pipelineIDs = new ArrayList<>();
+    for (RaftGroupId groupId : gids) {
+      pipelineIDs.add(PipelineID.valueOf(groupId.getUuid()));
+      LOG.info("pipeline id {}", PipelineID.valueOf(groupId.getUuid()));
+    }
+    return pipelineIDs;
+  }
+
   void handleNodeSlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(group.getGroupId(), roleInfoProto);
   }
diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml
index 32b2c03..1110fb8 100644
--- a/hadoop-hdds/pom.xml
+++ b/hadoop-hdds/pom.xml
@@ -46,7 +46,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.4.0-1fc5ace-SNAPSHOT</ratis.version>
+    <ratis.version>0.4.0-8fed368-SNAPSHOT</ratis.version>
 
     <bouncycastle.version>1.60</bouncycastle.version>
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
index 012a225..9e9bb39 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.rest.response.*;
 import org.apache.ratis.protocol.AlreadyClosedException;
+import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 
 import java.util.ArrayList;
@@ -43,6 +44,7 @@ public final class OzoneClientUtils {
         add(ContainerNotOpenException.class);
         add(RaftRetryFailureException.class);
         add(AlreadyClosedException.class);
+        add(GroupMismatchException.class);
       }};
   /**
    * Returns a BucketInfo object constructed using fields of the input
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index c04105c..0d9529f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.protocol.AlreadyClosedException;
+import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -423,8 +424,8 @@ public class KeyOutputStream extends OutputStream {
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     long bufferedDataLen = computeBufferData();
     LOG.warn("Encountered exception {}. The last committed block length is {}, "
-            + "uncommitted data length is {}", exception,
-        totalSuccessfulFlushedData, bufferedDataLen);
+            + "uncommitted data length is {} retry count {}", exception,
+        totalSuccessfulFlushedData, bufferedDataLen, retryCount);
     Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
     Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen);
     long containerId = streamEntry.getBlockID().getContainerID();
@@ -435,7 +436,8 @@ public class KeyOutputStream extends OutputStream {
     }
     if (closedContainerException) {
       excludeList.addConatinerId(ContainerID.valueof(containerId));
-    } else if (retryFailure || t instanceof TimeoutException) {
+    } else if (retryFailure || t instanceof TimeoutException
+        || t instanceof GroupMismatchException) {
       pipelineId = streamEntry.getPipeline().getId();
       excludeList.addPipeline(pipelineId);
     }
@@ -482,11 +484,12 @@ public class KeyOutputStream extends OutputStream {
       throw e instanceof IOException ? (IOException) e : new IOException(e);
     }
     if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+      String msg = "";
       if (action.reason != null) {
-        LOG.error("Retry request failed. " + action.reason,
-            exception);
+        msg = "Retry request failed. " + action.reason;
+        LOG.error(msg, exception);
       }
-      throw exception;
+      throw new IOException(msg, exception);
     }
 
     // Throw the exception if the thread is interrupted
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index e94f0ac..8c40aa3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -181,6 +181,8 @@ public interface MiniOzoneCluster {
   void restartHddsDatanode(int i, boolean waitForDatanode)
       throws InterruptedException, TimeoutException;
 
+  int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException;
+
   /**
    * Restart a particular HddsDatanode.
    *
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 5cd0841..8018bab 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -192,7 +192,8 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
     return hddsDatanodes;
   }
 
-  private int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException {
+  @Override
+  public int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException {
     for (HddsDatanodeService service : hddsDatanodes) {
       if (service.getDatanodeDetails().equals(dn)) {
         return hddsDatanodes.indexOf(service);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 54cdff0..f228dad 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -35,7 +36,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.ratis.protocol.AlreadyClosedException;
+import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -510,7 +511,7 @@ public class TestBlockOutputStreamWithFailures {
     // and one flush for partial chunk
     key.flush();
     Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
-        .getIoException()) instanceof AlreadyClosedException);
+        .getIoException()) instanceof RaftRetryFailureException);
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // now close the stream, It will update the ack length after watchForCommit
@@ -1041,7 +1042,7 @@ public class TestBlockOutputStreamWithFailures {
     key.flush();
 
     Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
-        .getIoException()) instanceof AlreadyClosedException);
+        .getIoException()) instanceof RaftRetryFailureException);
     Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
@@ -1183,7 +1184,7 @@ public class TestBlockOutputStreamWithFailures {
     key.flush();
 
     Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
-        .getIoException()) instanceof AlreadyClosedException);
+        .getIoException()) instanceof RaftRetryFailureException);
 
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 2c3cfab..13e3eff 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -139,7 +139,13 @@ public class TestContainerStateMachine {
             .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
             .getContainerPath()));
 
-    key.close();
+    try {
+      key.close();
+      Assert.fail();
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe.getMessage().contains(
+          "Requested operation not allowed as ContainerState is UNHEALTHY"));
+    }
     // Make sure the container is marked unhealthy
     Assert.assertTrue(
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 68f1ecc..a8b7295 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -142,7 +142,13 @@ public class TestContainerStateMachineFailures {
             .getContainer().getContainerSet()
             .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
             .getContainerPath()));
-    key.close();
+    try {
+      key.close();
+      Assert.fail();
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe.getMessage().contains(
+          "Requested operation not allowed as ContainerState is UNHEALTHY"));
+    }
     long containerID = omKeyLocationInfo.getContainerID();
 
     // Make sure the container is marked unhealthy
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
new file mode 100644
index 0000000..381cf14
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.ratis.protocol.GroupMismatchException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests Ozone client retry handling when a write hits GroupMismatchException.
+ */
+public class TestOzoneClientRetriesOnException {
+
+  private static MiniOzoneCluster cluster;
+  private OzoneConfiguration conf = new OzoneConfiguration();
+  private OzoneClient client;
+  private ObjectStore objectStore;
+  private int chunkSize;
+  private int flushSize;
+  private int maxFlushSize;
+  private int blockSize;
+  private String volumeName;
+  private String bucketName;
+  private String keyString;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   * @throws Exception if cluster or client setup fails
+   */
+  @Before
+  public void init() throws Exception {
+    chunkSize = 100;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
+    conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s");
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
+    conf.setQuietMode(false);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testblockoutputstreamwithretries";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  private String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testGroupMismatchExceptionHandling() throws Exception {
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    int dataLength = maxFlushSize + 50;
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+    long containerID =
+        keyOutputStream.getStreamEntries().get(0).getBlockID().getContainerID();
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerID));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    ContainerTestHelper.waitForPipelineClose(key, cluster, true);
+    key.flush();
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+        .getIoException()) instanceof GroupMismatchException);
+    Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
+        .contains(pipeline.getId()));
+    key.close();
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
+    validateData(keyName, data1);
+  }
+
+  @Test
+  public void testMaxRetriesByOzoneClient() throws Exception {
+    String keyName = getKeyName();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, 4 * blockSize);
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
+    List<BlockOutputStreamEntry> entries = keyOutputStream.getStreamEntries();
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 4);
+    int dataLength = maxFlushSize + 50;
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+
+    OutputStream stream = entries.get(0).getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    List<PipelineID> pipelineList = new ArrayList<>();
+    long containerID;
+    for (BlockOutputStreamEntry entry : entries) {
+      containerID = entry.getBlockID().getContainerID();
+      ContainerInfo container =
+          cluster.getStorageContainerManager().getContainerManager()
+              .getContainer(ContainerID.valueof(containerID));
+      Pipeline pipeline =
+          cluster.getStorageContainerManager().getPipelineManager()
+              .getPipeline(container.getPipelineID());
+      pipelineList.add(pipeline.getId());
+    }
+    ContainerTestHelper.waitForPipelineClose(key, cluster, false);
+    try {
+      key.write(data1);
+    } catch (IOException ioe) {
+      Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+          .getIoException()) instanceof GroupMismatchException);
+      Assert.assertTrue(ioe.getMessage().contains(
+          "Retry request failed. retries get failed due to exceeded maximum "
+              + "allowed retries number: 3"));
+    }
+  }
+
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return ContainerTestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+  }
+
+  private void validateData(String keyName, byte[] data) throws Exception {
+    ContainerTestHelper
+        .validateData(keyName, data, objectStore, volumeName, bucketName);
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 0b618a0..dc5e8b4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.Token;
 
@@ -740,8 +742,76 @@ public final class ContainerTestHelper {
       containerIdList.add(info.getContainerID());
     }
     Assert.assertTrue(!containerIdList.isEmpty());
-    ContainerTestHelper
-        .waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
+    waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
+  }
+
+  public static void waitForPipelineClose(OzoneOutputStream outputStream,
+      MiniOzoneCluster cluster, boolean waitForContainerCreation)
+      throws Exception {
+    KeyOutputStream keyOutputStream =
+        (KeyOutputStream) outputStream.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        keyOutputStream.getLocationInfoList();
+    List<Long> containerIdList = new ArrayList<>();
+    for (OmKeyLocationInfo info : locationInfoList) {
+      containerIdList.add(info.getContainerID());
+    }
+    Assert.assertTrue(!containerIdList.isEmpty());
+    waitForPipelineClose(cluster, waitForContainerCreation,
+        containerIdList.toArray(new Long[0]));
+  }
+
+  public static void waitForPipelineClose(MiniOzoneCluster cluster,
+      boolean waitForContainerCreation, Long... containerIdList)
+      throws TimeoutException, InterruptedException, IOException {
+    List<Pipeline> pipelineList = new ArrayList<>();
+    for (long containerID : containerIdList) {
+      ContainerInfo container =
+          cluster.getStorageContainerManager().getContainerManager()
+              .getContainer(ContainerID.valueof(containerID));
+      Pipeline pipeline =
+          cluster.getStorageContainerManager().getPipelineManager()
+              .getPipeline(container.getPipelineID());
+      if (!pipelineList.contains(pipeline)) {
+        pipelineList.add(pipeline);
+      }
+      List<DatanodeDetails> datanodes = pipeline.getNodes();
+
+      if (waitForContainerCreation) {
+        for (DatanodeDetails details : datanodes) {
+          // Client will issue write chunk and it will create the container on
+          // datanodes.
+          // wait for the container to be created
+          GenericTestUtils
+              .waitFor(() -> isContainerPresent(cluster, containerID, details),
+                  500, 100 * 1000);
+          Assert.assertTrue(isContainerPresent(cluster, containerID, details));
+
+          // make sure the container gets created first
+          Assert.assertFalse(ContainerTestHelper
+              .isContainerClosed(cluster, containerID, details));
+        }
+      }
+    }
+    for (Pipeline pipeline1 : pipelineList) {
+      // issue pipeline destroy command
+      cluster.getStorageContainerManager().getPipelineManager()
+          .finalizeAndDestroyPipeline(pipeline1, false);
+    }
+
+    // wait for the pipeline to get destroyed in the datanodes
+    for (Pipeline pipeline : pipelineList) {
+      for (DatanodeDetails dn : pipeline.getNodes()) {
+        XceiverServerSpi server =
+            cluster.getHddsDatanodes().get(cluster.getHddsDatanodeIndex(dn))
+                .getDatanodeStateMachine().getContainer().getWriteChannel();
+        Assert.assertTrue(server instanceof XceiverServerRatis);
+        XceiverServerRatis raftServer = (XceiverServerRatis) server;
+        GenericTestUtils.waitFor(
+            () -> (!raftServer.getPipelineIds().contains(pipeline.getId())),
+            500, 100 * 1000);
+      }
+    }
   }
 
   public static void waitForContainerClose(MiniOzoneCluster cluster,
@@ -785,13 +855,13 @@ public final class ContainerTestHelper {
       // but not yet been used by the client. In such a case container is never
       // created.
       for (DatanodeDetails datanodeDetails : datanodes) {
-        GenericTestUtils.waitFor(() -> ContainerTestHelper
-                .isContainerClosed(cluster, containerID, datanodeDetails), 500,
+        GenericTestUtils.waitFor(
+            () -> isContainerClosed(cluster, containerID, datanodeDetails), 500,
             15 * 1000);
         //double check if it's really closed
         // (waitFor also throws an exception)
-        Assert.assertTrue(ContainerTestHelper
-            .isContainerClosed(cluster, containerID, datanodeDetails));
+        Assert.assertTrue(
+            isContainerClosed(cluster, containerID, datanodeDetails));
       }
       index++;
     }
diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml
index b243ccc..1149894 100644
--- a/hadoop-ozone/pom.xml
+++ b/hadoop-ozone/pom.xml
@@ -29,7 +29,7 @@
     <hadoop.version>3.2.0</hadoop.version>
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
     <ozone.version>0.5.0-SNAPSHOT</ozone.version>
-    <ratis.version>0.4.0-1fc5ace-SNAPSHOT</ratis.version>
+    <ratis.version>0.4.0-8fed368-SNAPSHOT</ratis.version>
     <bouncycastle.version>1.60</bouncycastle.version>
     <ozone.release>Crater Lake</ozone.release>
     <declared.ozone.version>${ozone.version}</declared.ozone.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org