You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sh...@apache.org on 2020/01/23 09:14:39 UTC

[hadoop-ozone] branch revert-472-HDDS-2920 created (now 145ae53)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a change to branch revert-472-HDDS-2920
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git.


      at 145ae53  Revert "HDDS-2920. Remove ozone ratis client specific config keys. (#472)"

This branch includes the following new commits:

     new 145ae53  Revert "HDDS-2920. Remove ozone ratis client specific config keys. (#472)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org


[hadoop-ozone] 01/01: Revert "HDDS-2920. Remove ozone ratis client specific config keys. (#472)"

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch revert-472-HDDS-2920
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 145ae53b651405d8f7d8a6fef43309ed47878c3e
Author: bshashikant <sb...@hortonworks.com>
AuthorDate: Thu Jan 23 14:44:32 2020 +0530

    Revert "HDDS-2920. Remove ozone ratis client specific config keys. (#472)"
    
    This reverts commit ab557dba89e4666e8c2679fa9b5fbd95dfd45e0b.
---
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  2 +-
 .../hadoop/hdds/scm/XceiverClientManager.java      | 42 +--------------
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java | 29 ++++++++---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  6 ++-
 .../hadoop/hdds/scm/storage/CommitWatcher.java     |  8 ++-
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  | 46 +++++++++++++----
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  7 ++-
 .../apache/hadoop/hdds/scm/XceiverClientSpi.java   |  3 +-
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   | 11 ++++
 .../common/src/main/resources/ozone-default.xml    | 16 ++++++
 .../ozone/client/io/BlockOutputStreamEntry.java    |  2 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  5 ++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  8 +++
 .../org/apache/hadoop/ozone/RatisTestHelper.java   |  9 +++-
 .../ozone/client/rpc/Test2WayCommitInRatis.java    |  4 +-
 .../ozone/client/rpc/TestBlockOutputStream.java    |  1 +
 .../rpc/TestBlockOutputStreamWithFailures.java     |  1 +
 .../rpc/TestCloseContainerHandlingByClient.java    |  1 +
 .../hadoop/ozone/client/rpc/TestCommitWatcher.java |  5 +-
 .../client/rpc/TestFailureHandlingByClient.java    |  2 +
 .../ozone/client/rpc/TestKeyInputStream.java       |  1 +
 .../rpc/TestMultiBlockWritesWithDnFailures.java    |  2 +
 .../rpc/TestOzoneClientRetriesOnException.java     |  1 +
 .../hadoop/ozone/client/rpc/TestReadRetries.java   |  2 +-
 .../ozone/client/rpc/TestWatchForCommit.java       | 60 ++++++++++++++++++++--
 .../hadoop/ozone/freon/TestDataValidate.java       |  2 +
 .../ozone/freon/TestOzoneClientKeyGenerator.java   |  2 +
 .../hadoop/ozone/freon/TestRandomKeyGenerator.java |  2 +
 .../hadoop/ozone/freon/DatanodeChunkGenerator.java |  2 +-
 29 files changed, 206 insertions(+), 76 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 0ded84a..9a4da38 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -473,7 +473,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   }
 
   @Override
