You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2020/04/27 11:48:51 UTC
[incubator-ratis] branch master updated: RATIS-876. Introduce max timeout in RequestTypeDependentRetryPolicy.
This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c42b44c RATIS-876. Introduce max timeout in RequestTypeDependentRetryPolicy.
c42b44c is described below
commit c42b44cfe1577b97dcd490860780e81e19e48106
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Mon Apr 27 17:17:26 2020 +0530
RATIS-876. Introduce max timeout in RequestTypeDependentRetryPolicy.
---
.../org/apache/ratis/client/ClientRetryEvent.java | 27 ++++++++----
.../org/apache/ratis/client/impl/OrderedAsync.java | 12 +++--
.../apache/ratis/client/impl/RaftClientImpl.java | 38 ++++++++++------
.../apache/ratis/client/impl/UnorderedAsync.java | 8 ++--
.../retry/RequestTypeDependentRetryPolicy.java | 18 ++++++--
.../ratis/retry/TestExceptionDependentRetry.java | 4 +-
.../org/apache/ratis/retry/TestRetryPolicy.java | 51 +++++++++++++++++++---
7 files changed, 113 insertions(+), 45 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java b/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java
index 5c4ec5d..58a10ff 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java
@@ -15,11 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.client;
+package org.apache.ratis.client.retry;
+import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.TimeDuration;
/** An {@link RetryPolicy.Event} specific to client request failure. */
public class ClientRetryEvent implements RetryPolicy.Event {
@@ -27,20 +29,25 @@ public class ClientRetryEvent implements RetryPolicy.Event {
private final int causeCount;
private final RaftClientRequest request;
private final Throwable cause;
+ private PendingClientRequest pending;
- public ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount,
- Throwable cause) {
+ @VisibleForTesting
+ public ClientRetryEvent(int attemptCount, RaftClientRequest request, Throwable cause) {
+ this(attemptCount, request, attemptCount, cause);
+ }
+
+ public ClientRetryEvent(RaftClientRequest request, Throwable t, PendingClientRequest pending) {
+ this(pending.getAttemptCount(), request, pending.getExceptionCount(t), t);
+ this.pending = pending;
+ }
+
+ private ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount, Throwable cause) {
this.attemptCount = attemptCount;
this.causeCount = causeCount;
this.request = request;
this.cause = cause;
}
- @VisibleForTesting
- public ClientRetryEvent(int attemptCount, RaftClientRequest request) {
- this(attemptCount, request, 0, null);
- }
-
@Override
public int getAttemptCount() {
return attemptCount;
@@ -60,6 +67,10 @@ public class ClientRetryEvent implements RetryPolicy.Event {
return cause;
}
+ boolean isRequestTimeout(TimeDuration timeout) {
+ return pending != null && pending.isRequestTimeout(timeout);
+ }
+
@Override
public String toString() {
return getClass().getSimpleName() + ":attempt=" + attemptCount + ",request=" + request + ",cause=" + cause;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 88546ca..3b6c396 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -17,7 +17,7 @@
*/
package org.apache.ratis.client.impl;
-import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
import org.apache.ratis.conf.RaftProperties;
@@ -70,7 +70,7 @@ public final class OrderedAsync {
}
@Override
- RaftClientRequest newRequestImpl() {
+ public RaftClientRequest newRequestImpl() {
return Optional.ofNullable(requestConstructor.get())
.map(f -> f.apply(ProtoUtils.toSlidingWindowEntry(seqNum, isFirst)))
.orElse(null);
@@ -207,8 +207,7 @@ public final class OrderedAsync {
private void scheduleWithTimeout(PendingOrderedRequest pending,
RaftClientRequest request, RetryPolicy retryPolicy, Throwable e) {
final int attempt = pending.getAttemptCount();
- final ClientRetryEvent event = new ClientRetryEvent(attempt, request,
- pending.getExceptionCount(e), e);
+ final ClientRetryEvent event = new ClientRetryEvent(request, e, pending);
final TimeDuration sleepTime = client.getEffectiveSleepTime(e,
retryPolicy.handleAttemptFailure(event).getSleepTime());
LOG.debug("schedule* attempt #{} with sleep {} and policy {} for {}", attempt, sleepTime, retryPolicy, request);
@@ -232,7 +231,6 @@ public final class OrderedAsync {
request = pending.newRequest();
LOG.debug("{}: send* {}", client.getId(), request);
f = client.getClientRpc().sendRequestAsync(request);
- int attemptCount = pending.getAttemptCount();
return f.thenApply(reply -> {
LOG.debug("{}: receive* {}", client.getId(), reply);
getSlidingWindow(request).receiveReply(
@@ -246,8 +244,8 @@ public final class OrderedAsync {
}
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
- final int exceptionCount = pending.incrementExceptionCount(e);
- final ClientRetryEvent event = new ClientRetryEvent(attemptCount, request, exceptionCount, e);
+ pending.incrementExceptionCount(e);
+ final ClientRetryEvent event = new ClientRetryEvent(request, e, pending);
if (!retryPolicy.handleAttemptFailure(event).shouldRetry()) {
handleAsyncRetryFailure(event);
} else {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 665f2e6..4a9040d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -17,7 +17,7 @@
*/
package org.apache.ratis.client.impl;
-import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.client.api.StreamApi;
@@ -44,6 +44,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -52,19 +53,20 @@ import java.util.function.Supplier;
import java.util.stream.Stream;
/** A client who sends requests to a raft service. */
-final class RaftClientImpl implements RaftClient {
+public final class RaftClientImpl implements RaftClient {
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
static long nextCallId() {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
}
- abstract static class PendingClientRequest {
+ public abstract static class PendingClientRequest {
+ private final long creationTimeInMs = System.currentTimeMillis();
private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
private final AtomicInteger attemptCount = new AtomicInteger();
private final Map<Class<?>, Integer> exceptionCount = new ConcurrentHashMap<>();
- abstract RaftClientRequest newRequestImpl();
+ public abstract RaftClientRequest newRequestImpl();
final RaftClientRequest newRequest() {
attemptCount.incrementAndGet();
@@ -75,16 +77,23 @@ final class RaftClientImpl implements RaftClient {
return replyFuture;
}
- int getAttemptCount() {
+ public int getAttemptCount() {
return attemptCount.get();
}
int incrementExceptionCount(Throwable t) {
- return exceptionCount.compute(t.getClass(), (k, v) -> v != null ? v + 1 : 1);
+ return t != null ? exceptionCount.compute(t.getClass(), (k, v) -> v != null ? v + 1 : 1) : 0;
}
- int getExceptionCount(Throwable t) {
- return Optional.ofNullable(exceptionCount.get(t.getClass())).orElse(0);
+ public int getExceptionCount(Throwable t) {
+ return t != null ? Optional.ofNullable(exceptionCount.get(t.getClass())).orElse(0) : 0;
+ }
+
+ public boolean isRequestTimeout(TimeDuration timeout) {
+ if (timeout == null) {
+ return false;
+ }
+ return System.currentTimeMillis() - creationTimeInMs > timeout.toLong(TimeUnit.MILLISECONDS);
}
}
@@ -275,12 +284,13 @@ final class RaftClientImpl implements RaftClient {
private RaftClientReply sendRequestWithRetry(Supplier<RaftClientRequest> supplier) throws IOException {
PendingClientRequest pending = new PendingClientRequest() {
- @Override RaftClientRequest newRequestImpl() {
- return null;
+ @Override
+ public RaftClientRequest newRequestImpl() {
+ return supplier.get();
}
};
- for(int attemptCount = 1;; attemptCount++) {
- final RaftClientRequest request = supplier.get();
+ while (true) {
+ final RaftClientRequest request = pending.newRequest();
IOException ioe = null;
try {
final RaftClientReply reply = sendRequest(request);
@@ -294,8 +304,8 @@ final class RaftClientImpl implements RaftClient {
ioe = e;
}
- final int exceptionCount = ioe != null ? pending.incrementExceptionCount(ioe) : 0;
- ClientRetryEvent event = new ClientRetryEvent(attemptCount, request, exceptionCount, ioe);
+ pending.incrementExceptionCount(ioe);
+ ClientRetryEvent event = new ClientRetryEvent(request, ioe, pending);
final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
TimeDuration sleepTime = getEffectiveSleepTime(ioe, action.getSleepTime());
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index c84e7c7..eeb6b96 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -17,7 +17,7 @@
*/
package org.apache.ratis.client.impl;
-import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.GroupMismatchException;
@@ -48,7 +48,7 @@ public interface UnorderedAsync {
}
@Override
- RaftClientRequest newRequestImpl() {
+ public RaftClientRequest newRequestImpl() {
return requestConstructor.get();
}
}
@@ -84,8 +84,8 @@ public interface UnorderedAsync {
}
final Throwable cause = replyException != null ? replyException : e;
- final int causeCount = pending.incrementExceptionCount(cause);
- final ClientRetryEvent event = new ClientRetryEvent(attemptCount, request, causeCount, cause);
+ pending.incrementExceptionCount(cause);
+ final ClientRetryEvent event = new ClientRetryEvent(request, cause, pending);
RetryPolicy retryPolicy = client.getRetryPolicy();
final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
TimeDuration sleepTime = client.getEffectiveSleepTime(cause, action.getSleepTime());
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java b/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java
index c4d7523..2e8d4c0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java
@@ -17,11 +17,11 @@
*/
package org.apache.ratis.client.retry;
-import org.apache.ratis.client.ClientRetryEvent;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
import java.util.Collections;
import java.util.EnumMap;
@@ -39,6 +39,7 @@ public final class RequestTypeDependentRetryPolicy implements RetryPolicy {
public static class Builder {
private final EnumMap<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> map
= new EnumMap<>(RaftProtos.RaftClientRequestProto.TypeCase.class);
+ private TimeDuration timeout = null;
/** Set the given policy for the given type. */
public Builder set(RaftProtos.RaftClientRequestProto.TypeCase type, RetryPolicy policy) {
@@ -47,8 +48,13 @@ public final class RequestTypeDependentRetryPolicy implements RetryPolicy {
return this;
}
+ public Builder setTimeout(TimeDuration timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
public RequestTypeDependentRetryPolicy build() {
- return new RequestTypeDependentRetryPolicy(map);
+ return new RequestTypeDependentRetryPolicy(map, timeout);
}
}
@@ -57,10 +63,13 @@ public final class RequestTypeDependentRetryPolicy implements RetryPolicy {
}
private final Map<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> map;
+ private TimeDuration timeout;
private final Supplier<String> myString;
- private RequestTypeDependentRetryPolicy(EnumMap<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> map) {
+ private RequestTypeDependentRetryPolicy(
+ EnumMap<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> map, TimeDuration timeout) {
this.map = Collections.unmodifiableMap(map);
+ this.timeout = timeout;
this.myString = () -> {
final StringBuilder b = new StringBuilder(getClass().getSimpleName()).append("{");
map.forEach((key, value) -> b.append(key).append("->").append(value).append(", "));
@@ -75,6 +84,9 @@ public final class RequestTypeDependentRetryPolicy implements RetryPolicy {
return RetryPolicies.retryForeverNoSleep().handleAttemptFailure(event);
}
final ClientRetryEvent clientEvent = (ClientRetryEvent) event;
+ if (timeout != null && clientEvent.isRequestTimeout(timeout)) {
+ return NO_RETRY_ACTION;
+ }
return Optional.ofNullable(map.get(clientEvent.getRequest().getType().getTypeCase()))
.orElse(RetryPolicies.retryForeverNoSleep())
.handleAttemptFailure(event);
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index f1f3e43..3d374a8 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -20,7 +20,7 @@ package org.apache.ratis.retry;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
@@ -153,7 +153,7 @@ public class TestExceptionDependentRetry implements MiniRaftClusterWithGrpc.Fact
long sleepTime) {
for (int i = 0; i < retries + 1; i++) {
RetryPolicy.Action action = exceptionDependentRetry
- .handleAttemptFailure(new ClientRetryEvent(i, null, i, exception));
+ .handleAttemptFailure(new ClientRetryEvent(i, null, exception));
final boolean expected = i < retries && i < maxAttempts;
Assert.assertEquals(expected, action.shouldRetry());
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
index ca93bb5..e736c94 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
@@ -18,7 +18,8 @@
package org.apache.ratis.retry;
import org.apache.ratis.BaseTest;
-import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.impl.RaftClientImpl;
+import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.client.retry.RequestTypeDependentRetryPolicy;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
@@ -87,7 +88,7 @@ public class TestRetryPolicy extends BaseTest {
RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY));
for(int i = 1; i < 2*n; i++) {
{ //write
- final ClientRetryEvent event = new ClientRetryEvent(i, writeRequest);
+ final ClientRetryEvent event = new ClientRetryEvent(i, writeRequest, null);
final RetryPolicy.Action action = policy.handleAttemptFailure(event);
final boolean expected = i < n;
@@ -100,21 +101,21 @@ public class TestRetryPolicy extends BaseTest {
}
{ //read and stale read are using default
- final ClientRetryEvent event = new ClientRetryEvent(i, readRequest);
+ final ClientRetryEvent event = new ClientRetryEvent(i, readRequest, null);
final RetryPolicy.Action action = policy.handleAttemptFailure(event);
Assert.assertTrue(action.shouldRetry());
Assert.assertEquals(0L, action.getSleepTime().getDuration());
}
{
- final ClientRetryEvent event = new ClientRetryEvent(i, staleReadRequest);
+ final ClientRetryEvent event = new ClientRetryEvent(i, staleReadRequest, null);
final RetryPolicy.Action action = policy.handleAttemptFailure(event);
Assert.assertTrue(action.shouldRetry());
Assert.assertEquals(0L, action.getSleepTime().getDuration());
}
{ //watch has no retry
- final ClientRetryEvent event = new ClientRetryEvent(i, watchRequest);
+ final ClientRetryEvent event = new ClientRetryEvent(i, watchRequest, null);
final RetryPolicy.Action action = policy.handleAttemptFailure(event);
Assert.assertFalse(action.shouldRetry());
Assert.assertEquals(0L, action.getSleepTime().getDuration());
@@ -124,6 +125,43 @@ public class TestRetryPolicy extends BaseTest {
}
@Test
+ public void testRequestTypeDependentRetryWithTimeout()
+ throws InterruptedException {
+ final RequestTypeDependentRetryPolicy.Builder b = RequestTypeDependentRetryPolicy.newBuilder();
+ b.set(RaftClientRequestProto.TypeCase.WRITE, RetryPolicies.retryForeverNoSleep());
+ b.set(RaftClientRequestProto.TypeCase.WATCH, RetryPolicies.retryForeverNoSleep());
+ TimeDuration timeout = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
+ final RetryPolicy policy = b.setTimeout(timeout).build();
+ LOG.info("policy = {}", policy);
+
+ final RaftClientRequest writeRequest = newRaftClientRequest(RaftClientRequest.writeRequestType());
+ final RaftClientRequest watchRequest = newRaftClientRequest(
+ RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY));
+
+ RaftClientRequest[] requests = new RaftClientRequest[] {writeRequest, watchRequest};
+ RaftClientImpl.PendingClientRequest pending = new RaftClientImpl.PendingClientRequest() {
+ @Override
+ public RaftClientRequest newRequestImpl() {
+ return null;
+ }
+ };
+
+ for (RaftClientRequest request : requests) {
+ final ClientRetryEvent event = new ClientRetryEvent(request, new Exception(), pending);
+ final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+ Assert.assertTrue(action.shouldRetry());
+ Assert.assertEquals(0L, action.getSleepTime().getDuration());
+ }
+
+ Thread.sleep(timeout.toLong(TimeUnit.MILLISECONDS) * 10);
+ for (RaftClientRequest request : requests) {
+ final ClientRetryEvent event = new ClientRetryEvent(request, new Exception(), pending);
+ final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+ Assert.assertFalse(action.shouldRetry());
+ }
+ }
+
+ @Test
public void testRequestTypeDependentRetryWithExceptionDependentPolicy() throws Exception {
final RequestTypeDependentRetryPolicy.Builder retryPolicy =
RequestTypeDependentRetryPolicy.newBuilder();
@@ -178,8 +216,7 @@ public class TestRetryPolicy extends BaseTest {
private void checkEvent(int exceptionAttemptCount, RetryPolicy retryPolicy, RaftClientRequest raftClientRequest,
Throwable exception, Pair exceptionPolicyPair) {
final ClientRetryEvent event =
- new ClientRetryEvent(exceptionAttemptCount, raftClientRequest,
- exceptionAttemptCount, exception);
+ new ClientRetryEvent(exceptionAttemptCount, raftClientRequest, exception);
final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
final boolean expected = exceptionAttemptCount < exceptionPolicyPair.retries;