You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2020/04/27 11:48:51 UTC

[incubator-ratis] branch master updated: RATIS-876. Introduce max timeout in RequestTypeDependentRetryPolicy.

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c42b44c  RATIS-876. Introduce max timeout in RequestTypeDependentRetryPolicy.
c42b44c is described below

commit c42b44cfe1577b97dcd490860780e81e19e48106
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Mon Apr 27 17:17:26 2020 +0530

    RATIS-876. Introduce max timeout in RequestTypeDependentRetryPolicy.
---
 .../org/apache/ratis/client/ClientRetryEvent.java  | 27 ++++++++----
 .../org/apache/ratis/client/impl/OrderedAsync.java | 12 +++--
 .../apache/ratis/client/impl/RaftClientImpl.java   | 38 ++++++++++------
 .../apache/ratis/client/impl/UnorderedAsync.java   |  8 ++--
 .../retry/RequestTypeDependentRetryPolicy.java     | 18 ++++++--
 .../ratis/retry/TestExceptionDependentRetry.java   |  4 +-
 .../org/apache/ratis/retry/TestRetryPolicy.java    | 51 +++++++++++++++++++---
 7 files changed, 113 insertions(+), 45 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java b/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java
index 5c4ec5d..58a10ff 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.client;
+package org.apache.ratis.client.retry;
 
