You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2018/09/03 06:59:10 UTC

hadoop git commit: HDDS-263. Add retries in Ozone Client to handle BlockNotCommitted Exception. Contributed by Shashikant Banerjee.

Repository: hadoop
Updated Branches:
  refs/heads/trunk ff036e49f -> 873ef8ae8


HDDS-263. Add retries in Ozone Client to handle BlockNotCommitted Exception. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/873ef8ae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/873ef8ae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/873ef8ae

Branch: refs/heads/trunk
Commit: 873ef8ae81321325889c9d3a6939163e98fbf5bb
Parents: ff036e4
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Mon Sep 3 12:26:34 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Mon Sep 3 12:26:34 2018 +0530

----------------------------------------------------------------------
 .../helpers/BlockNotCommittedException.java     | 36 ++++++++
 .../scm/storage/ContainerProtocolCalls.java     |  5 ++
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  8 ++
 .../common/src/main/resources/ozone-default.xml | 16 ++++
 .../hadoop/ozone/client/OzoneClientUtils.java   | 28 ++++++
 .../ozone/client/io/ChunkGroupOutputStream.java | 89 +++++++++++++++----
 .../hadoop/ozone/client/rpc/RpcClient.java      |  5 ++
 .../rpc/TestCloseContainerHandlingByClient.java | 91 +++++++++++++++++---
 .../web/storage/DistributedStorageHandler.java  |  5 ++
 9 files changed, 254 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/873ef8ae/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/BlockNotCommittedException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/BlockNotCommittedException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/BlockNotCommittedException.java
