You are viewing a plain-text version of this content. The link to the canonical version (shown as "here" in the original page) was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/03/15 10:12:10 UTC

[hadoop] branch trunk updated: HDDS-1098. Introduce Retry Policy in Ozone Client. Contributed by Shashikant Banerjee.

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 155ab6d  HDDS-1098. Introduce Retry Policy in Ozone Client. Contributed by Shashikant Banerjee.
155ab6d is described below

commit 155ab6d5d8ab4a80019e65351572320502d8a510
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Fri Mar 15 15:40:59 2019 +0530

    HDDS-1098. Introduce Retry Policy in Ozone Client. Contributed by Shashikant Banerjee.
---
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |  19 ++-
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   4 +-
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   5 +
 .../main/java/org/apache/ratis/RatisHelper.java    |  33 ++++-
 .../common/src/main/resources/ozone-default.xml    |  16 ++-
 .../transport/server/ratis/XceiverServerRatis.java |  18 +--
 .../TestCloseContainerCommandHandler.java          |   8 +-
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |  10 +-
 .../hadoop/ozone/client/OzoneClientUtils.java      |  11 ++
 .../hadoop/ozone/client/io/KeyOutputStream.java    | 149 +++++++++++++--------
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  22 +--
 .../org/apache/hadoop/ozone/RatisTestHelper.java   |   5 +-
 .../web/storage/DistributedStorageHandler.java     |  11 +-
 13 files changed, 206 insertions(+), 105 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 673a82b..65241bf 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -47,6 +47,7 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,6 +75,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     final String rpcType = ozoneConf
         .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
             ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final TimeDuration clientRequestTimeout =
+        RatisHelper.getClientRequestTimeout(ozoneConf);
     final int maxOutstandingRequests =
         HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
     final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
@@ -81,7 +84,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
         SecurityConfig(ozoneConf));
     return new XceiverClientRatis(pipeline,
         SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
-        retryPolicy, tlsConfig);
+        retryPolicy, tlsConfig, clientRequestTimeout);
   }
 
   private final Pipeline pipeline;
@@ -90,6 +93,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   private final int maxOutstandingRequests;
   private final RetryPolicy retryPolicy;
   private final GrpcTlsConfig tlsConfig;
+  private final TimeDuration clientRequestTimeout;
 
   // Map to track commit index at every server
   private final ConcurrentHashMap<UUID, Long> commitInfoMap;
@@ -102,7 +106,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    */
   private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
       int maxOutStandingChunks, RetryPolicy retryPolicy,
-      GrpcTlsConfig tlsConfig) {
+      GrpcTlsConfig tlsConfig, TimeDuration timeout) {
     super();
     this.pipeline = pipeline;
     this.rpcType = rpcType;
@@ -111,6 +115,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     commitInfoMap = new ConcurrentHashMap<>();
     watchClient = null;
     this.tlsConfig = tlsConfig;
+    this.clientRequestTimeout = timeout;
   }
 
   private void updateCommitInfosMap(
@@ -160,7 +165,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     // requests to be handled by raft client
     if (!client.compareAndSet(null,
         RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
-            maxOutstandingRequests, tlsConfig))) {
+            maxOutstandingRequests, tlsConfig, clientRequestTimeout))) {
       throw new IllegalStateException("Client is already connected.");
     }
   }
@@ -243,7 +248,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     if (watchClient == null) {
       watchClient =
           RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
-              maxOutstandingRequests, tlsConfig);
+              maxOutstandingRequests, tlsConfig, clientRequestTimeout);
     }
     CompletableFuture<RaftClientReply> replyFuture = watchClient
         .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
@@ -260,9 +265,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       // TODO : need to remove the code to create the new RaftClient instance
       // here once the watch request bypassing sliding window in Raft Client
       // gets fixed.
