You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sh...@apache.org on 2020/01/23 09:14:40 UTC
[hadoop-ozone] 01/01: Revert "HDDS-2920. Remove ozone ratis client
specific config keys. (#472)"
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch revert-472-HDDS-2920
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 145ae53b651405d8f7d8a6fef43309ed47878c3e
Author: bshashikant <sb...@hortonworks.com>
AuthorDate: Thu Jan 23 14:44:32 2020 +0530
Revert "HDDS-2920. Remove ozone ratis client specific config keys. (#472)"
This reverts commit ab557dba89e4666e8c2679fa9b5fbd95dfd45e0b.
---
.../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 2 +-
.../hadoop/hdds/scm/XceiverClientManager.java | 42 +--------------
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 29 ++++++++---
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 6 ++-
.../hadoop/hdds/scm/storage/CommitWatcher.java | 8 ++-
.../org/apache/hadoop/hdds/ratis/RatisHelper.java | 46 +++++++++++++----
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 7 ++-
.../apache/hadoop/hdds/scm/XceiverClientSpi.java | 3 +-
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 11 ++++
.../common/src/main/resources/ozone-default.xml | 16 ++++++
.../ozone/client/io/BlockOutputStreamEntry.java | 2 +-
.../hadoop/ozone/client/io/KeyOutputStream.java | 5 ++
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 8 +++
.../org/apache/hadoop/ozone/RatisTestHelper.java | 9 +++-
.../ozone/client/rpc/Test2WayCommitInRatis.java | 4 +-
.../ozone/client/rpc/TestBlockOutputStream.java | 1 +
.../rpc/TestBlockOutputStreamWithFailures.java | 1 +
.../rpc/TestCloseContainerHandlingByClient.java | 1 +
.../hadoop/ozone/client/rpc/TestCommitWatcher.java | 5 +-
.../client/rpc/TestFailureHandlingByClient.java | 2 +
.../ozone/client/rpc/TestKeyInputStream.java | 1 +
.../rpc/TestMultiBlockWritesWithDnFailures.java | 2 +
.../rpc/TestOzoneClientRetriesOnException.java | 1 +
.../hadoop/ozone/client/rpc/TestReadRetries.java | 2 +-
.../ozone/client/rpc/TestWatchForCommit.java | 60 ++++++++++++++++++++--
.../hadoop/ozone/freon/TestDataValidate.java | 2 +
.../ozone/freon/TestOzoneClientKeyGenerator.java | 2 +
.../hadoop/ozone/freon/TestRandomKeyGenerator.java | 2 +
.../hadoop/ozone/freon/DatanodeChunkGenerator.java | 2 +-
29 files changed, 206 insertions(+), 76 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 0ded84a..9a4da38 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -473,7 +473,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
}
@Override
- public XceiverClientReply watchForCommit(long index)
+ public XceiverClientReply watchForCommit(long index, long timeout)
throws InterruptedException, ExecutionException, TimeoutException,
IOException {
// there is no notion of watch for commit index in standalone pipeline
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index dc3b215..d46456a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -342,12 +342,11 @@ public class XceiverClientManager implements Closeable {
/**
* Configuration for ratis client.
*/
- @ConfigGroup(prefix = "raft.client")
+ @ConfigGroup(prefix = "dfs.ratis.client")
public static class DFSRatisClientConfig {
- @Config(key = "async.outstanding-requests.max",
+ @Config(key = "async.max.outstanding.requests",
defaultValue = "64",
- type = ConfigType.INT,
tags = {OZONE, CLIENT, PERFORMANCE},
description =
"Controls the maximum number of outstanding async requests that can"
@@ -362,43 +361,6 @@ public class XceiverClientManager implements Closeable {
public void setMaxOutstandingRequests(int maxOutstandingRequests) {
this.maxOutstandingRequests = maxOutstandingRequests;
}
-
- @Config(key = "rpc.request.timeout",
- defaultValue = "60s",
- type = ConfigType.TIME,
- tags = {OZONE, CLIENT, PERFORMANCE},
- description = "The timeout duration for ratis client request (except " +
- "for watch request). It should be set greater than leader " +
- "election timeout in Ratis."
- )
- private long requestTimeOut = 60 * 1000;
-
- public long getRequestTimeOut() {
- return requestTimeOut;
- }
-
- public void setRequestTimeOut(long requestTimeOut) {
- this.requestTimeOut = requestTimeOut;
- }
-
- @Config(key = "watch.request.timeout",
- defaultValue = "180s",
- type = ConfigType.TIME,
- tags = {OZONE, CLIENT, PERFORMANCE},
- description = "The timeout duration for ratis client watch request. " +
- "Timeout for the watch API in Ratis client to acknowledgea " +
- "particular request getting replayed to all servers."
- )
- private long watchRequestTimeOut = 180 * 1000;
-
- public long getWatchRequestTimeOut() {
- return watchRequestTimeOut;
- }
-
- public void setWatchRequestTimeOut(long watchRequestTimeOut) {
- this.watchRequestTimeOut = watchRequestTimeOut;
- }
}
-
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 0d12355..6f102d8 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -58,6 +59,7 @@ import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,19 +89,25 @@ public final class XceiverClientRatis extends XceiverClientSpi {
final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+ final TimeDuration clientRequestTimeout =
+ RatisHelper.getClientRequestTimeout(ozoneConf);
+ final int maxOutstandingRequests =
+ HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf), caCert);
return new XceiverClientRatis(pipeline,
- SupportedRpcType.valueOfIgnoreCase(rpcType),
- retryPolicy, tlsConfig, ozoneConf);
+ SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
+ retryPolicy, tlsConfig, clientRequestTimeout, ozoneConf);
}
private final Pipeline pipeline;
private final RpcType rpcType;
private final AtomicReference<RaftClient> client = new AtomicReference<>();
+ private final int maxOutstandingRequests;
private final RetryPolicy retryPolicy;
private final GrpcTlsConfig tlsConfig;
+ private final TimeDuration clientRequestTimeout;
private final Configuration ozoneConfiguration;
// Map to track commit index at every server
@@ -111,14 +119,17 @@ public final class XceiverClientRatis extends XceiverClientSpi {
* Constructs a client.
*/
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
- RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
+ int maxOutStandingChunks, RetryPolicy retryPolicy,
+ GrpcTlsConfig tlsConfig, TimeDuration timeout,
Configuration configuration) {
super();
this.pipeline = pipeline;
this.rpcType = rpcType;
+ this.maxOutstandingRequests = maxOutStandingChunks;
this.retryPolicy = retryPolicy;
commitInfoMap = new ConcurrentHashMap<>();
this.tlsConfig = tlsConfig;
+ this.clientRequestTimeout = timeout;
metrics = XceiverClientManager.getXceiverClientMetrics();
this.ozoneConfiguration = configuration;
}
@@ -170,7 +181,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
if (!client.compareAndSet(null,
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
- tlsConfig, ozoneConfiguration))) {
+ maxOutstandingRequests, tlsConfig, clientRequestTimeout,
+ ozoneConfiguration))) {
throw new IllegalStateException("Client is already connected.");
}
}
@@ -244,7 +256,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
}
@Override
- public XceiverClientReply watchForCommit(long index)
+ public XceiverClientReply watchForCommit(long index, long timeout)
throws InterruptedException, ExecutionException, TimeoutException,
IOException {
long commitIndex = getReplicatedMinCommitIndex();
@@ -255,11 +267,14 @@ public final class XceiverClientRatis extends XceiverClientSpi {
clientReply.setLogIndex(commitIndex);
return clientReply;
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("commit index : {} watch timeout : {}", index, timeout);
+ }
RaftClientReply reply;
try {
CompletableFuture<RaftClientReply> replyFuture = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
- replyFuture.get();
+ replyFuture.get(timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
Throwable t = HddsClientUtils.checkForException(e);
LOG.warn("3 way commit failed on pipeline {}", pipeline, e);
@@ -268,7 +283,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
}
reply = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
- .get();
+ .get(timeout, TimeUnit.MILLISECONDS);
List<RaftProtos.CommitInfoProto> commitInfoProtoList =
reply.getCommitInfos().stream()
.filter(i -> i.getCommitIndex() < index)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 9131f5c..15aebe1 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -125,6 +125,7 @@ public class BlockOutputStream extends OutputStream {
* @param bufferPool pool of buffers
* @param streamBufferFlushSize flush size
* @param streamBufferMaxSize max size of the currentBuffer
+ * @param watchTimeout watch timeout
* @param checksumType checksum type
* @param bytesPerChecksum Bytes per checksum
*/
@@ -132,7 +133,8 @@ public class BlockOutputStream extends OutputStream {
public BlockOutputStream(BlockID blockID,
XceiverClientManager xceiverClientManager, Pipeline pipeline,
int chunkSize, long streamBufferFlushSize, long streamBufferMaxSize,
- BufferPool bufferPool, ChecksumType checksumType, int bytesPerChecksum)
+ long watchTimeout, BufferPool bufferPool, ChecksumType checksumType,
+ int bytesPerChecksum)
throws IOException {
this.blockID = new AtomicReference<>(blockID);
this.chunkSize = chunkSize;
@@ -152,7 +154,7 @@ public class BlockOutputStream extends OutputStream {
// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
- commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
+ commitWatcher = new CommitWatcher(bufferPool, xceiverClient, watchTimeout);
bufferList = null;
totalDataFlushedLength = 0;
writtenDataLength = 0;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index 34d0d7c..ebcc6dc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -71,13 +71,17 @@ public class CommitWatcher {
private XceiverClientSpi xceiverClient;
+ private final long watchTimeout;
+
// total data which has been successfully flushed and acknowledged
// by all servers
private long totalAckDataLength;
- public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
+ public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient,
+ long watchTimeout) {
this.bufferPool = bufferPool;
this.xceiverClient = xceiverClient;
+ this.watchTimeout = watchTimeout;
commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
totalAckDataLength = 0;
futureMap = new ConcurrentHashMap<>();
@@ -187,7 +191,7 @@ public class CommitWatcher {
long index;
try {
XceiverClientReply reply =
- xceiverClient.watchForCommit(commitIndex);
+ xceiverClient.watchForCommit(commitIndex, watchTimeout);
if (reply == null) {
index = 0;
} else {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index 75e07c3..98c36b6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
@@ -148,12 +149,26 @@ public interface RatisHelper {
}
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
- RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
+ RetryPolicy retryPolicy, int maxOutStandingRequest,
+ GrpcTlsConfig tlsConfig, TimeDuration timeout,
Configuration ozoneConfiguration) throws IOException {
return newRaftClient(rpcType,
toRaftPeerId(pipeline.getLeaderNode()),
newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
- pipeline.getNodes()), retryPolicy, tlsConfig, ozoneConfiguration);
+ pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig,
+ timeout, ozoneConfiguration);
+ }
+
+ static TimeDuration getClientRequestTimeout(Configuration conf) {
+ // Set the client requestTimeout
+ final TimeUnit timeUnit =
+ OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
+ .getUnit();
+ final long duration = conf.getTimeDuration(
+ OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
+ OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
+ .getDuration(), timeUnit);
+ return TimeDuration.valueOf(duration, timeUnit);
}
static RpcType getRpcType(Configuration conf) {
@@ -163,30 +178,35 @@ public interface RatisHelper {
}
static RaftClient newRaftClient(RaftPeer leader, Configuration conf) {
- return newRaftClient(getRpcType(conf), leader,
- RatisHelper.createRetryPolicy(conf), conf);
+ return newRaftClient(getRpcType(conf), leader, RetryPolicies.noRetry(),
+ GrpcConfigKeys.OutputStream.OUTSTANDING_APPENDS_MAX_DEFAULT,
+ getClientRequestTimeout(conf), conf);
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
- RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
+ RetryPolicy retryPolicy, int maxOutstandingRequests,
+ GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout,
Configuration configuration) {
return newRaftClient(rpcType, leader.getId(),
newRaftGroup(Collections.singletonList(leader)), retryPolicy,
- tlsConfig, configuration);
+ maxOutstandingRequests, tlsConfig, clientRequestTimeout, configuration);
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
- RetryPolicy retryPolicy,
+ RetryPolicy retryPolicy, int maxOutstandingRequests,
+ TimeDuration clientRequestTimeout,
Configuration ozoneConfiguration) {
return newRaftClient(rpcType, leader.getId(),
- newRaftGroup(Collections.singletonList(leader)), retryPolicy, null,
+ newRaftGroup(Collections.singletonList(leader)), retryPolicy,
+ maxOutstandingRequests, null, clientRequestTimeout,
ozoneConfiguration);
}
@SuppressWarnings("checkstyle:ParameterNumber")
static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
- RaftGroup group, RetryPolicy retryPolicy,
- GrpcTlsConfig tlsConfig, Configuration ozoneConfiguration) {
+ RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest,
+ GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout,
+ Configuration ozoneConfiguration) {
if (LOG.isTraceEnabled()) {
LOG.trace("newRaftClient: {}, leader={}, group={}",
rpcType, leader, group);
@@ -200,10 +220,16 @@ public interface RatisHelper {
createRaftGrpcProperties(ozoneConfiguration, properties);
RaftConfigKeys.Rpc.setType(properties, rpcType);
+ RaftClientConfigKeys.Rpc
+ .setRequestTimeout(properties, clientRequestTimeout);
GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE));
+ // set async max outstanding requests.
+ RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties,
+ maxOutStandingRequest);
+
RaftClient.Builder builder = RaftClient.newBuilder()
.setRaftGroup(group)
.setLeaderId(leader)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index ab17a52..737add0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -114,7 +114,12 @@ public final class ScmConfigKeys {
"dfs.container.ratis.leader.pending.bytes.limit";
public static final String
DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT = "1GB";
-
+
+ public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =
+ "dfs.ratis.client.request.timeout.duration";
+ public static final TimeDuration
+ DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT =
+ TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
"dfs.ratis.client.request.max.retries";
public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 3287777..f938448 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -170,6 +170,7 @@ public abstract class XceiverClientSpi implements Closeable {
/**
* Check if an specfic commitIndex is replicated to majority/all servers.
* @param index index to watch for
+ * @param timeout timeout provided for the watch operation to complete
* @return reply containing the min commit index replicated to all or majority
* servers in case of a failure
* @throws InterruptedException
@@ -177,7 +178,7 @@ public abstract class XceiverClientSpi implements Closeable {
* @throws TimeoutException
* @throws IOException
*/
- public abstract XceiverClientReply watchForCommit(long index)
+ public abstract XceiverClientReply watchForCommit(long index, long timeout)
throws InterruptedException, ExecutionException, TimeoutException,
IOException;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index f2f4a6a..857f1de 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -134,6 +134,12 @@ public final class OzoneConfigKeys {
public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT =
"128MB";
+ public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT =
+ "ozone.client.watch.request.timeout";
+
+ public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT =
+ "30s";
+
public static final String OZONE_CLIENT_MAX_RETRIES =
"ozone.client.max.retries";
public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 100;
@@ -263,6 +269,11 @@ public final class OzoneConfigKeys {
public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
"dfs.container.ratis.datanode.storage.dir";
+ public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =
+ ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY;
+ public static final TimeDuration
+ DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT =
+ ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT;
public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY;
public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a49c198..c8682bd 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -253,6 +253,14 @@
</description>
</property>
<property>
+ <name>dfs.ratis.client.request.timeout.duration</name>
+ <value>3s</value>
+ <tag>OZONE, RATIS, MANAGEMENT</tag>
+ <description>The timeout duration for ratis client request. It should be
+ set greater than leader election timeout in Ratis.
+ </description>
+ </property>
+ <property>
<name>dfs.ratis.client.request.max.retries</name>
<value>180</value>
<tag>OZONE, RATIS, MANAGEMENT</tag>
@@ -428,6 +436,14 @@
</description>
</property>
<property>
+ <name>ozone.client.watch.request.timeout</name>
+ <value>30s</value>
+ <tag>OZONE, CLIENT</tag>
+ <description>Timeout for the watch API in Ratis client to acknowledge
+ a particular request getting replayed to all servers.
+ </description>
+ </property>
+ <property>
<name>ozone.client.max.retries</name>
<value>100</value>
<tag>OZONE, CLIENT</tag>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 4af792a..1aa10d8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -111,7 +111,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
this.outputStream =
new BlockOutputStream(blockID, xceiverClientManager,
pipeline, chunkSize, streamBufferFlushSize,
- streamBufferMaxSize, bufferPool, checksumType,
+ streamBufferMaxSize, watchTimeout, bufferPool, checksumType,
bytesPerChecksum);
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 7cf55d6..28916f9 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -586,6 +586,11 @@ public class KeyOutputStream extends OutputStream {
return this;
}
+ public Builder setWatchTimeout(long timeout) {
+ this.watchTimeout = timeout;
+ return this;
+ }
+
public Builder setChecksumType(ChecksumType cType) {
this.checksumType = cType;
return this;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 2982e38..66b789f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -99,6 +99,7 @@ import java.net.URI;
import java.security.InvalidKeyException;
import java.security.SecureRandom;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -127,6 +128,7 @@ public class RpcClient implements ClientProtocol {
private final long streamBufferFlushSize;
private final long streamBufferMaxSize;
private final long blockSize;
+ private final long watchTimeout;
private final ClientId clientId = ClientId.randomId();
private final int maxRetryCount;
private final long retryInterval;
@@ -186,6 +188,10 @@ public class RpcClient implements ClientProtocol {
StorageUnit.BYTES);
blockSize = (long) conf.getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
+ watchTimeout =
+ conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
+ OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
int configuredChecksumSize = (int) conf.getStorageSize(
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
@@ -884,6 +890,7 @@ public class RpcClient implements ClientProtocol {
.setFactor(openKey.getKeyInfo().getFactor())
.setStreamBufferFlushSize(streamBufferFlushSize)
.setStreamBufferMaxSize(streamBufferMaxSize)
+ .setWatchTimeout(watchTimeout)
.setBlockSize(blockSize)
.setBytesPerChecksum(bytesPerChecksum)
.setChecksumType(checksumType)
@@ -1184,6 +1191,7 @@ public class RpcClient implements ClientProtocol {
.setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
.setStreamBufferFlushSize(streamBufferFlushSize)
.setStreamBufferMaxSize(streamBufferMaxSize)
+ .setWatchTimeout(watchTimeout)
.setBlockSize(blockSize)
.setChecksumType(checksumType)
.setBytesPerChecksum(bytesPerChecksum)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
index 3f5d33e..f862fd2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
@@ -38,6 +39,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,8 +123,13 @@ public interface RatisTestHelper {
RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException {
final RaftPeer p = RatisHelper.toRaftPeer(dd);
final OzoneConfiguration conf = new OzoneConfiguration();
+ final int maxOutstandingRequests =
+ HddsClientUtils.getMaxOutstandingRequests(conf);
+ final TimeDuration requestTimeout =
+ RatisHelper.getClientRequestTimeout(conf);
final RaftClient client =
- newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf), conf);
+ newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf),
+ maxOutstandingRequests, requestTimeout, conf);
client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId());
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
index fda6228..fd2cea3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
@@ -113,6 +113,8 @@ public class Test2WayCommitInRatis {
@Test
public void test2WayCommitForRetryfailure() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+ TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
startCluster(conf);
GenericTestUtils.LogCapturer logCapturer =
@@ -140,7 +142,7 @@ public class Test2WayCommitInRatis {
.getCloseContainer(pipeline,
container1.getContainerInfo().getContainerID()));
reply.getResponse().get();
- xceiverClient.watchForCommit(reply.getLogIndex());
+ xceiverClient.watchForCommit(reply.getLogIndex(), 20000);
// commitInfo Map will be reduced to 2 here
Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 96226d8..2b41012 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -76,6 +76,7 @@ public class TestBlockOutputStream {
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 3f1d9ff..3cbe06d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -83,6 +83,7 @@ public class TestBlockOutputStreamWithFailures {
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "1s");
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index c2444c1..e18b222 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -85,6 +85,7 @@ public class TestCloseContainerHandlingByClient {
public static void init() throws Exception {
chunkSize = (int) OzoneConsts.MB;
blockSize = 4 * chunkSize;
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index eaceb04..8089ac3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -87,6 +87,7 @@ public class TestCommitWatcher {
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
@@ -138,7 +139,7 @@ public class TestCommitWatcher {
Assert.assertEquals(1, xceiverClient.getRefcount());
Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
- CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient);
+ CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
List<XceiverClientReply> replies = new ArrayList<>();
long length = 0;
@@ -212,7 +213,7 @@ public class TestCommitWatcher {
Assert.assertEquals(1, xceiverClient.getRefcount());
Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
- CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient);
+ CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
List<XceiverClientReply> replies = new ArrayList<>();
long length = 0;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index d1f2016..a7f5960 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -85,6 +85,8 @@ public class TestFailureHandlingByClient {
conf = new OzoneConfiguration();
chunkSize = (int) OzoneConsts.MB;
blockSize = 4 * chunkSize;
+ conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5,
+ TimeUnit.SECONDS);
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index bb7b6f0..d834350 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -75,6 +75,7 @@ public class TestKeyInputStream {
flushSize = 4 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.setQuietMode(false);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
index f532f4d..5717f58 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
@@ -77,6 +77,8 @@ public class TestMultiBlockWritesWithDnFailures {
conf = new OzoneConfiguration();
chunkSize = (int) OzoneConsts.MB;
blockSize = 4 * chunkSize;
+ conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5,
+ TimeUnit.SECONDS);
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 0151c6e..6758f4f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -84,6 +84,7 @@ public class TestOzoneClientRetriesOnException {
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
// conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
index 9f9d5af..1343a03 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
@@ -174,7 +174,7 @@ public class TestReadRetries {
Assert.assertTrue(clientSpi instanceof XceiverClientRatis);
XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi;
- ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId());
+ ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId(), 5000);
// shutdown the datanode
cluster.shutdownHddsDatanode(datanodeDetails);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 645db6a..5808655 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -136,6 +136,8 @@ public class TestWatchForCommit {
// and will be captured in keyOutputStream and the failover will happen
// to a different block
OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+ TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
@@ -269,9 +271,53 @@ public class TestWatchForCommit {
}
@Test
+ public void testWatchForCommitWithSmallerTimeoutValue() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
+ TimeUnit.SECONDS);
+ conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
+ startCluster(conf);
+ XceiverClientManager clientManager = new XceiverClientManager(conf);
+ ContainerWithPipeline container1 = storageContainerLocationClient
+ .allocateContainer(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE, OzoneConsts.OZONE);
+ XceiverClientSpi xceiverClient = clientManager
+ .acquireClient(container1.getPipeline());
+ Assert.assertEquals(1, xceiverClient.getRefcount());
+ Assert.assertEquals(container1.getPipeline(),
+ xceiverClient.getPipeline());
+ Pipeline pipeline = xceiverClient.getPipeline();
+ XceiverClientReply reply = xceiverClient.sendCommandAsync(
+ ContainerTestHelper.getCreateContainerRequest(
+ container1.getContainerInfo().getContainerID(),
+ xceiverClient.getPipeline()));
+ reply.getResponse().get();
+ long index = reply.getLogIndex();
+ cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+ cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
+ try {
+ // just watch for a log index which is not updated in the commitInfo Map
+ // and for which no logIndex has been generated in Ratis.
+ // The basic idea here is just to test if it throws an exception.
+ xceiverClient
+ .watchForCommit(index + new Random().nextInt(100) + 10, 3000);
+ Assert.fail("expected exception not thrown");
+ } catch (Exception e) {
+ Assert.assertTrue(
+ HddsClientUtils.checkForException(e) instanceof TimeoutException);
+ }
+ // After releasing the xceiverClient, this connection should be closed
+ // and any container operations should fail
+ clientManager.releaseClient(xceiverClient, false);
+ shutdown();
+ }
+
+ @Test
public void testWatchForCommitForRetryfailure() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
- conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+ conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
+ 100, TimeUnit.SECONDS);
+ conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
startCluster(conf);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerWithPipeline container1 = storageContainerLocationClient
@@ -297,7 +343,7 @@ public class TestWatchForCommit {
// as well as there is no logIndex generate in Ratis.
// The basic idea here is just to test if its throws an exception.
xceiverClient
- .watchForCommit(index + new Random().nextInt(100) + 10);
+ .watchForCommit(index + new Random().nextInt(100) + 10, 20000);
Assert.fail("expected exception not thrown");
} catch (Exception e) {
Assert.assertTrue(e instanceof ExecutionException);
@@ -314,8 +360,9 @@ public class TestWatchForCommit {
@Test
public void test2WayCommitForTimeoutException() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
+ TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
- conf.set("raft.client.watch.request.timeout", "3s");
startCluster(conf);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
@@ -342,7 +389,7 @@ public class TestWatchForCommit {
.getCloseContainer(pipeline,
container1.getContainerInfo().getContainerID()));
reply.getResponse().get();
- xceiverClient.watchForCommit(reply.getLogIndex());
+ xceiverClient.watchForCommit(reply.getLogIndex(), 3000);
// commitInfo Map will be reduced to 2 here
Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
@@ -358,6 +405,8 @@ public class TestWatchForCommit {
@Test
public void testWatchForCommitForGroupMismatchException() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+ TimeUnit.SECONDS);
conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
// mark the node stale early so that pipleline gets destroyed quickly
@@ -391,7 +440,8 @@ public class TestWatchForCommit {
// as well as there is no logIndex generate in Ratis.
// The basic idea here is just to test if its throws an exception.
xceiverClient
- .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10);
+ .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10,
+ 20000);
Assert.fail("Expected exception not thrown");
} catch(Exception e) {
Assert.assertTrue(HddsClientUtils
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index 7857e1f..fdcb822 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.junit.Assert;
import org.junit.Test;
@@ -39,6 +40,7 @@ public abstract class TestDataValidate {
*
*/
static void startCluster(OzoneConfiguration conf) throws Exception {
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(5).build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
index bef3330..315d1ee 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.freon;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -67,6 +68,7 @@ public class TestOzoneClientKeyGenerator {
if (conf == null) {
conf = new OzoneConfiguration();
}
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(5)
.build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
index 218c570..45ea23d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -44,6 +45,7 @@ public class TestRandomKeyGenerator {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
cluster.waitForClusterToBeReady();
}
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
index 16973ac..c4c84cb 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
@@ -181,7 +181,7 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements
XceiverClientReply xceiverClientReply =
xceiverClientSpi.sendCommandAsync(request);
xceiverClientSpi
- .watchForCommit(xceiverClientReply.getLogIndex());
+ .watchForCommit(xceiverClientReply.getLogIndex(), 1000L);
} else {
xceiverClientSpi.sendCommand(request);
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org