new file mode 100644
index 0000000..86f5a66
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/BlockNotCommittedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+
+/**
+ * Exception thrown when a block is yet to be committed on the datanode.
+ */
+public class BlockNotCommittedException extends StorageContainerException {
+
+  /**
+   * Constructs the exception with the specified detail message.
+   *
+   * @param message The detail message (which is saved for later retrieval by
+   * the {@link #getMessage()} method)
+   */
+  public BlockNotCommittedException(String message) {
+    super(message, ContainerProtos.Result.BLOCK_NOT_COMMITTED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/873ef8ae/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 1f2fafb..1d6a89d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .BlockNotCommittedException;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers
@@ -420,6 +422,9 @@ public final class ContainerProtocolCalls  {
   ) throws StorageContainerException {
     if (response.getResult() == ContainerProtos.Result.SUCCESS) {
       return;
+    } else if (response.getResult()
+        == ContainerProtos.Result.BLOCK_NOT_COMMITTED) {
+      throw new BlockNotCommittedException(response.getMessage());
     }
     throw new StorageContainerException(
         response.getMessage(), response.getResult());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/873ef8ae/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 6ad9085..8f53da5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -194,6 +194,14 @@ public final class OzoneConfigKeys {
   public static final int
       OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10;
 
+  public static final String OZONE_CLIENT_MAX_RETRIES =
+      "ozone.client.max.retries";
+  public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 50;
+
+  public static final String OZONE_CLIENT_RETRY_INTERVAL =
+      "ozone.client.retry.interval";
+  public static final String OZONE_CLIENT_RETRY_INTERVAL_DEFAULT = "200ms";
+
   public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
       = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
   public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT

http://git-wip-us.apache.org/repos/asf/hadoop/blob/873ef8ae/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 6f296c6..a9fd10b 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -244,6 +244,22 @@
     </description>
   </property>
   <property>
+    <name>ozone.client.max.retries</name>
+    <value>50</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>Maximum number of retries by Ozone Client on encountering
+      exception while fetching committed block length.
+    </description>
+  </property>
+  <property>
+    <name>ozone.client.retry.interval</name>
+    <value>200ms</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>Interval between retries by Ozone Client on encountering
+      exception while fetching committed block length.
+    </description>
+  </property>
+  <property>
     <name>ozone.client.protocol</name>
     <value>org.apache.hadoop.ozone.client.rpc.RpcClient</value>
     <tag>OZONE, CLIENT, MANAGEMENT</tag>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/873ef8ae/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
index 0aaee31..5d57753 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
@@ -17,14 +17,23 @@
  */
 package org.apache.hadoop.ozone.client;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
 import org.apache.hadoop.ozone.client.rest.response.KeyInfo;
 import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
 import org.apache.hadoop.ozone.client.rest.response.VolumeOwner;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 /** A utility class for OzoneClient. */
 public final class OzoneClientUtils {
 
@@ -84,4 +93,23 @@ public final class OzoneClientUtils {
     keyInfo.setSize(key.getDataSize());
     return keyInfo;
   }
+
+  public static RetryPolicy createRetryPolicy(Configuration conf) {
+    int maxRetryCount =
+        conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
+            OZONE_CLIENT_MAX_RETRIES_DEFAULT);
+    long retryInterval = conf.getTimeDuration(OzoneConfigKeys.
+        OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys.
+        OZONE_CLIENT_RETRY_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+    RetryPolicy basePolicy = RetryPolicies
+        .retryUpToMaximumCountWithFixedSleep(maxRetryCount, retryInterval,
+            TimeUnit.MILLISECONDS);
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
+        new HashMap<Class<? extends Exception>, RetryPolicy>();
+    exceptionToPolicyMap.put(BlockNotCommittedException.class, basePolicy);
+    RetryPolicy retryPolicy = RetryPolicies
+        .retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+            exceptionToPolicyMap);
+    return retryPolicy;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/873ef8ae/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index c632df6..21406b5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -46,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -63,7 +65,7 @@ import java.util.Optional;
  */
 public class ChunkGroupOutputStream extends OutputStream {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(ChunkGroupOutputStream.class);
 
   // array list's get(index) is O(1)
@@ -80,6 +82,7 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final String requestID;
   private boolean closed;
   private List<OmKeyLocationInfo> locationInfoList;
+  private final RetryPolicy retryPolicy;
   /**
    * A constructor for testing purpose only.
    */
@@ -95,6 +98,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     requestID = null;
     closed = false;
     locationInfoList = null;
+    retryPolicy = null;
   }
 
   /**
@@ -124,7 +128,7 @@ public class ChunkGroupOutputStream extends OutputStream {
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
       OzoneManagerProtocolClientSideTranslatorPB omClient,
       int chunkSize, String requestId, ReplicationFactor factor,
-      ReplicationType type) throws IOException {
+      ReplicationType type, RetryPolicy retryPolicy) throws IOException {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.byteOffset = 0;
@@ -143,6 +147,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     this.chunkSize = chunkSize;
     this.requestID = requestId;
     this.locationInfoList = new ArrayList<>();
+    this.retryPolicy = retryPolicy;
     LOG.debug("Expecting open key with one block, but got" +
         info.getKeyLocationVersions().size());
   }
@@ -305,6 +310,62 @@ public class ChunkGroupOutputStream extends OutputStream {
     }
   }
 
+  private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry)
+      throws IOException {
+    long blockLength;
+    ContainerProtos.GetCommittedBlockLengthResponseProto responseProto;
+    RetryPolicy.RetryAction action;
+    int numRetries = 0;
+
+    // TODO : At this point of time, we also need to allocate new blocks
+    // from a different container and may need to nullify
+    // all the remaining pre-allocated blocks in case they were
+    // pre-allocated on the same container which got closed now. This needs
+    // caching the closed container list on the client itself.
+    while (true) {
+      try {
+        responseProto = ContainerProtocolCalls
+            .getCommittedBlockLength(streamEntry.xceiverClient,
+                streamEntry.blockID, requestID);
+        blockLength = responseProto.getBlockLength();
+        return blockLength;
+      } catch (StorageContainerException sce) {
+        try {
+          action = retryPolicy.shouldRetry(sce, numRetries, 0, true);
+        } catch (Exception e) {
+          throw e instanceof IOException ? (IOException) e : new IOException(e);
+        }
+        if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+          if (action.reason != null) {
+            LOG.error(
+                "GetCommittedBlockLength request failed. " + action.reason,
+                sce);
+          }
+          throw sce;
+        }
+
+        // Throw the exception if the thread is interrupted
+        if (Thread.currentThread().isInterrupted()) {
+          LOG.warn("Interrupted while trying for connection");
+          throw sce;
+        }
+        Preconditions.checkArgument(
+            action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
+        try {
+          Thread.sleep(action.delayMillis);
+        } catch (InterruptedException e) {
+          throw (IOException) new InterruptedIOException(
+              "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
+              .initCause(e);
+        }
+        numRetries++;
+        LOG.trace("Retrying GetCommittedBlockLength request. Already tried "
+            + numRetries + " time(s); retry policy is " + retryPolicy);
+        continue;
+      }
+    }
+  }
+
   /**
    * It performs following actions :
    * a. Updates the committed length at datanode for the current stream in
@@ -317,15 +378,6 @@ public class ChunkGroupOutputStream extends OutputStream {
    */
   private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
       int streamIndex) throws IOException {
-    // TODO : If the block is still not committed and is in the
-    // pending openBlock Map, it will return BLOCK_NOT_COMMITTED
-    // exception. We should handle this by retrying the same operation
-    // n times and update the OzoneManager with the actual block length
-    // written. At this point of time, we also need to allocate new blocks
-    // from a different container and may need to nullify
-    // all the remaining pre-allocated blocks in case they were
-    // pre-allocated on the same container which got closed now.This needs
-    // caching the closed container list on the client itself.
     long committedLength = 0;
     ByteBuffer buffer = streamEntry.getBuffer();
     if (buffer == null) {
@@ -342,11 +394,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     // for this block associated with the stream here.
     if (streamEntry.currentPosition >= chunkSize
         || streamEntry.currentPosition != buffer.position()) {
-      ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
-          ContainerProtocolCalls
-              .getCommittedBlockLength(streamEntry.xceiverClient,
-                  streamEntry.blockID, requestID);
-      committedLength = responseProto.getBlockLength();
+      committedLength = getCommittedBlockLength(streamEntry);
       // update the length of the current stream
       locationInfoList.get(streamIndex).setLength(committedLength);
     }
@@ -481,6 +529,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     private String requestID;
     private ReplicationType type;
     private ReplicationFactor factor;
+    private RetryPolicy retryPolicy;
 
     public Builder setHandler(OpenKeySession handler) {
       this.openHandler = handler;
@@ -526,8 +575,14 @@ public class ChunkGroupOutputStream extends OutputStream {
 
     public ChunkGroupOutputStream build() throws IOException {
       return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
-          omClient, chunkSize, requestID, factor, type);
+          omClient, chunkSize, requestID, factor, type, retryPolicy);
+    }
+
+    public Builder setRetryPolicy(RetryPolicy rPolicy) {
+      this.retryPolicy = rPolicy;
+      return this;
     }
+
   }
 
   private static class ChunkOutputStreamEntry extends OutputStream {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/873ef8ae/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index fc70514..387f41f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
@@ -37,6 +38,7 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
 import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
@@ -97,6 +99,7 @@ public class RpcClient implements ClientProtocol {
   private final UserGroupInformation ugi;
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
+  private final RetryPolicy retryPolicy;
 
    /**
     * Creates RpcClient instance with the given configuration.
@@ -137,6 +140,7 @@ public class RpcClient implements ClientProtocol {
                 Client.getRpcTimeout(conf)));
 
     this.xceiverClientManager = new XceiverClientManager(conf);
+    retryPolicy = OzoneClientUtils.createRetryPolicy(conf);
 
     int configuredChunkSize = conf.getInt(
         ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
@@ -469,6 +473,7 @@ public class RpcClient implements ClientProtocol {
             .setRequestID(requestId)
             .setType(HddsProtos.ReplicationType.valueOf(type.toString()))
             .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
+            .setRetryPolicy(retryPolicy)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/873ef8ae/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index 50d7ec5..9f12633 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -22,6 +22,10 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.hdds.scm.container.common.helpers.
+    StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -45,6 +49,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.event.Level;
 
 import java.io.IOException;
 import java.security.MessageDigest;
@@ -52,6 +57,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
+import java.util.Random;
 
 /**
  * Tests Close Container Exception handling by Ozone Client.
@@ -67,6 +73,7 @@ public class TestCloseContainerHandlingByClient {
   private static String volumeName;
   private static String bucketName;
   private static String keyString;
+  private static int maxRetries;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -78,6 +85,9 @@ public class TestCloseContainerHandlingByClient {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    // generate a random number in the range [0, 10)
+    maxRetries = new Random().nextInt(10);
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, maxRetries);
     chunkSize = (int) OzoneConsts.MB;
     blockSize = 4 * chunkSize;
     conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize);
@@ -286,17 +296,8 @@ public class TestCloseContainerHandlingByClient {
 
     ChunkGroupOutputStream groupOutputStream =
         (ChunkGroupOutputStream) outputStream.getOutputStream();
-    long clientId = groupOutputStream.getOpenID();
-    OMMetadataManager metadataManager =
-        cluster.getOzoneManager().getMetadataManager();
-    byte[] openKey =
-        metadataManager.getOpenKeyBytes(
-            volumeName, bucketName, keyName, clientId);
-    byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey);
-    OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(
-        OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData));
     List<OmKeyLocationInfo> locationInfoList =
-        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+        getLocationInfos(groupOutputStream, keyName);
     List<Long> containerIdList = new ArrayList<>();
     List<Pipeline> pipelineList = new ArrayList<>();
     for (OmKeyLocationInfo info : locationInfoList) {
@@ -318,7 +319,6 @@ public class TestCloseContainerHandlingByClient {
                 new CloseContainerCommand(containerID, type, pipeline.getId()));
       }
     }
-
     int index = 0;
     for (long containerID : containerIdList) {
       Pipeline pipeline = pipelineList.get(index);
@@ -333,7 +333,6 @@ public class TestCloseContainerHandlingByClient {
       }
       index++;
     }
-
   }
 
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
@@ -345,6 +344,20 @@ public class TestCloseContainerHandlingByClient {
         .createKey(keyName, size, type, factor);
   }
 
+  private List<OmKeyLocationInfo> getLocationInfos(
+      ChunkGroupOutputStream groupOutputStream, String keyName)
+      throws IOException {
+    long clientId = groupOutputStream.getOpenID();
+    OMMetadataManager metadataManager =
+        cluster.getOzoneManager().getMetadataManager();
+    byte[] openKey = metadataManager
+        .getOpenKeyBytes(volumeName, bucketName, keyName, clientId);
+    byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey);
+    OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(
+        OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData));
+    return keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+  }
+
   private void validateData(String keyName, byte[] data) throws Exception {
     byte[] readData = new byte[data.length];
     OzoneInputStream is =
@@ -399,4 +412,58 @@ public class TestCloseContainerHandlingByClient {
     dataString.concat(dataString);
     validateData(keyName, dataString.getBytes());
   }
+
+  @Test
+  public void testRetriesOnBlockNotCommittedException() throws Exception {
+    String keyName = "blockcommitexceptiontest";
+    OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+    GenericTestUtils.setLogLevel(ChunkGroupOutputStream.LOG, Level.TRACE);
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(ChunkGroupOutputStream.LOG);
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    String dataString = fixedLengthString(keyString, (3 * chunkSize));
+    key.write(dataString.getBytes());
+    List<OmKeyLocationInfo> locationInfos =
+        getLocationInfos(groupOutputStream, keyName);
+    long containerID = locationInfos.get(0).getContainerID();
+    List<DatanodeDetails> datanodes =
+        cluster.getStorageContainerManager().getScmContainerManager()
+            .getContainerWithPipeline(containerID).getPipeline().getMachines();
+    Assert.assertEquals(1, datanodes.size());
+    // move the container on the datanode to Closing state, this will ensure
+    // closing the key will hit BLOCK_NOT_COMMITTED_EXCEPTION while trying
+    // to fetch the committed length
+    for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) {
+      if (datanodes.get(0).equals(datanodeService.getDatanodeDetails())) {
+        datanodeService.getDatanodeStateMachine().getContainer()
+            .getContainerSet().getContainer(containerID).getContainerData()
+            .setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
+      }
+    }
+    dataString = fixedLengthString(keyString, (chunkSize * 1 / 2));
+    key.write(dataString.getBytes());
+    try {
+      key.close();
+      Assert.fail("Expected Exception not thrown");
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe instanceof StorageContainerException);
+      Assert.assertTrue(((StorageContainerException) ioe).getResult()
+          == ContainerProtos.Result.BLOCK_NOT_COMMITTED);
+    }
+    // It should retry at most maxRetries times
+    for (int i = 1; i <= maxRetries; i++) {
+      Assert.assertTrue(logCapturer.getOutput()
+          .contains("Retrying GetCommittedBlockLength request"));
+      Assert.assertTrue(logCapturer.getOutput().contains("Already tried " + i));
+    }
+    Assert.assertTrue(logCapturer.getOutput()
+        .contains("GetCommittedBlockLength request failed."));
+    Assert.assertTrue(logCapturer.getOutput().contains(
+        "retries get failed due to exceeded maximum allowed retries number"
+            + ": " + maxRetries));
+    logCapturer.stopCapturing();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/873ef8ae/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index ec33990..0d62432 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.ozone.web.storage;
 import com.google.common.base.Strings;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -85,6 +87,7 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final boolean useRatis;
   private final HddsProtos.ReplicationType type;
   private final HddsProtos.ReplicationFactor factor;
+  private final RetryPolicy retryPolicy;
 
   /**
    * Creates a new DistributedStorageHandler.
@@ -119,6 +122,7 @@ public final class DistributedStorageHandler implements StorageHandler {
         OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT);
     groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS,
         OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
+    retryPolicy = OzoneClientUtils.createRetryPolicy(conf);
     if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
       LOG.warn("The chunk size ({}) is not allowed to be more than"
               + " the maximum size ({}),"
@@ -420,6 +424,7 @@ public final class DistributedStorageHandler implements StorageHandler {
             .setRequestID(args.getRequestID())
             .setType(xceiverClientManager.getType())
             .setFactor(xceiverClientManager.getFactor())
+            .setRetryPolicy(retryPolicy)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org