-      watchClient = RatisHelper
-          .newRaftClient(rpcType, getPipeline(), retryPolicy,
-              maxOutstandingRequests, tlsConfig);
+      watchClient =
+          RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
+              maxOutstandingRequests, tlsConfig, clientRequestTimeout);
       reply = watchClient
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
           .get(timeout, TimeUnit.MILLISECONDS);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1f84ebe..4c67eb3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -121,12 +121,12 @@ public final class ScmConfigKeys {
       TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
   public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
       "dfs.ratis.client.request.max.retries";
-  public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180;
+  public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20;
   public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY =
       "dfs.ratis.client.request.retry.interval";
   public static final TimeDuration
       DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT =
-      TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+      TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
   public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY =
       "dfs.ratis.server.retry-cache.timeout.duration";
   public static final TimeDuration
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index dc3ebe5..1388d00 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -133,6 +133,11 @@ public final class OzoneConfigKeys {
   public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT =
       "30s";
 
+  public static final String OZONE_CLIENT_MAX_RETRIES =
+      "ozone.client.max.retries";
+  public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 5;
+
+
   // This defines the overall connection limit for the connection pool used in
   // RestClient.
   public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
index 3713d7a..a63d18a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcFactory;
@@ -134,33 +135,51 @@ public interface RatisHelper {
 
   static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
       RetryPolicy retryPolicy, int maxOutStandingRequest,
-      GrpcTlsConfig tlsConfig) throws IOException {
+      GrpcTlsConfig tlsConfig, TimeDuration timeout) throws IOException {
     return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()),
         newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
-        pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig);
+            pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig,
+        timeout);
+  }
+
+  static TimeDuration getClientRequestTimeout(Configuration conf) {
+    // Set the client requestTimeout
+    final TimeUnit timeUnit =
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
+            .getUnit();
+    final long duration = conf.getTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
+            .getDuration(), timeUnit);
+    final TimeDuration clientRequestTimeout =
+        TimeDuration.valueOf(duration, timeUnit);
+    return clientRequestTimeout;
   }
 
   static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
       RetryPolicy retryPolicy, int maxOutstandingRequests,
-      GrpcTlsConfig tlsConfig) {
+      GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) {
     return newRaftClient(rpcType, leader.getId(),
         newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
-        maxOutstandingRequests, tlsConfig);
+        maxOutstandingRequests, tlsConfig, clientRequestTimeout);
   }
 
   static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
-      RetryPolicy retryPolicy, int maxOutstandingRequests) {
+      RetryPolicy retryPolicy, int maxOutstandingRequests,
+      TimeDuration clientRequestTimeout) {
     return newRaftClient(rpcType, leader.getId(),
         newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
-        maxOutstandingRequests, null);
+        maxOutstandingRequests, null, clientRequestTimeout);
   }
 
   static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
       RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest,
-      GrpcTlsConfig tlsConfig) {
+      GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) {
     LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
     final RaftProperties properties = new RaftProperties();
     RaftConfigKeys.Rpc.setType(properties, rpcType);
+    RaftClientConfigKeys.Rpc
+        .setRequestTimeout(properties, clientRequestTimeout);
 
     GrpcConfigKeys.setMessageSizeMax(properties,
         SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE));
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 9918b8b..e68e05e 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -231,17 +231,19 @@
     <name>dfs.ratis.client.request.timeout.duration</name>
     <value>3s</value>
     <tag>OZONE, RATIS, MANAGEMENT</tag>
-    <description>The timeout duration for ratis client request.</description>
+    <description>The timeout duration for ratis client request.It should be
+        set greater than leader election timeout in Ratis.
+    </description>
   </property>
   <property>
     <name>dfs.ratis.client.request.max.retries</name>
-    <value>180</value>
+    <value>20</value>
     <tag>OZONE, RATIS, MANAGEMENT</tag>
     <description>Number of retries for ratis client request.</description>
   </property>
   <property>
     <name>dfs.ratis.client.request.retry.interval</name>
-    <value>100ms</value>
+    <value>500ms</value>
     <tag>OZONE, RATIS, MANAGEMENT</tag>
     <description>Interval between successive retries for a ratis client request.
     </description>
@@ -418,6 +420,14 @@
     </description>
   </property>
   <property>
+    <name>ozone.client.max.retries</name>
+    <value>5</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>Maximum number of retries by Ozone Client on encountering
+      exception while writing a key.
+    </description>
+  </property>
+  <property>
     <name>ozone.client.protocol</name>
     <value>org.apache.hadoop.ozone.client.rpc.RpcClient</value>
     <tag>OZONE, CLIENT, MANAGEMENT</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index b96e00a..19e43b9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
 import io.opentracing.Scope;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcFactory;
