You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/03/15 10:12:10 UTC
[hadoop] branch trunk updated: HDDS-1098. Introduce Retry Policy in
Ozone Client. Contributed by Shashikant Banerjee.
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 155ab6d HDDS-1098. Introduce Retry Policy in Ozone Client. Contributed by Shashikant Banerjee.
155ab6d is described below
commit 155ab6d5d8ab4a80019e65351572320502d8a510
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Fri Mar 15 15:40:59 2019 +0530
HDDS-1098. Introduce Retry Policy in Ozone Client. Contributed by Shashikant Banerjee.
---
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 19 ++-
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 +-
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 5 +
.../main/java/org/apache/ratis/RatisHelper.java | 33 ++++-
.../common/src/main/resources/ozone-default.xml | 16 ++-
.../transport/server/ratis/XceiverServerRatis.java | 18 +--
.../TestCloseContainerCommandHandler.java | 8 +-
.../hdds/scm/pipeline/RatisPipelineUtils.java | 10 +-
.../hadoop/ozone/client/OzoneClientUtils.java | 11 ++
.../hadoop/ozone/client/io/KeyOutputStream.java | 149 +++++++++++++--------
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 22 +--
.../org/apache/hadoop/ozone/RatisTestHelper.java | 5 +-
.../web/storage/DistributedStorageHandler.java | 11 +-
13 files changed, 206 insertions(+), 105 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 673a82b..65241bf 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -47,6 +47,7 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +75,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+ final TimeDuration clientRequestTimeout =
+ RatisHelper.getClientRequestTimeout(ozoneConf);
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
@@ -81,7 +84,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
SecurityConfig(ozoneConf));
return new XceiverClientRatis(pipeline,
SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
- retryPolicy, tlsConfig);
+ retryPolicy, tlsConfig, clientRequestTimeout);
}
private final Pipeline pipeline;
@@ -90,6 +93,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
private final int maxOutstandingRequests;
private final RetryPolicy retryPolicy;
private final GrpcTlsConfig tlsConfig;
+ private final TimeDuration clientRequestTimeout;
// Map to track commit index at every server
private final ConcurrentHashMap<UUID, Long> commitInfoMap;
@@ -102,7 +106,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
*/
private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
int maxOutStandingChunks, RetryPolicy retryPolicy,
- GrpcTlsConfig tlsConfig) {
+ GrpcTlsConfig tlsConfig, TimeDuration timeout) {
super();
this.pipeline = pipeline;
this.rpcType = rpcType;
@@ -111,6 +115,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
commitInfoMap = new ConcurrentHashMap<>();
watchClient = null;
this.tlsConfig = tlsConfig;
+ this.clientRequestTimeout = timeout;
}
private void updateCommitInfosMap(
@@ -160,7 +165,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
// requests to be handled by raft client
if (!client.compareAndSet(null,
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
- maxOutstandingRequests, tlsConfig))) {
+ maxOutstandingRequests, tlsConfig, clientRequestTimeout))) {
throw new IllegalStateException("Client is already connected.");
}
}
@@ -243,7 +248,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
if (watchClient == null) {
watchClient =
RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
- maxOutstandingRequests, tlsConfig);
+ maxOutstandingRequests, tlsConfig, clientRequestTimeout);
}
CompletableFuture<RaftClientReply> replyFuture = watchClient
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
@@ -260,9 +265,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
// TODO : need to remove the code to create the new RaftClient instance
// here once the watch request bypassing sliding window in Raft Client
// gets fixed.
- watchClient = RatisHelper
- .newRaftClient(rpcType, getPipeline(), retryPolicy,
- maxOutstandingRequests, tlsConfig);
+ watchClient =
+ RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
+ maxOutstandingRequests, tlsConfig, clientRequestTimeout);
reply = watchClient
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
.get(timeout, TimeUnit.MILLISECONDS);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1f84ebe..4c67eb3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -121,12 +121,12 @@ public final class ScmConfigKeys {
TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
"dfs.ratis.client.request.max.retries";
- public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180;
+ public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20;
public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY =
"dfs.ratis.client.request.retry.interval";
public static final TimeDuration
DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT =
- TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+ TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY =
"dfs.ratis.server.retry-cache.timeout.duration";
public static final TimeDuration
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index dc3ebe5..1388d00 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -133,6 +133,11 @@ public final class OzoneConfigKeys {
public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT =
"30s";
+ public static final String OZONE_CLIENT_MAX_RETRIES =
+ "ozone.client.max.retries";
+ public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 5;
+
+
// This defines the overall connection limit for the connection pool used in
// RestClient.
public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
index 3713d7a..a63d18a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
@@ -134,33 +135,51 @@ public interface RatisHelper {
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
RetryPolicy retryPolicy, int maxOutStandingRequest,
- GrpcTlsConfig tlsConfig) throws IOException {
+ GrpcTlsConfig tlsConfig, TimeDuration timeout) throws IOException {
return newRaftClient(rpcType, toRaftPeerId(pipeline.getFirstNode()),
newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
- pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig);
+ pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig,
+ timeout);
+ }
+
+ static TimeDuration getClientRequestTimeout(Configuration conf) {
+ // Set the client requestTimeout
+ final TimeUnit timeUnit =
+ OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
+ .getUnit();
+ final long duration = conf.getTimeDuration(
+ OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
+ OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
+ .getDuration(), timeUnit);
+ final TimeDuration clientRequestTimeout =
+ TimeDuration.valueOf(duration, timeUnit);
+ return clientRequestTimeout;
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RetryPolicy retryPolicy, int maxOutstandingRequests,
- GrpcTlsConfig tlsConfig) {
+ GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) {
return newRaftClient(rpcType, leader.getId(),
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
- maxOutstandingRequests, tlsConfig);
+ maxOutstandingRequests, tlsConfig, clientRequestTimeout);
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
- RetryPolicy retryPolicy, int maxOutstandingRequests) {
+ RetryPolicy retryPolicy, int maxOutstandingRequests,
+ TimeDuration clientRequestTimeout) {
return newRaftClient(rpcType, leader.getId(),
newRaftGroup(new ArrayList<>(Arrays.asList(leader))), retryPolicy,
- maxOutstandingRequests, null);
+ maxOutstandingRequests, null, clientRequestTimeout);
}
static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest,
- GrpcTlsConfig tlsConfig) {
+ GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout) {
LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
final RaftProperties properties = new RaftProperties();
RaftConfigKeys.Rpc.setType(properties, rpcType);
+ RaftClientConfigKeys.Rpc
+ .setRequestTimeout(properties, clientRequestTimeout);
GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE));
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 9918b8b..e68e05e 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -231,17 +231,19 @@
<name>dfs.ratis.client.request.timeout.duration</name>
<value>3s</value>
<tag>OZONE, RATIS, MANAGEMENT</tag>
- <description>The timeout duration for ratis client request.</description>
+ <description>The timeout duration for ratis client request. It should be
+ set greater than the leader election timeout in Ratis.
+ </description>
</property>
<property>
<name>dfs.ratis.client.request.max.retries</name>
- <value>180</value>
+ <value>20</value>
<tag>OZONE, RATIS, MANAGEMENT</tag>
<description>Number of retries for ratis client request.</description>
</property>
<property>
<name>dfs.ratis.client.request.retry.interval</name>
- <value>100ms</value>
+ <value>500ms</value>
<tag>OZONE, RATIS, MANAGEMENT</tag>
<description>Interval between successive retries for a ratis client request.
</description>
@@ -418,6 +420,14 @@
</description>
</property>
<property>
+ <name>ozone.client.max.retries</name>
+ <value>5</value>
+ <tag>OZONE, CLIENT</tag>
+ <description>Maximum number of retries by Ozone Client on encountering
+ exception while writing a key.
+ </description>
+ </property>
+ <property>
<name>ozone.client.protocol</name>
<value>org.apache.hadoop.ozone.client.rpc.RpcClient</value>
<tag>OZONE, CLIENT, MANAGEMENT</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index b96e00a..19e43b9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import io.opentracing.Scope;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
@@ -176,7 +175,7 @@ public final class XceiverServerRatis extends XceiverServer {
setRaftSegmentPreallocatedSize(conf, properties);
// Set max write buffer size, which is the scm chunk size
- final int maxChunkSize = setMaxWriteBuffer(conf, properties);
+ final int maxChunkSize = setMaxWriteBuffer(properties);
TimeUnit timeUnit;
long duration;
@@ -329,23 +328,10 @@ public final class XceiverServerRatis extends XceiverServer {
.setRequestTimeout(properties, serverRequestTimeout);
}
- private int setMaxWriteBuffer(Configuration conf, RaftProperties properties) {
+ private int setMaxWriteBuffer(RaftProperties properties) {
final int maxChunkSize = OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE;
RaftServerConfigKeys.Log.setWriteBufferSize(properties,
SizeInBytes.valueOf(maxChunkSize));
-
- // Set the client requestTimeout
- TimeUnit timeUnit =
- OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
- .getUnit();
- long duration = conf.getTimeDuration(
- OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
- OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
- .getDuration(), timeUnit);
- final TimeDuration clientRequestTimeout =
- TimeDuration.valueOf(duration, timeUnit);
- RaftClientConfigKeys.Rpc
- .setRequestTimeout(properties, clientRequestTimeout);
return maxChunkSize;
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index 7d8b3d6..16e0e9d 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -39,6 +39,7 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
@@ -49,6 +50,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
/**
* Test cases to verify CloseContainerCommandHandler in datanode.
@@ -289,8 +291,10 @@ public class TestCloseContainerCommandHandler {
final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId,
Collections.singleton(datanodeDetails));
final int maxOutstandingRequests = 100;
- final RaftClient client = RatisHelper.newRaftClient(SupportedRpcType.GRPC,
- peer, retryPolicy, maxOutstandingRequests, null);
+ final RaftClient client = RatisHelper
+ .newRaftClient(SupportedRpcType.GRPC, peer, retryPolicy,
+ maxOutstandingRequests,
+ TimeDuration.valueOf(3, TimeUnit.SECONDS));
Assert.assertTrue(client.groupAdd(group, peer.getId()).isSuccess());
Thread.sleep(2000);
final ContainerID containerId = ContainerID.valueof(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 3b36add..0af34fb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -32,6 +32,7 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,9 +117,11 @@ final class RatisPipelineUtils {
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(
new SecurityConfig(ozoneConf));
+ final TimeDuration requestTimeout =
+ RatisHelper.getClientRequestTimeout(ozoneConf);
RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
- retryPolicy, maxOutstandingRequests, tlsConfig);
+ retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout);
client
.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), true, p.getId());
}
@@ -141,12 +144,13 @@ final class RatisPipelineUtils {
HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf));
-
+ final TimeDuration requestTimeout =
+ RatisHelper.getClientRequestTimeout(ozoneConf);
datanodes.parallelStream().forEach(d -> {
final RaftPeer p = RatisHelper.toRaftPeer(d);
try (RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
- retryPolicy, maxOutstandingRequests, tlsConfig)) {
+ retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) {
rpc.accept(client, p);
} catch (IOException ioe) {
String errMsg =
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
index 9651518..012a225 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.client;
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.rest.response.*;
import org.apache.ratis.protocol.AlreadyClosedException;
@@ -27,6 +29,7 @@ import org.apache.ratis.protocol.RaftRetryFailureException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** A utility class for OzoneClient. */
@@ -122,6 +125,14 @@ public final class OzoneClientUtils {
return keyInfo;
}
+ public static RetryPolicy createRetryPolicy(int maxRetryCount) {
+ // just retry without sleep
+ RetryPolicy retryPolicy = RetryPolicies
+ .retryUpToMaximumCountWithFixedSleep(maxRetryCount, 0,
+ TimeUnit.MILLISECONDS);
+ return retryPolicy;
+ }
+
public static List<Class<? extends Exception>> getExceptionList() {
return EXCEPTION_LIST;
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index d1acbe1..a379889 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.om.helpers.*;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -45,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
@@ -87,6 +90,8 @@ public class KeyOutputStream extends OutputStream {
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
private FileEncryptionInfo feInfo;
private ExcludeList excludeList;
+ private final RetryPolicy retryPolicy;
+ private int retryCount;
/**
* A constructor for testing purpose only.
*/
@@ -111,6 +116,8 @@ public class KeyOutputStream extends OutputStream {
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
this.bytesPerChecksum = OzoneConfigKeys
.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
+ this.retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
+ retryCount = 0;
}
@VisibleForTesting
@@ -147,7 +154,7 @@ public class KeyOutputStream extends OutputStream {
String requestId, ReplicationFactor factor, ReplicationType type,
long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
ChecksumType checksumType, int bytesPerChecksum,
- String uploadID, int partNumber, boolean isMultipart) {
+ String uploadID, int partNumber, boolean isMultipart, int maxRetryCount) {
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
this.omClient = omClient;
@@ -183,6 +190,8 @@ public class KeyOutputStream extends OutputStream {
this.bufferPool =
new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
this.excludeList = new ExcludeList();
+ this.retryPolicy = OzoneClientUtils.createRetryPolicy(maxRetryCount);
+ this.retryCount = 0;
}
/**
@@ -308,23 +317,18 @@ public class KeyOutputStream extends OutputStream {
current.write(b, off, writeLen);
}
} catch (IOException ioe) {
- Throwable t = checkForException(ioe);
- if (t != null) {
- // for the current iteration, totalDataWritten - currentPos gives the
- // amount of data already written to the buffer
-
- // In the retryPath, the total data to be written will always be equal
- // to or less than the max length of the buffer allocated.
- // The len specified here is the combined sum of the data length of
- // the buffers
- Preconditions.checkState(!retry || len <= streamBufferMaxSize);
- writeLen = retry ? (int) len :
- (int) (current.getWrittenDataLength() - currentPos);
- LOG.debug("writeLen {}, total len {}", writeLen, len);
- handleException(current, currentStreamIndex, t);
- } else {
- throw ioe;
- }
+ // for the current iteration, totalDataWritten - currentPos gives the
+ // amount of data already written to the buffer
+
+ // In the retryPath, the total data to be written will always be equal
+ // to or less than the max length of the buffer allocated.
+ // The len specified here is the combined sum of the data length of
+ // the buffers
+ Preconditions.checkState(!retry || len <= streamBufferMaxSize);
+ writeLen = retry ? (int) len :
+ (int) (current.getWrittenDataLength() - currentPos);
+ LOG.debug("writeLen {}, total len {}", writeLen, len);
+ handleException(current, currentStreamIndex, ioe);
}
if (current.getRemaining() <= 0) {
// since the current block is already written close the stream.
@@ -390,7 +394,8 @@ public class KeyOutputStream extends OutputStream {
* @throws IOException Throws IOException if Write fails
*/
private void handleException(BlockOutputStreamEntry streamEntry,
- int streamIndex, Throwable exception) throws IOException {
+ int streamIndex, IOException exception) throws IOException {
+ Throwable t = checkForException(exception);
boolean retryFailure = checkForRetryFailure(exception);
boolean closedContainerException = false;
if (!retryFailure) {
@@ -413,9 +418,9 @@ public class KeyOutputStream extends OutputStream {
if (!failedServers.isEmpty()) {
excludeList.addDatanodes(failedServers);
}
- if (checkIfContainerIsClosed(exception)) {
+ if (checkIfContainerIsClosed(t)) {
excludeList.addConatinerId(ContainerID.valueof(containerId));
- } else if (retryFailure || exception instanceof TimeoutException) {
+ } else if (retryFailure || t instanceof TimeoutException) {
pipelineId = streamEntry.getPipeline().getId();
excludeList.addPipeline(pipelineId);
}
@@ -425,7 +430,7 @@ public class KeyOutputStream extends OutputStream {
// If the data is still cached in the underlying stream, we need to
// allocate new block and write this data in the datanode.
currentStreamIndex += 1;
- handleWrite(null, 0, bufferedDataLen, true);
+ handleRetry(exception, bufferedDataLen);
}
if (totalSuccessfulFlushedData == 0) {
streamEntries.remove(streamIndex);
@@ -448,6 +453,43 @@ public class KeyOutputStream extends OutputStream {
}
}
+ private void handleRetry(IOException exception, long len) throws IOException {
+ RetryPolicy.RetryAction action;
+ try {
+ action = retryPolicy
+ .shouldRetry(exception, retryCount, 0, true);
+ } catch (Exception e) {
+ throw e instanceof IOException ? (IOException) e : new IOException(e);
+ }
+ if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+ if (action.reason != null) {
+ LOG.error("Retry request failed. " + action.reason,
+ exception);
+ }
+ throw exception;
+ }
+
+ // Throw the exception if the thread is interrupted
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("Interrupted while trying for retry");
+ throw exception;
+ }
+ Preconditions.checkArgument(
+ action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
+ if (action.delayMillis > 0) {
+ try {
+ Thread.sleep(action.delayMillis);
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException(
+ "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
+ .initCause(e);
+ }
+ }
+ retryCount++;
+ LOG.trace("Retrying Write request. Already tried "
+ + retryCount + " time(s); retry policy is " + retryPolicy);
+ handleWrite(null, 0, len, true);
+ }
/**
* Checks if the provided exception signifies retry failure in ratis client.
* In case of retry failure, ratis client throws RaftRetryFailureException
@@ -462,7 +504,7 @@ public class KeyOutputStream extends OutputStream {
return t instanceof ContainerNotOpenException;
}
- private Throwable checkForException(IOException ioe) {
+ private Throwable checkForException(IOException ioe) throws IOException {
Throwable t = ioe.getCause();
while (t != null) {
for (Class<? extends Exception> cls : OzoneClientUtils
@@ -473,7 +515,7 @@ public class KeyOutputStream extends OutputStream {
}
t = t.getCause();
}
- return null;
+ throw ioe;
}
private long getKeyLength() {
@@ -512,36 +554,30 @@ public class KeyOutputStream extends OutputStream {
if (streamEntries.size() == 0) {
return;
}
- int size = streamEntries.size();
- int streamIndex =
- currentStreamIndex >= size ? size - 1 : currentStreamIndex;
- BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
- if (entry != null) {
- try {
- Collection<DatanodeDetails> failedServers = entry.getFailedServers();
-
- // failed servers can be null in case there is no data written in the
- // stream
- if (failedServers != null && !failedServers.isEmpty()) {
- excludeList.addDatanodes(failedServers);
- }
- if (close) {
- entry.close();
- } else {
- entry.flush();
- }
- } catch (IOException ioe) {
- Throwable t = checkForException(ioe);
- if (t != null) {
- // This call will allocate a new streamEntry and write the Data.
- // Close needs to be retried on the newly allocated streamEntry as
- // as well.
- handleException(entry, streamIndex, t);
- handleFlushOrClose(close);
- } else {
- throw ioe;
+ while (true) {
+ int size = streamEntries.size();
+ int streamIndex =
+ currentStreamIndex >= size ? size - 1 : currentStreamIndex;
+ BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
+ if (entry != null) {
+ try {
+ Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+ // failed servers can be null in case there is no data written in the
+ // stream
+ if (failedServers != null && !failedServers.isEmpty()) {
+ excludeList.addDatanodes(failedServers);
+ }
+ if (close) {
+ entry.close();
+ } else {
+ entry.flush();
+ }
+ } catch (IOException ioe) {
+ handleException(entry, streamIndex, ioe);
+ continue;
}
}
+ break;
}
}
@@ -616,6 +652,7 @@ public class KeyOutputStream extends OutputStream {
private String multipartUploadID;
private int multipartNumber;
private boolean isMultipartKey;
+ private int maxRetryCount;
public Builder setMultipartUploadID(String uploadID) {
@@ -704,11 +741,17 @@ public class KeyOutputStream extends OutputStream {
return this;
}
+ public Builder setMaxRetryCount(int maxCount) {
+ this.maxRetryCount = maxCount;
+ return this;
+ }
+
public KeyOutputStream build() throws IOException {
return new KeyOutputStream(openHandler, xceiverManager, scmClient,
omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
streamBufferMaxSize, blockSize, watchTimeout, checksumType,
- bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey);
+ bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
+ maxRetryCount);
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 7fab650..d059582 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -123,6 +123,7 @@ public class RpcClient implements ClientProtocol {
private final long blockSize;
private final long watchTimeout;
private final ClientId clientId = ClientId.randomId();
+ private final int maxRetryCount;
/**
* Creates RpcClient instance with the given configuration.
@@ -205,6 +206,9 @@ public class RpcClient implements ClientProtocol {
this.verifyChecksum =
conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT);
+ maxRetryCount =
+ conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
+ OZONE_CLIENT_MAX_RETRIES_DEFAULT);
}
private InetSocketAddress getScmAddressForClient() throws IOException {
@@ -594,7 +598,7 @@ public class RpcClient implements ClientProtocol {
.build();
OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
- KeyOutputStream groupOutputStream =
+ KeyOutputStream keyOutputStream =
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
@@ -610,20 +614,21 @@ public class RpcClient implements ClientProtocol {
.setBlockSize(blockSize)
.setChecksumType(checksumType)
.setBytesPerChecksum(bytesPerChecksum)
+ .setMaxRetryCount(maxRetryCount)
.build();
- groupOutputStream.addPreallocateBlocks(
+ keyOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
- final FileEncryptionInfo feInfo = groupOutputStream
+ final FileEncryptionInfo feInfo = keyOutputStream
.getFileEncryptionInfo();
if (feInfo != null) {
KeyProvider.KeyVersion decrypted = getDEK(feInfo);
final CryptoOutputStream cryptoOut = new CryptoOutputStream(
- groupOutputStream, OzoneKMSUtil.getCryptoCodec(conf, feInfo),
+ keyOutputStream, OzoneKMSUtil.getCryptoCodec(conf, feInfo),
decrypted.getMaterial(), feInfo.getIV());
return new OzoneOutputStream(cryptoOut);
} else {
- return new OzoneOutputStream(groupOutputStream);
+ return new OzoneOutputStream(keyOutputStream);
}
}
@@ -856,7 +861,7 @@ public class RpcClient implements ClientProtocol {
.build();
OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
- KeyOutputStream groupOutputStream =
+ KeyOutputStream keyOutputStream =
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
@@ -875,11 +880,12 @@ public class RpcClient implements ClientProtocol {
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
+ .setMaxRetryCount(maxRetryCount)
.build();
- groupOutputStream.addPreallocateBlocks(
+ keyOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
- return new OzoneOutputStream(groupOutputStream);
+ return new OzoneOutputStream(keyOutputStream);
}
@Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
index 322339b..da06c59 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
@@ -31,6 +31,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,9 +127,11 @@ public interface RatisTestHelper {
final OzoneConfiguration conf = new OzoneConfiguration();
final int maxOutstandingRequests =
HddsClientUtils.getMaxOutstandingRequests(conf);
+ final TimeDuration requestTimeout =
+ RatisHelper.getClientRequestTimeout(conf);
final RaftClient client =
newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf),
- maxOutstandingRequests);
+ maxOutstandingRequests, requestTimeout);
client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId());
}
}
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index f0f8a60..6fff3e4 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -88,6 +88,7 @@ public final class DistributedStorageHandler implements StorageHandler {
private final ChecksumType checksumType;
private final int bytesPerChecksum;
private final boolean verifyChecksum;
+ private final int maxRetryCount;
/**
* Creates a new DistributedStorageHandler.
@@ -154,6 +155,9 @@ public final class DistributedStorageHandler implements StorageHandler {
this.verifyChecksum =
conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT);
+ this.maxRetryCount =
+ conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
+ OZONE_CLIENT_MAX_RETRIES_DEFAULT);
}
@Override
@@ -438,7 +442,7 @@ public final class DistributedStorageHandler implements StorageHandler {
.build();
// contact OM to allocate a block for key.
OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
- KeyOutputStream groupOutputStream =
+ KeyOutputStream keyOutputStream =
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
@@ -454,11 +458,12 @@ public final class DistributedStorageHandler implements StorageHandler {
.setWatchTimeout(watchTimeout)
.setChecksumType(checksumType)
.setBytesPerChecksum(bytesPerChecksum)
+ .setMaxRetryCount(maxRetryCount)
.build();
- groupOutputStream.addPreallocateBlocks(
+ keyOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
openKey.getOpenVersion());
- return new OzoneOutputStream(groupOutputStream);
+ return new OzoneOutputStream(keyOutputStream);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org