-  public XceiverClientReply watchForCommit(long index)
+  public XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException {
     // there is no notion of watch for commit index in standalone pipeline
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index dc3b215..d46456a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -342,12 +342,11 @@ public class XceiverClientManager implements Closeable {
   /**
    * Configuration for ratis client.
    */
-  @ConfigGroup(prefix = "raft.client")
+  @ConfigGroup(prefix = "dfs.ratis.client")
   public static class DFSRatisClientConfig {
 
-    @Config(key = "async.outstanding-requests.max",
+    @Config(key = "async.max.outstanding.requests",
         defaultValue = "64",
-        type = ConfigType.INT,
         tags = {OZONE, CLIENT, PERFORMANCE},
         description =
             "Controls the maximum number of outstanding async requests that can"
@@ -362,43 +361,6 @@ public class XceiverClientManager implements Closeable {
     public void setMaxOutstandingRequests(int maxOutstandingRequests) {
       this.maxOutstandingRequests = maxOutstandingRequests;
     }
-
-    @Config(key = "rpc.request.timeout",
-        defaultValue = "60s",
-        type = ConfigType.TIME,
-        tags = {OZONE, CLIENT, PERFORMANCE},
-        description = "The timeout duration for ratis client request (except " +
-            "for watch request). It should be set greater than leader " +
-            "election timeout in Ratis."
-    )
-    private long requestTimeOut = 60 * 1000;
-
-    public long getRequestTimeOut() {
-      return requestTimeOut;
-    }
-
-    public void setRequestTimeOut(long requestTimeOut) {
-      this.requestTimeOut = requestTimeOut;
-    }
-
-    @Config(key = "watch.request.timeout",
-        defaultValue = "180s",
-        type = ConfigType.TIME,
-        tags = {OZONE, CLIENT, PERFORMANCE},
-        description = "The timeout duration for ratis client watch request. " +
-            "Timeout for the watch API in Ratis client to acknowledgea " +
-            "particular request getting replayed to all servers."
-    )
-    private long watchRequestTimeOut = 180 * 1000;
-
-    public long getWatchRequestTimeOut() {
-      return watchRequestTimeOut;
-    }
-
-    public void setWatchRequestTimeOut(long watchRequestTimeOut) {
-      this.watchRequestTimeOut = watchRequestTimeOut;
-    }
   }
 
-
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 0d12355..6f102d8 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -58,6 +59,7 @@ import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,19 +89,25 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     final String rpcType = ozoneConf
         .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
             ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final TimeDuration clientRequestTimeout =
+        RatisHelper.getClientRequestTimeout(ozoneConf);
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
     final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
     final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
         SecurityConfig(ozoneConf), caCert);
     return new XceiverClientRatis(pipeline,
-        SupportedRpcType.valueOfIgnoreCase(rpcType),
-        retryPolicy, tlsConfig, ozoneConf);
+        SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
+        retryPolicy, tlsConfig, clientRequestTimeout, ozoneConf);
   }
 
   private final Pipeline pipeline;
   private final RpcType rpcType;
   private final AtomicReference<RaftClient> client = new AtomicReference<>();
+  private final int maxOutstandingRequests;
   private final RetryPolicy retryPolicy;
   private final GrpcTlsConfig tlsConfig;
+  private final TimeDuration clientRequestTimeout;
   private final Configuration ozoneConfiguration;
 
   // Map to track commit index at every server
@@ -111,14 +119,17 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    * Constructs a client.
    */
   private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
-      RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
+      int maxOutStandingChunks, RetryPolicy retryPolicy,
+      GrpcTlsConfig tlsConfig, TimeDuration timeout,
       Configuration configuration) {
     super();
     this.pipeline = pipeline;
     this.rpcType = rpcType;
+    this.maxOutstandingRequests = maxOutStandingChunks;
     this.retryPolicy = retryPolicy;
     commitInfoMap = new ConcurrentHashMap<>();
     this.tlsConfig = tlsConfig;
+    this.clientRequestTimeout = timeout;
     metrics = XceiverClientManager.getXceiverClientMetrics();
     this.ozoneConfiguration = configuration;
   }
@@ -170,7 +181,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
 
     if (!client.compareAndSet(null,
         RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
-            tlsConfig, ozoneConfiguration))) {
+            maxOutstandingRequests, tlsConfig, clientRequestTimeout,
+            ozoneConfiguration))) {
       throw new IllegalStateException("Client is already connected.");
     }
   }
@@ -244,7 +256,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   }
 
   @Override