@@ -176,7 +175,7 @@ public final class XceiverServerRatis extends XceiverServer {
         setRaftSegmentPreallocatedSize(conf, properties);
 
     // Set max write buffer size, which is the scm chunk size
-    final int maxChunkSize = setMaxWriteBuffer(conf, properties);
+    final int maxChunkSize = setMaxWriteBuffer(properties);
     TimeUnit timeUnit;
     long duration;
 
@@ -329,23 +328,10 @@ public final class XceiverServerRatis extends XceiverServer {
         .setRequestTimeout(properties, serverRequestTimeout);
   }
 
-  private int setMaxWriteBuffer(Configuration conf, RaftProperties properties) {
+  private int setMaxWriteBuffer(RaftProperties properties) {
     final int maxChunkSize = OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE;
     RaftServerConfigKeys.Log.setWriteBufferSize(properties,
         SizeInBytes.valueOf(maxChunkSize));
-
-    // Set the client requestTimeout
-    TimeUnit timeUnit =
-        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
-            .getUnit();
-    long duration = conf.getTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
-        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
-            .getDuration(), timeUnit);
-    final TimeDuration clientRequestTimeout =
-        TimeDuration.valueOf(duration, timeUnit);
-    RaftClientConfigKeys.Rpc
-        .setRequestTimeout(properties, clientRequestTimeout);
     return maxChunkSize;
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index 7d8b3d6..16e0e9d 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -39,6 +39,7 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
@@ -49,6 +50,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Test cases to verify CloseContainerCommandHandler in datanode.
@@ -289,8 +291,10 @@ public class TestCloseContainerCommandHandler {
     final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId,
         Collections.singleton(datanodeDetails));
     final int maxOutstandingRequests = 100;
-    final RaftClient client = RatisHelper.newRaftClient(SupportedRpcType.GRPC,
-        peer, retryPolicy, maxOutstandingRequests, null);
+    final RaftClient client = RatisHelper
+        .newRaftClient(SupportedRpcType.GRPC, peer, retryPolicy,
+            maxOutstandingRequests,
+            TimeDuration.valueOf(3, TimeUnit.SECONDS));
     Assert.assertTrue(client.groupAdd(group, peer.getId()).isSuccess());
     Thread.sleep(2000);
     final ContainerID containerId = ContainerID.valueof(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 3b36add..0af34fb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -32,6 +32,7 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedBiConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,9 +117,11 @@ final class RatisPipelineUtils {
         HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
     final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(
         new SecurityConfig(ozoneConf));
+    final TimeDuration requestTimeout =
+        RatisHelper.getClientRequestTimeout(ozoneConf);
     RaftClient client = RatisHelper
         .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-            retryPolicy, maxOutstandingRequests, tlsConfig);
+            retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout);
     client
         .groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId());
   }
@@ -141,12 +144,13 @@ final class RatisPipelineUtils {
         HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
     final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
         SecurityConfig(ozoneConf));