+import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.TimeDuration;
 
 /** An {@link RetryPolicy.Event} specific to client request failure. */
 public class ClientRetryEvent implements RetryPolicy.Event {
@@ -27,20 +29,25 @@ public class ClientRetryEvent implements RetryPolicy.Event {
   private final int causeCount;
   private final RaftClientRequest request;
   private final Throwable cause;
+  private PendingClientRequest pending;
 
-  public ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount,
-      Throwable cause) {
+  @VisibleForTesting
+  public ClientRetryEvent(int attemptCount, RaftClientRequest request, Throwable cause) {
+    this(attemptCount, request, attemptCount, cause);
+  }
+
+  public ClientRetryEvent(RaftClientRequest request, Throwable t, PendingClientRequest pending) {
+    this(pending.getAttemptCount(), request, pending.getExceptionCount(t), t);
+    this.pending = pending;
+  }
+
+  private ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount, Throwable cause) {
     this.attemptCount = attemptCount;
     this.causeCount = causeCount;
     this.request = request;
     this.cause = cause;
   }
 
-  @VisibleForTesting
-  public ClientRetryEvent(int attemptCount, RaftClientRequest request) {
-    this(attemptCount, request, 0, null);
-  }
-
   @Override
   public int getAttemptCount() {
     return attemptCount;
@@ -60,6 +67,10 @@ public class ClientRetryEvent implements RetryPolicy.Event {
     return cause;
   }
 
+  boolean isRequestTimeout(TimeDuration timeout) {
+    return pending != null && pending.isRequestTimeout(timeout);
+  }
+
   @Override
   public String toString() {
     return getClass().getSimpleName() + ":attempt=" + attemptCount + ",request=" + request + ",cause=" + cause;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 88546ca..3b6c396 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -17,7 +17,7 @@
  */
 package org.apache.ratis.client.impl;
 
-import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.retry.ClientRetryEvent;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
 import org.apache.ratis.conf.RaftProperties;
@@ -70,7 +70,7 @@ public final class OrderedAsync {
     }
 
     @Override
-    RaftClientRequest newRequestImpl() {
+    public RaftClientRequest newRequestImpl() {
       return Optional.ofNullable(requestConstructor.get())
           .map(f -> f.apply(ProtoUtils.toSlidingWindowEntry(seqNum, isFirst)))
           .orElse(null);
@@ -207,8 +207,7 @@ public final class OrderedAsync {
   private void scheduleWithTimeout(PendingOrderedRequest pending,
       RaftClientRequest request, RetryPolicy retryPolicy, Throwable e) {
     final int attempt = pending.getAttemptCount();
-    final ClientRetryEvent event = new ClientRetryEvent(attempt, request,
-        pending.getExceptionCount(e), e);
+    final ClientRetryEvent event = new ClientRetryEvent(request, e, pending);
     final TimeDuration sleepTime = client.getEffectiveSleepTime(e,
         retryPolicy.handleAttemptFailure(event).getSleepTime());
     LOG.debug("schedule* attempt #{} with sleep {} and policy {} for {}", attempt, sleepTime, retryPolicy, request);
@@ -232,7 +231,6 @@ public final class OrderedAsync {
     request = pending.newRequest();
     LOG.debug("{}: send* {}", client.getId(), request);
     f = client.getClientRpc().sendRequestAsync(request);
-    int attemptCount = pending.getAttemptCount();
     return f.thenApply(reply -> {
       LOG.debug("{}: receive* {}", client.getId(), reply);
       getSlidingWindow(request).receiveReply(
@@ -246,8 +244,8 @@ public final class OrderedAsync {
       }
       e = JavaUtils.unwrapCompletionException(e);
       if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
-        final int exceptionCount = pending.incrementExceptionCount(e);
-        final ClientRetryEvent event = new ClientRetryEvent(attemptCount, request, exceptionCount, e);
+        pending.incrementExceptionCount(e);
+        final ClientRetryEvent event = new ClientRetryEvent(request, e, pending);
         if (!retryPolicy.handleAttemptFailure(event).shouldRetry()) {
           handleAsyncRetryFailure(event);
         } else {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 665f2e6..4a9040d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -17,7 +17,7 @@
  */
 package org.apache.ratis.client.impl;
 
-import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.retry.ClientRetryEvent;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.client.api.StreamApi;
@@ -44,6 +44,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
@@ -52,19 +53,20 @@ import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 /** A client who sends requests to a raft service. */
-final class RaftClientImpl implements RaftClient {
+public final class RaftClientImpl implements RaftClient {
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
 
   static long nextCallId() {
     return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
   }
 
-  abstract static class PendingClientRequest {
+  public abstract static class PendingClientRequest {
+    private final long creationTimeInMs = System.currentTimeMillis();
     private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
     private final AtomicInteger attemptCount = new AtomicInteger();
     private final Map<Class<?>, Integer> exceptionCount = new ConcurrentHashMap<>();
 
-    abstract RaftClientRequest newRequestImpl();
+    public abstract RaftClientRequest newRequestImpl();
 
     final RaftClientRequest newRequest() {
       attemptCount.incrementAndGet();
@@ -75,16 +77,23 @@ final class RaftClientImpl implements RaftClient {
       return replyFuture;
     }
 
-    int getAttemptCount() {
+    public int getAttemptCount() {
       return attemptCount.get();
     }
 
     int incrementExceptionCount(Throwable t) {
-      return exceptionCount.compute(t.getClass(), (k, v) -> v != null ? v + 1 : 1);
+      return t != null ? exceptionCount.compute(t.getClass(), (k, v) -> v != null ? v + 1 : 1) : 0;
     }
 
-    int getExceptionCount(Throwable t) {
-      return Optional.ofNullable(exceptionCount.get(t.getClass())).orElse(0);
+    public int getExceptionCount(Throwable t) {
+      return t != null ? Optional.ofNullable(exceptionCount.get(t.getClass())).orElse(0) : 0;
+    }
+
+    public boolean isRequestTimeout(TimeDuration timeout) {
+      if (timeout == null) {
+        return false;
+      }
+      return System.currentTimeMillis() - creationTimeInMs > timeout.toLong(TimeUnit.MILLISECONDS);
     }
   }
 
@@ -275,12 +284,13 @@ final class RaftClientImpl implements RaftClient {
 
   private RaftClientReply sendRequestWithRetry(Supplier<RaftClientRequest> supplier) throws IOException {
     PendingClientRequest pending = new PendingClientRequest() {
-      @Override RaftClientRequest newRequestImpl() {
-        return null;
+      @Override
+      public RaftClientRequest newRequestImpl() {
+        return supplier.get();
       }
     };
-    for(int attemptCount = 1;; attemptCount++) {
-      final RaftClientRequest request = supplier.get();
+    while (true) {
+      final RaftClientRequest request = pending.newRequest();
       IOException ioe = null;
       try {
         final RaftClientReply reply = sendRequest(request);
@@ -294,8 +304,8 @@ final class RaftClientImpl implements RaftClient {
         ioe = e;
       }
 
-      final int exceptionCount = ioe != null ? pending.incrementExceptionCount(ioe) : 0;
-      ClientRetryEvent event = new ClientRetryEvent(attemptCount, request, exceptionCount, ioe);
+      pending.incrementExceptionCount(ioe);
+      ClientRetryEvent event = new ClientRetryEvent(request, ioe, pending);
       final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
       TimeDuration sleepTime = getEffectiveSleepTime(ioe, action.getSleepTime());
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index c84e7c7..eeb6b96 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -17,7 +17,7 @@
  */
 package org.apache.ratis.client.impl;
 
-import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.retry.ClientRetryEvent;
 import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.GroupMismatchException;
@@ -48,7 +48,7 @@ public interface UnorderedAsync {
     }
 
     @Override
-    RaftClientRequest newRequestImpl() {
+    public RaftClientRequest newRequestImpl() {
       return requestConstructor.get();
     }
   }
@@ -84,8 +84,8 @@ public interface UnorderedAsync {
         }
 
         final Throwable cause = replyException != null ? replyException : e;
-        final int causeCount = pending.incrementExceptionCount(cause);
-        final ClientRetryEvent event = new ClientRetryEvent(attemptCount, request, causeCount, cause);
+        pending.incrementExceptionCount(cause);
+        final ClientRetryEvent event = new ClientRetryEvent(request, cause, pending);
         RetryPolicy retryPolicy = client.getRetryPolicy();
         final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
         TimeDuration sleepTime = client.getEffectiveSleepTime(cause, action.getSleepTime());
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java b/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java
index c4d7523..2e8d4c0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java
@@ -17,11 +17,11 @@
  */
 package org.apache.ratis.client.retry;
 
-import org.apache.ratis.client.ClientRetryEvent;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
 
 import java.util.Collections;
 import java.util.EnumMap;
@@ -39,6 +39,7 @@ public final class RequestTypeDependentRetryPolicy implements RetryPolicy {
   public static class Builder {
     private final EnumMap<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> map
         = new EnumMap<>(RaftProtos.RaftClientRequestProto.TypeCase.class);
+    private TimeDuration timeout = null;
 
     /** Set the given policy for the given type. */
     public Builder set(RaftProtos.RaftClientRequestProto.TypeCase type, RetryPolicy policy) {
@@ -47,8 +48,13 @@ public final class RequestTypeDependentRetryPolicy implements RetryPolicy {
       return this;
     }
 
+    public Builder setTimeout(TimeDuration timeout) {
+      this.timeout = timeout;
+      return this;
+    }
+
     public RequestTypeDependentRetryPolicy build() {
-      return new RequestTypeDependentRetryPolicy(map);
+      return new RequestTypeDependentRetryPolicy(map, timeout);
     }
   }
 
@@ -57,10 +63,13 @@ public final class RequestTypeDependentRetryPolicy implements RetryPolicy {
   }
 
   private final Map<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> map;
+  private TimeDuration timeout;
   private final Supplier<String> myString;
 
-  private RequestTypeDependentRetryPolicy(EnumMap<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> map) {
+  private RequestTypeDependentRetryPolicy(
+      EnumMap<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> map, TimeDuration timeout) {
     this.map = Collections.unmodifiableMap(map);
+    this.timeout = timeout;
     this.myString = () -> {
       final StringBuilder b = new StringBuilder(getClass().getSimpleName()).append("{");
       map.forEach((key, value) -> b.append(key).append("->").append(value).append(", "));
@@ -75,6 +84,9 @@ public final class RequestTypeDependentRetryPolicy implements RetryPolicy {
       return RetryPolicies.retryForeverNoSleep().handleAttemptFailure(event);
     }
     final ClientRetryEvent clientEvent = (ClientRetryEvent) event;
+    if (timeout != null && clientEvent.isRequestTimeout(timeout)) {
+      return NO_RETRY_ACTION;
+    }
     return Optional.ofNullable(map.get(clientEvent.getRequest().getType().getTypeCase()))
         .orElse(RetryPolicies.retryForeverNoSleep())
         .handleAttemptFailure(event);
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index f1f3e43..3d374a8 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -20,7 +20,7 @@ package org.apache.ratis.retry;
 
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.retry.ClientRetryEvent;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
@@ -153,7 +153,7 @@ public class TestExceptionDependentRetry implements MiniRaftClusterWithGrpc.Fact
       long sleepTime) {
     for (int i = 0; i < retries + 1; i++) {
       RetryPolicy.Action action = exceptionDependentRetry
-          .handleAttemptFailure(new ClientRetryEvent(i, null, i, exception));
+          .handleAttemptFailure(new ClientRetryEvent(i, null, exception));
 
       final boolean expected = i < retries && i < maxAttempts;
       Assert.assertEquals(expected, action.shouldRetry());
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
index ca93bb5..e736c94 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
@@ -18,7 +18,8 @@
 package org.apache.ratis.retry;
 
 import org.apache.ratis.BaseTest;
-import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.impl.RaftClientImpl;
+import org.apache.ratis.client.retry.ClientRetryEvent;
 import org.apache.ratis.client.retry.RequestTypeDependentRetryPolicy;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
@@ -87,7 +88,7 @@ public class TestRetryPolicy extends BaseTest {
         RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY));
     for(int i = 1; i < 2*n; i++) {
       { //write
-        final ClientRetryEvent event = new ClientRetryEvent(i, writeRequest);
+        final ClientRetryEvent event = new ClientRetryEvent(i, writeRequest, null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
 
         final boolean expected = i < n;
@@ -100,21 +101,21 @@ public class TestRetryPolicy extends BaseTest {
       }
 
       { //read and stale read are using default
-        final ClientRetryEvent event = new ClientRetryEvent(i, readRequest);
+        final ClientRetryEvent event = new ClientRetryEvent(i, readRequest, null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
         Assert.assertTrue(action.shouldRetry());
         Assert.assertEquals(0L, action.getSleepTime().getDuration());
       }
 
       {
-        final ClientRetryEvent event = new ClientRetryEvent(i, staleReadRequest);
+        final ClientRetryEvent event = new ClientRetryEvent(i, staleReadRequest, null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
         Assert.assertTrue(action.shouldRetry());
         Assert.assertEquals(0L, action.getSleepTime().getDuration());
       }
 
       { //watch has no retry
-        final ClientRetryEvent event = new ClientRetryEvent(i, watchRequest);
+        final ClientRetryEvent event = new ClientRetryEvent(i, watchRequest, null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
         Assert.assertFalse(action.shouldRetry());
         Assert.assertEquals(0L, action.getSleepTime().getDuration());
@@ -124,6 +125,43 @@ public class TestRetryPolicy extends BaseTest {
   }
 
   @Test
+  public void testRequestTypeDependentRetryWithTimeout()
+      throws InterruptedException {
+    final RequestTypeDependentRetryPolicy.Builder b = RequestTypeDependentRetryPolicy.newBuilder();
+    b.set(RaftClientRequestProto.TypeCase.WRITE, RetryPolicies.retryForeverNoSleep());
+    b.set(RaftClientRequestProto.TypeCase.WATCH, RetryPolicies.retryForeverNoSleep());
+    TimeDuration timeout = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
+    final RetryPolicy policy = b.setTimeout(timeout).build();
+    LOG.info("policy = {}", policy);
+
+    final RaftClientRequest writeRequest = newRaftClientRequest(RaftClientRequest.writeRequestType());
+    final RaftClientRequest watchRequest = newRaftClientRequest(
+        RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY));
+
+    RaftClientRequest[] requests = new RaftClientRequest[] {writeRequest, watchRequest};
+    RaftClientImpl.PendingClientRequest pending = new RaftClientImpl.PendingClientRequest() {
+      @Override
+      public RaftClientRequest newRequestImpl() {
+        return null;
+      }
+    };
+
+    for (RaftClientRequest request : requests) {
+      final ClientRetryEvent event = new ClientRetryEvent(request, new Exception(), pending);
+      final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+      Assert.assertTrue(action.shouldRetry());
+      Assert.assertEquals(0L, action.getSleepTime().getDuration());
+    }
+
+    Thread.sleep(timeout.toLong(TimeUnit.MILLISECONDS) * 10);
+    for (RaftClientRequest request : requests) {
+      final ClientRetryEvent event = new ClientRetryEvent(request, new Exception(), pending);
+      final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+      Assert.assertFalse(action.shouldRetry());
+    }
+  }
+
+    @Test
   public void testRequestTypeDependentRetryWithExceptionDependentPolicy() throws Exception {
     final RequestTypeDependentRetryPolicy.Builder retryPolicy =
         RequestTypeDependentRetryPolicy.newBuilder();
@@ -178,8 +216,7 @@ public class TestRetryPolicy extends BaseTest {
   private void checkEvent(int exceptionAttemptCount, RetryPolicy retryPolicy, RaftClientRequest raftClientRequest,
       Throwable exception, Pair exceptionPolicyPair) {
     final ClientRetryEvent event =
-        new ClientRetryEvent(exceptionAttemptCount, raftClientRequest,
-            exceptionAttemptCount, exception);
+        new ClientRetryEvent(exceptionAttemptCount, raftClientRequest, exception);
     final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
 
     final boolean expected = exceptionAttemptCount < exceptionPolicyPair.retries;