-  public XceiverClientReply watchForCommit(long index)
+  public XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException {
     long commitIndex = getReplicatedMinCommitIndex();
@@ -255,11 +267,14 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       clientReply.setLogIndex(commitIndex);
       return clientReply;
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("commit index : {} watch timeout : {}", index, timeout);
+    }
     RaftClientReply reply;
     try {
       CompletableFuture<RaftClientReply> replyFuture = getClient()
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
-      replyFuture.get();
+      replyFuture.get(timeout, TimeUnit.MILLISECONDS);
     } catch (Exception e) {
       Throwable t = HddsClientUtils.checkForException(e);
       LOG.warn("3 way commit failed on pipeline {}", pipeline, e);
@@ -268,7 +283,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       }
       reply = getClient()
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
-          .get();
+          .get(timeout, TimeUnit.MILLISECONDS);
       List<RaftProtos.CommitInfoProto> commitInfoProtoList =
           reply.getCommitInfos().stream()
               .filter(i -> i.getCommitIndex() < index)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 9131f5c..15aebe1 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -125,6 +125,7 @@ public class BlockOutputStream extends OutputStream {
    * @param bufferPool           pool of buffers
    * @param streamBufferFlushSize flush size
    * @param streamBufferMaxSize   max size of the currentBuffer
+   * @param watchTimeout          watch timeout
    * @param checksumType          checksum type
    * @param bytesPerChecksum      Bytes per checksum
    */
@@ -132,7 +133,8 @@ public class BlockOutputStream extends OutputStream {
   public BlockOutputStream(BlockID blockID,
       XceiverClientManager xceiverClientManager, Pipeline pipeline,
       int chunkSize, long streamBufferFlushSize, long streamBufferMaxSize,
-      BufferPool bufferPool, ChecksumType checksumType, int bytesPerChecksum)
+      long watchTimeout, BufferPool bufferPool, ChecksumType checksumType,
+      int bytesPerChecksum)
       throws IOException {
     this.blockID = new AtomicReference<>(blockID);
     this.chunkSize = chunkSize;
@@ -152,7 +154,7 @@ public class BlockOutputStream extends OutputStream {
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
+    commitWatcher = new CommitWatcher(bufferPool, xceiverClient, watchTimeout);
     bufferList = null;
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index 34d0d7c..ebcc6dc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -71,13 +71,17 @@ public class CommitWatcher {
 
   private XceiverClientSpi xceiverClient;
 
+  private final long watchTimeout;
+
   // total data which has been successfully flushed and acknowledged
   // by all servers
   private long totalAckDataLength;
 
-  public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
+  public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient,
+      long watchTimeout) {
     this.bufferPool = bufferPool;
     this.xceiverClient = xceiverClient;
+    this.watchTimeout = watchTimeout;
     commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
     totalAckDataLength = 0;
     futureMap = new ConcurrentHashMap<>();
@@ -187,7 +191,7 @@ public class CommitWatcher {
     long index;
     try {
       XceiverClientReply reply =
-          xceiverClient.watchForCommit(commitIndex);
+          xceiverClient.watchForCommit(commitIndex, watchTimeout);
       if (reply == null) {
         index = 0;
       } else {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index 75e07c3..98c36b6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcFactory;
@@ -148,12 +149,26 @@ public interface RatisHelper {
   }
 
   static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
-      RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
+      RetryPolicy retryPolicy, int maxOutStandingRequest,
+      GrpcTlsConfig tlsConfig, TimeDuration timeout,
       Configuration ozoneConfiguration) throws IOException {
     return newRaftClient(rpcType,
         toRaftPeerId(pipeline.getLeaderNode()),
         newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
-            pipeline.getNodes()), retryPolicy, tlsConfig, ozoneConfiguration);
+            pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig,
+        timeout, ozoneConfiguration);
+  }
+
+  static TimeDuration getClientRequestTimeout(Configuration conf) {
+    // Set the client requestTimeout
+    final TimeUnit timeUnit =
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
+            .getUnit();
+    final long duration = conf.getTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
+            .getDuration(), timeUnit);
+    return TimeDuration.valueOf(duration, timeUnit);
   }
 
   static RpcType getRpcType(Configuration conf) {
@@ -163,30 +178,35 @@ public interface RatisHelper {
   }
 
   static RaftClient newRaftClient(RaftPeer leader, Configuration conf) {
-    return newRaftClient(getRpcType(conf), leader,
-        RatisHelper.createRetryPolicy(conf), conf);
+    return newRaftClient(getRpcType(conf), leader, RetryPolicies.noRetry(),
+        GrpcConfigKeys.OutputStream.OUTSTANDING_APPENDS_MAX_DEFAULT,
+        getClientRequestTimeout(conf), conf);
   }
 
   static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
-      RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
+      RetryPolicy retryPolicy, int maxOutstandingRequests,
+      GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout,
       Configuration configuration) {
     return newRaftClient(rpcType, leader.getId(),
         newRaftGroup(Collections.singletonList(leader)), retryPolicy,
-        tlsConfig, configuration);
+        maxOutstandingRequests, tlsConfig, clientRequestTimeout, configuration);
   }
 
   static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
-      RetryPolicy retryPolicy,
+      RetryPolicy retryPolicy, int maxOutstandingRequests,
+      TimeDuration clientRequestTimeout,
       Configuration ozoneConfiguration) {
     return newRaftClient(rpcType, leader.getId(),
-        newRaftGroup(Collections.singletonList(leader)), retryPolicy, null,
+        newRaftGroup(Collections.singletonList(leader)), retryPolicy,
+        maxOutstandingRequests, null, clientRequestTimeout,
         ozoneConfiguration);
   }
 
   @SuppressWarnings("checkstyle:ParameterNumber")
   static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
-      RaftGroup group, RetryPolicy retryPolicy,
-      GrpcTlsConfig tlsConfig, Configuration ozoneConfiguration) {
+      RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest,
+      GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout,
+      Configuration ozoneConfiguration) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("newRaftClient: {}, leader={}, group={}",
           rpcType, leader, group);