-
+    final TimeDuration requestTimeout =
+        RatisHelper.getClientRequestTimeout(ozoneConf);
     datanodes.parallelStream().forEach(d -> {
       final RaftPeer p = RatisHelper.toRaftPeer(d);
       try (RaftClient client = RatisHelper
           .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-              retryPolicy, maxOutstandingRequests, tlsConfig)) {
+              retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) {
         rpc.accept(client, p);
       } catch (IOException ioe) {
         String errMsg =
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
index 9651518..012a225 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.client;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.rest.response.*;
 import org.apache.ratis.protocol.AlreadyClosedException;
@@ -27,6 +29,7 @@ import org.apache.ratis.protocol.RaftRetryFailureException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /** A utility class for OzoneClient. */
@@ -122,6 +125,14 @@ public final class OzoneClientUtils {
     return keyInfo;
   }
 
+  public static RetryPolicy createRetryPolicy(int maxRetryCount) {
+    // just retry without sleep
+    RetryPolicy retryPolicy = RetryPolicies
+        .retryUpToMaximumCountWithFixedSleep(maxRetryCount, 0,
+            TimeUnit.MILLISECONDS);
+    return retryPolicy;
+  }
+
   public static List<Class<? extends Exception>> getExceptionList() {
     return EXCEPTION_LIST;
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index d1acbe1..a379889 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -45,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
@@ -87,6 +90,8 @@ public class KeyOutputStream extends OutputStream {
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private FileEncryptionInfo feInfo;
   private ExcludeList excludeList;
+  private final RetryPolicy retryPolicy;
+  private int retryCount;
   /**
    * A constructor for testing purpose only.
    */
@@ -111,6 +116,8 @@ public class KeyOutputStream extends OutputStream {
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
     this.bytesPerChecksum = OzoneConfigKeys
         .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
+    this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
+    retryCount = 0;
   }
 
   @VisibleForTesting
@@ -147,7 +154,7 @@ public class KeyOutputStream extends OutputStream {
       String requestId, ReplicationFactor factor, ReplicationType type,
       long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
       ChecksumType checksumType, int bytesPerChecksum,
-      String uploadID, int partNumber, boolean isMultipart) {
+      String uploadID, int partNumber, boolean isMultipart, int maxRetryCount) {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.omClient = omClient;
@@ -183,6 +190,8 @@ public class KeyOutputStream extends OutputStream {
     this.bufferPool =
         new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
     this.excludeList = new ExcludeList();
+    this.retryPolicy = OzoneClientUtils.createRetryPolicy(maxRetryCount);
+    this.retryCount = 0;
   }
 
   /**
@@ -308,23 +317,18 @@ public class KeyOutputStream extends OutputStream {
           current.write(b, off, writeLen);
         }
       } catch (IOException ioe) {
-        Throwable t = checkForException(ioe);
-        if (t != null) {
-          // for the current iteration, totalDataWritten - currentPos gives the
-          // amount of data already written to the buffer
-
-          // In the retryPath, the total data to be written will always be equal
-          // to or less than the max length of the buffer allocated.
-          // The len specified here is the combined sum of the data length of
-          // the buffers
-          Preconditions.checkState(!retry || len <= streamBufferMaxSize);
-          writeLen = retry ? (int) len :
-              (int) (current.getWrittenDataLength() - currentPos);
-          LOG.debug("writeLen {}, total len {}", writeLen, len);
-          handleException(current, currentStreamIndex, t);
-        } else {
-          throw ioe;
-        }
+        // for the current iteration, totalDataWritten - currentPos gives the
+        // amount of data already written to the buffer
+
+        // In the retryPath, the total data to be written will always be equal
+        // to or less than the max length of the buffer allocated.
+        // The len specified here is the combined sum of the data length of
+        // the buffers
+        Preconditions.checkState(!retry || len <= streamBufferMaxSize);
+        writeLen = retry ? (int) len :
+            (int) (current.getWrittenDataLength() - currentPos);
+        LOG.debug("writeLen {}, total len {}", writeLen, len);
+        handleException(current, currentStreamIndex, ioe);
       }
       if (current.getRemaining() <= 0) {
         // since the current block is already written close the stream.
@@ -390,7 +394,8 @@ public class KeyOutputStream extends OutputStream {
    * @throws IOException Throws IOException if Write fails
    */
   private void handleException(BlockOutputStreamEntry streamEntry,
-      int streamIndex, Throwable exception) throws IOException {
+      int streamIndex, IOException exception) throws IOException {
+    Throwable t = checkForException(exception);
     boolean retryFailure = checkForRetryFailure(exception);
     boolean closedContainerException = false;
     if (!retryFailure) {
@@ -413,9 +418,9 @@ public class KeyOutputStream extends OutputStream {
     if (!failedServers.isEmpty()) {
       excludeList.addDatanodes(failedServers);
     }
-    if (checkIfContainerIsClosed(exception)) {
+    if (checkIfContainerIsClosed(t)) {
       excludeList.addConatinerId(ContainerID.valueof(containerId));
-    } else if (retryFailure || exception instanceof TimeoutException) {
+    } else if (retryFailure || t instanceof TimeoutException) {
       pipelineId = streamEntry.getPipeline().getId();
       excludeList.addPipeline(pipelineId);
     }
@@ -425,7 +430,7 @@ public class KeyOutputStream extends OutputStream {
       // If the data is still cached in the underlying stream, we need to
       // allocate new block and write this data in the datanode.
       currentStreamIndex += 1;
-      handleWrite(null, 0, bufferedDataLen, true);
+      handleRetry(exception, bufferedDataLen);
     }
     if (totalSuccessfulFlushedData == 0) {
       streamEntries.remove(streamIndex);
@@ -448,6 +453,43 @@ public class KeyOutputStream extends OutputStream {
     }
   }
 
+  private void handleRetry(IOException exception, long len) throws IOException {
+    RetryPolicy.RetryAction action;
+    try {
+      action = retryPolicy
+          .shouldRetry(exception, retryCount, 0, true);
+    } catch (Exception e) {
+      throw e instanceof IOException ? (IOException) e : new IOException(e);
+    }
+    if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+      if (action.reason != null) {
+        LOG.error("Retry request failed. " + action.reason,
+            exception);
+      }
+      throw exception;
+    }
+
+    // Throw the exception if the thread is interrupted
+    if (Thread.currentThread().isInterrupted()) {
+      LOG.warn("Interrupted while trying for retry");
+      throw exception;
+    }
+    Preconditions.checkArgument(
+        action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
+    if (action.delayMillis > 0) {
+      try {
+        Thread.sleep(action.delayMillis);
+      } catch (InterruptedException e) {
+        throw (IOException) new InterruptedIOException(
+            "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
+            .initCause(e);
+      }
+    }
+    retryCount++;
+    LOG.trace("Retrying Write request. Already tried "
+        + retryCount + " time(s); retry policy is " + retryPolicy);
+    handleWrite(null, 0, len, true);
+  }
   /**
    * Checks if the provided exception signifies retry failure in ratis client.
    * In case of retry failure, ratis client throws RaftRetryFailureException
@@ -462,7 +504,7 @@ public class KeyOutputStream extends OutputStream {
     return t instanceof ContainerNotOpenException;
   }
 
-  private Throwable checkForException(IOException ioe) {
+  private Throwable checkForException(IOException ioe) throws IOException {
     Throwable t = ioe.getCause();
     while (t != null) {
       for (Class<? extends Exception> cls : OzoneClientUtils
@@ -473,7 +515,7 @@ public class KeyOutputStream extends OutputStream {
       }
       t = t.getCause();
     }
-    return null;
+    throw ioe;
   }
 
   private long getKeyLength() {
@@ -512,36 +554,30 @@ public class KeyOutputStream extends OutputStream {
     if (streamEntries.size() == 0) {
       return;
     }
-    int size = streamEntries.size();
-    int streamIndex =
-        currentStreamIndex >= size ? size - 1 : currentStreamIndex;
-    BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
-    if (entry != null) {
-      try {
-        Collection<DatanodeDetails> failedServers = entry.getFailedServers();
-
-        // failed servers can be null in case there is no data written in the
-        // stream
-        if (failedServers != null && !failedServers.isEmpty()) {
-          excludeList.addDatanodes(failedServers);
-        }
-        if (close) {
-          entry.close();
-        } else {
-          entry.flush();
-        }
-      } catch (IOException ioe) {
-        Throwable t = checkForException(ioe);
-        if (t != null) {
-          // This call will allocate a new streamEntry and write the Data.
-          // Close needs to be retried on the newly allocated streamEntry as
-          // as well.
-          handleException(entry, streamIndex, t);
-          handleFlushOrClose(close);
-        } else {
-          throw ioe;
+    while (true) {
+      int size = streamEntries.size();
+      int streamIndex =
+          currentStreamIndex >= size ? size - 1 : currentStreamIndex;
+      BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
+      if (entry != null) {
+        try {
+          Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+          // failed servers can be null in case there is no data written in the
+          // stream
+          if (failedServers != null && !failedServers.isEmpty()) {
+            excludeList.addDatanodes(failedServers);
+          }
+          if (close) {
+            entry.close();
+          } else {
+            entry.flush();
+          }
+        } catch (IOException ioe) {
+          handleException(entry, streamIndex, ioe);
+          continue;
         }
       }
+      break;
     }
   }
 
@@ -616,6 +652,7 @@ public class KeyOutputStream extends OutputStream {
     private String multipartUploadID;
     private int multipartNumber;
     private boolean isMultipartKey;
+    private int maxRetryCount;
 
 
     public Builder setMultipartUploadID(String uploadID) {
@@ -704,11 +741,17 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
+    public Builder setMaxRetryCount(int maxCount) {
+      this.maxRetryCount = maxCount;
+      return this;
+    }
+
     public KeyOutputStream build() throws IOException {
       return new KeyOutputStream(openHandler, xceiverManager, scmClient,
           omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
           streamBufferMaxSize, blockSize, watchTimeout, checksumType,
-          bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey);
+          bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
+          maxRetryCount);
     }
   }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 7fab650..d059582 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -123,6 +123,7 @@ public class RpcClient implements ClientProtocol {
   private final long blockSize;
   private final long watchTimeout;
   private final ClientId clientId = ClientId.randomId();
+  private final int maxRetryCount;
 
    /**
     * Creates RpcClient instance with the given configuration.
@@ -205,6 +206,9 @@ public class RpcClient implements ClientProtocol {
     this.verifyChecksum =
         conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
             OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT);
+    maxRetryCount =
+        conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
+            OZONE_CLIENT_MAX_RETRIES_DEFAULT);
   }
 
   private InetSocketAddress getScmAddressForClient() throws IOException {
@@ -594,7 +598,7 @@ public class RpcClient implements ClientProtocol {
         .build();
 
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
-    KeyOutputStream groupOutputStream =
+    KeyOutputStream keyOutputStream =
         new KeyOutputStream.Builder()
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
@@ -610,20 +614,21 @@ public class RpcClient implements ClientProtocol {
             .setBlockSize(blockSize)
             .setChecksumType(checksumType)
             .setBytesPerChecksum(bytesPerChecksum)
+            .setMaxRetryCount(maxRetryCount)
             .build();
-    groupOutputStream.addPreallocateBlocks(
+    keyOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),
         openKey.getOpenVersion());
-    final FileEncryptionInfo feInfo = groupOutputStream
+    final FileEncryptionInfo feInfo = keyOutputStream
         .getFileEncryptionInfo();
     if (feInfo != null) {
       KeyProvider.KeyVersion decrypted = getDEK(feInfo);
       final CryptoOutputStream cryptoOut = new CryptoOutputStream(
-          groupOutputStream, OzoneKMSUtil.getCryptoCodec(conf, feInfo),
+          keyOutputStream, OzoneKMSUtil.getCryptoCodec(conf, feInfo),
           decrypted.getMaterial(), feInfo.getIV());
       return new OzoneOutputStream(cryptoOut);
     } else {
-      return new OzoneOutputStream(groupOutputStream);
+      return new OzoneOutputStream(keyOutputStream);
     }
   }
 
@@ -856,7 +861,7 @@ public class RpcClient implements ClientProtocol {
         .build();
 
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
-    KeyOutputStream groupOutputStream =
+    KeyOutputStream keyOutputStream =
         new KeyOutputStream.Builder()
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
@@ -875,11 +880,12 @@ public class RpcClient implements ClientProtocol {
             .setMultipartNumber(partNumber)
             .setMultipartUploadID(uploadID)
             .setIsMultipartKey(true)
+            .setMaxRetryCount(maxRetryCount)
             .build();
-    groupOutputStream.addPreallocateBlocks(
+    keyOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),
         openKey.getOpenVersion());
-    return new OzoneOutputStream(groupOutputStream);
+    return new OzoneOutputStream(keyOutputStream);
   }
 
   @Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
index 322339b..da06c59 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
@@ -31,6 +31,7 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,9 +127,11 @@ public interface RatisTestHelper {
     final OzoneConfiguration conf = new OzoneConfiguration();
     final int maxOutstandingRequests =
         HddsClientUtils.getMaxOutstandingRequests(conf);
+    final TimeDuration requestTimeout =
+        RatisHelper.getClientRequestTimeout(conf);
     final RaftClient client =
         newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf),
-            maxOutstandingRequests);
+            maxOutstandingRequests, requestTimeout);
     client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId());
   }
 }
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index f0f8a60..6fff3e4 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -88,6 +88,7 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final ChecksumType checksumType;
   private final int bytesPerChecksum;
   private final boolean verifyChecksum;
+  private final int maxRetryCount;
 
   /**
    * Creates a new DistributedStorageHandler.
@@ -154,6 +155,9 @@ public final class DistributedStorageHandler implements StorageHandler {
     this.verifyChecksum =
         conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
             OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT);
+    this.maxRetryCount =
+        conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
+            OZONE_CLIENT_MAX_RETRIES_DEFAULT);
   }
 
   @Override
@@ -438,7 +442,7 @@ public final class DistributedStorageHandler implements StorageHandler {
         .build();
     // contact OM to allocate a block for key.
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
-    KeyOutputStream groupOutputStream =
+    KeyOutputStream keyOutputStream =
         new KeyOutputStream.Builder()
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
@@ -454,11 +458,12 @@ public final class DistributedStorageHandler implements StorageHandler {
             .setWatchTimeout(watchTimeout)
             .setChecksumType(checksumType)
             .setBytesPerChecksum(bytesPerChecksum)
+            .setMaxRetryCount(maxRetryCount)
             .build();
-    groupOutputStream.addPreallocateBlocks(
+    keyOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),
         openKey.getOpenVersion());
-    return new OzoneOutputStream(groupOutputStream);
+    return new OzoneOutputStream(keyOutputStream);
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org