@@ -200,10 +220,16 @@ public interface RatisHelper {
     createRaftGrpcProperties(ozoneConfiguration, properties);
 
     RaftConfigKeys.Rpc.setType(properties, rpcType);
+    RaftClientConfigKeys.Rpc
+        .setRequestTimeout(properties, clientRequestTimeout);
 
     GrpcConfigKeys.setMessageSizeMax(properties,
         SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE));
 
+    // set async max outstanding requests.
+    RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties,
+        maxOutStandingRequest);
+
     RaftClient.Builder builder =  RaftClient.newBuilder()
         .setRaftGroup(group)
         .setLeaderId(leader)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index ab17a52..737add0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -114,7 +114,12 @@ public final class ScmConfigKeys {
       "dfs.container.ratis.leader.pending.bytes.limit";
   public static final String
       DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT = "1GB";
-  
+
+  public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =
+      "dfs.ratis.client.request.timeout.duration";
+  public static final TimeDuration
+      DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT =
+      TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
   public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
       "dfs.ratis.client.request.max.retries";
   public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 3287777..f938448 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -170,6 +170,7 @@ public abstract class XceiverClientSpi implements Closeable {
   /**
    * Check if an specfic commitIndex is replicated to majority/all servers.
    * @param index index to watch for
+   * @param timeout timeout provided for the watch operation to complete
    * @return reply containing the min commit index replicated to all or majority
    *         servers in case of a failure
    * @throws InterruptedException
@@ -177,7 +178,7 @@ public abstract class XceiverClientSpi implements Closeable {
    * @throws TimeoutException
    * @throws IOException
    */
-  public abstract XceiverClientReply watchForCommit(long index)
+  public abstract XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException;
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index f2f4a6a..857f1de 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -134,6 +134,12 @@ public final class OzoneConfigKeys {
   public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT =
       "128MB";
 
+  public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT =
+      "ozone.client.watch.request.timeout";
+
+  public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT =
+      "30s";
+
   public static final String OZONE_CLIENT_MAX_RETRIES =
       "ozone.client.max.retries";
   public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 100;
@@ -263,6 +269,11 @@ public final class OzoneConfigKeys {
 
   public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
       "dfs.container.ratis.datanode.storage.dir";
+  public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =
+      ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY;
+  public static final TimeDuration
+      DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT =
+      ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT;
   public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
       ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY;
   public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a49c198..c8682bd 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -253,6 +253,14 @@
     </description>
   </property>
   <property>
+    <name>dfs.ratis.client.request.timeout.duration</name>
+    <value>3s</value>
+    <tag>OZONE, RATIS, MANAGEMENT</tag>
+    <description>The timeout duration for ratis client request.It should be
+        set greater than leader election timeout in Ratis.
+    </description>
+  </property>
+  <property>
     <name>dfs.ratis.client.request.max.retries</name>
     <value>180</value>
     <tag>OZONE, RATIS, MANAGEMENT</tag>
@@ -428,6 +436,14 @@
     </description>
   </property>
   <property>
+    <name>ozone.client.watch.request.timeout</name>
+    <value>30s</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>Timeout for the watch API in Ratis client to acknowledge
+      a particular request getting replayed to all servers.
+    </description>
+  </property>
+  <property>
     <name>ozone.client.max.retries</name>
     <value>100</value>
     <tag>OZONE, CLIENT</tag>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 4af792a..1aa10d8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -111,7 +111,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
       this.outputStream =
           new BlockOutputStream(blockID, xceiverClientManager,
               pipeline, chunkSize, streamBufferFlushSize,
-              streamBufferMaxSize, bufferPool, checksumType,
+              streamBufferMaxSize, watchTimeout, bufferPool, checksumType,
               bytesPerChecksum);
     }
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 7cf55d6..28916f9 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -586,6 +586,11 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
+    public Builder setWatchTimeout(long timeout) {
+      this.watchTimeout = timeout;
+      return this;
+    }
+
     public Builder setChecksumType(ChecksumType cType) {
       this.checksumType = cType;
       return this;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 2982e38..66b789f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -99,6 +99,7 @@ import java.net.URI;
 import java.security.InvalidKeyException;
 import java.security.SecureRandom;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -127,6 +128,7 @@ public class RpcClient implements ClientProtocol {
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
   private final long blockSize;
+  private final long watchTimeout;
   private final ClientId clientId = ClientId.randomId();
   private final int maxRetryCount;
   private final long retryInterval;
@@ -186,6 +188,10 @@ public class RpcClient implements ClientProtocol {
             StorageUnit.BYTES);
     blockSize = (long) conf.getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
         OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
+    watchTimeout =
+        conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
+            OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
 
     int configuredChecksumSize = (int) conf.getStorageSize(
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
@@ -884,6 +890,7 @@ public class RpcClient implements ClientProtocol {
             .setFactor(openKey.getKeyInfo().getFactor())
             .setStreamBufferFlushSize(streamBufferFlushSize)
             .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setWatchTimeout(watchTimeout)
             .setBlockSize(blockSize)
             .setBytesPerChecksum(bytesPerChecksum)
             .setChecksumType(checksumType)
@@ -1184,6 +1191,7 @@ public class RpcClient implements ClientProtocol {
             .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
             .setStreamBufferFlushSize(streamBufferFlushSize)
             .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setWatchTimeout(watchTimeout)
             .setBlockSize(blockSize)
             .setChecksumType(checksumType)
             .setBytesPerChecksum(bytesPerChecksum)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
index 3f5d33e..f862fd2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
@@ -38,6 +39,7 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,8 +123,13 @@ public interface RatisTestHelper {
       RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException {
     final RaftPeer p = RatisHelper.toRaftPeer(dd);
     final OzoneConfiguration conf = new OzoneConfiguration();
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(conf);
+    final TimeDuration requestTimeout =
+        RatisHelper.getClientRequestTimeout(conf);
     final RaftClient client =
-        newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf), conf);
+        newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf),
+            maxOutstandingRequests, requestTimeout, conf);
     client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId());
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
index fda6228..fd2cea3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
@@ -113,6 +113,8 @@ public class Test2WayCommitInRatis {
   @Test
   public void test2WayCommitForRetryfailure() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+        TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     startCluster(conf);
     GenericTestUtils.LogCapturer logCapturer =
@@ -140,7 +142,7 @@ public class Test2WayCommitInRatis {
         .getCloseContainer(pipeline,
             container1.getContainerInfo().getContainerID()));
     reply.getResponse().get();
-    xceiverClient.watchForCommit(reply.getLogIndex());
+    xceiverClient.watchForCommit(reply.getLogIndex(), 20000);
 
     // commitInfo Map will be reduced to 2 here
     Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 96226d8..2b41012 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -76,6 +76,7 @@ public class TestBlockOutputStream {
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 3f1d9ff..3cbe06d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -83,6 +83,7 @@ public class TestBlockOutputStreamWithFailures {
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "1s");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index c2444c1..e18b222 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -85,6 +85,7 @@ public class TestCloseContainerHandlingByClient {
   public static void init() throws Exception {
     chunkSize = (int) OzoneConsts.MB;
     blockSize = 4 * chunkSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index eaceb04..8089ac3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -87,6 +87,7 @@ public class TestCommitWatcher {
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
@@ -138,7 +139,7 @@ public class TestCommitWatcher {
     Assert.assertEquals(1, xceiverClient.getRefcount());
     Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
     XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
-    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient);
+    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
     BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
     List<XceiverClientReply> replies = new ArrayList<>();
     long length = 0;
@@ -212,7 +213,7 @@ public class TestCommitWatcher {
     Assert.assertEquals(1, xceiverClient.getRefcount());
     Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
     XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
-    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient);
+    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
     BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
     List<XceiverClientReply> replies = new ArrayList<>();
     long length = 0;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index d1f2016..a7f5960 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -85,6 +85,8 @@ public class TestFailureHandlingByClient {
     conf = new OzoneConfiguration();
     chunkSize = (int) OzoneConsts.MB;
     blockSize = 4 * chunkSize;
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5,
+        TimeUnit.SECONDS);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index bb7b6f0..d834350 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -75,6 +75,7 @@ public class TestKeyInputStream {
     flushSize = 4 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setQuietMode(false);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
index f532f4d..5717f58 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
@@ -77,6 +77,8 @@ public class TestMultiBlockWritesWithDnFailures {
     conf = new OzoneConfiguration();
     chunkSize = (int) OzoneConsts.MB;
     blockSize = 4 * chunkSize;
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5,
+        TimeUnit.SECONDS);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 0151c6e..6758f4f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -84,6 +84,7 @@ public class TestOzoneClientRetriesOnException {
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
    // conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
index 9f9d5af..1343a03 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
@@ -174,7 +174,7 @@ public class TestReadRetries {
     Assert.assertTrue(clientSpi instanceof XceiverClientRatis);
     XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi;
 
-    ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId());
+    ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId(), 5000);
     // shutdown the datanode
     cluster.shutdownHddsDatanode(datanodeDetails);
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 645db6a..5808655 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -136,6 +136,8 @@ public class TestWatchForCommit {
     // and will be captured in keyOutputStream and the failover will happen
     // to a different block
     OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+        TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     conf.setTimeDuration(
         OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
@@ -269,9 +271,53 @@ public class TestWatchForCommit {
   }
 
   @Test
+  public void testWatchForCommitWithSmallerTimeoutValue() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
+        TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
+    startCluster(conf);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    ContainerWithPipeline container1 = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, OzoneConsts.OZONE);
+    XceiverClientSpi xceiverClient = clientManager
+        .acquireClient(container1.getPipeline());
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertEquals(container1.getPipeline(),
+        xceiverClient.getPipeline());
+    Pipeline pipeline = xceiverClient.getPipeline();
+    XceiverClientReply reply = xceiverClient.sendCommandAsync(
+        ContainerTestHelper.getCreateContainerRequest(
+            container1.getContainerInfo().getContainerID(),
+            xceiverClient.getPipeline()));
+    reply.getResponse().get();
+    long index = reply.getLogIndex();
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
+    try {
+      // just watch for a log index which is not updated in the commitInfo Map
+      // and for which no log index has been generated in Ratis.
+      // The basic idea here is just to test if it throws an exception.
+      xceiverClient
+          .watchForCommit(index + new Random().nextInt(100) + 10, 3000);
+      Assert.fail("expected exception not thrown");
+    } catch (Exception e) {
+      Assert.assertTrue(
+          HddsClientUtils.checkForException(e) instanceof TimeoutException);
+    }
+    // After releasing the xceiverClient, this connection should be closed
+    // and any container operations should fail
+    clientManager.releaseClient(xceiverClient, false);
+    shutdown();
+  }
+
+  @Test
   public void testWatchForCommitForRetryfailure() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
+        100, TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     startCluster(conf);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
     ContainerWithPipeline container1 = storageContainerLocationClient
@@ -297,7 +343,7 @@ public class TestWatchForCommit {
       // as well as there is no logIndex generate in Ratis.
       // The basic idea here is just to test if its throws an exception.
       xceiverClient
-          .watchForCommit(index + new Random().nextInt(100) + 10);
+          .watchForCommit(index + new Random().nextInt(100) + 10, 20000);
       Assert.fail("expected exception not thrown");
     } catch (Exception e) {
       Assert.assertTrue(e instanceof ExecutionException);
@@ -314,8 +360,9 @@ public class TestWatchForCommit {
   @Test
   public void test2WayCommitForTimeoutException() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
+        TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
-    conf.set("raft.client.watch.request.timeout", "3s");
     startCluster(conf);
     GenericTestUtils.LogCapturer logCapturer =
         GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
@@ -342,7 +389,7 @@ public class TestWatchForCommit {
         .getCloseContainer(pipeline,
             container1.getContainerInfo().getContainerID()));
     reply.getResponse().get();
-    xceiverClient.watchForCommit(reply.getLogIndex());
+    xceiverClient.watchForCommit(reply.getLogIndex(), 3000);
 
     // commitInfo Map will be reduced to 2 here
     Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
@@ -358,6 +405,8 @@ public class TestWatchForCommit {
   @Test
   public void testWatchForCommitForGroupMismatchException() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+        TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
 
     // mark the node stale early so that pipleline gets destroyed quickly
@@ -391,7 +440,8 @@ public class TestWatchForCommit {
       // as well as there is no logIndex generate in Ratis.
       // The basic idea here is just to test if its throws an exception.
       xceiverClient
-          .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10);
+          .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10,
+              20000);
       Assert.fail("Expected exception not thrown");
     } catch(Exception e) {
       Assert.assertTrue(HddsClientUtils
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index 7857e1f..fdcb822 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -39,6 +40,7 @@ public abstract class TestDataValidate {
    *
    */
   static void startCluster(OzoneConfiguration conf) throws Exception {
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(5).build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
index bef3330..315d1ee 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.freon;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -67,6 +68,7 @@ public class TestOzoneClientKeyGenerator {
     if (conf == null) {
       conf = new OzoneConfiguration();
     }
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(5)
         .build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
index 218c570..45ea23d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -44,6 +45,7 @@ public class TestRandomKeyGenerator {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
     cluster.waitForClusterToBeReady();
   }
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
index 16973ac..c4c84cb 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
@@ -181,7 +181,7 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements
         XceiverClientReply xceiverClientReply =
             xceiverClientSpi.sendCommandAsync(request);
         xceiverClientSpi
-            .watchForCommit(xceiverClientReply.getLogIndex());
+            .watchForCommit(xceiverClientReply.getLogIndex(), 1000L);
 
       } else {
         xceiverClientSpi.sendCommand(request);


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org