You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/09/11 18:07:44 UTC

incubator-ratis git commit: RATIS-310. Add support for Retry Policy in Ratis. Contributed by Shashikant Banerjee

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 09b099c71 -> 2ce10dc1a


RATIS-310. Add support for Retry Policy in Ratis.  Contributed by Shashikant Banerjee


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2ce10dc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2ce10dc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2ce10dc1

Branch: refs/heads/master
Commit: 2ce10dc1a88b7657c7d63e02774459afd8724c4d
Parents: 09b099c
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Tue Sep 11 11:04:22 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Tue Sep 11 11:04:22 2018 -0700

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     |  13 +-
 .../ratis/client/impl/ClientImplUtils.java      |   8 +-
 .../ratis/client/impl/RaftClientImpl.java       |  22 +--
 .../org/apache/ratis/retry/RetryPolicies.java   | 141 +++++++++++++++++++
 .../org/apache/ratis/retry/RetryPolicy.java     |  40 ++++++
 .../java/org/apache/ratis/TestRetryPolicy.java  |  42 ++++++
 6 files changed, 254 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index ead9155..d28d50a 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -22,6 +22,8 @@ import org.apache.ratis.client.impl.ClientImplUtils;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.slf4j.Logger;
@@ -111,6 +113,7 @@ public interface RaftClient extends Closeable {
     private RaftPeerId leaderId;
     private RaftProperties properties;
     private Parameters parameters;
+    private RetryPolicy retryPolicy;
 
     private Builder() {}
 
@@ -126,11 +129,13 @@ public interface RaftClient extends Closeable {
           clientRpc = factory.newRaftClientRpc(clientId, properties);
         }
       }
+      retryPolicy =
+          retryPolicy == null ? RetryPolicies.RETRY_FOREVER : retryPolicy;
       return ClientImplUtils.newRaftClient(clientId,
           Objects.requireNonNull(group, "The 'group' field is not initialized."),
           leaderId,
           Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."),
-          properties);
+          properties, retryPolicy);
     }
 
     /** Set {@link RaftClient} ID. */
@@ -168,5 +173,11 @@ public interface RaftClient extends Closeable {
       this.parameters = parameters;
       return this;
     }
+
+    /** Set {@link RetryPolicy}. */
+    public Builder setRetryPolicy(RetryPolicy retryPolicy) {
+      this.retryPolicy = retryPolicy;
+      return this;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index d813650..f2564a9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -21,6 +21,8 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -28,7 +30,9 @@ import org.apache.ratis.protocol.RaftPeerId;
 /** Client utilities for internal use. */
 public class ClientImplUtils {
   public static RaftClient newRaftClient(ClientId clientId, RaftGroup group,
-      RaftPeerId leaderId, RaftClientRpc clientRpc, RaftProperties properties) {
-    return new RaftClientImpl(clientId, group, leaderId, clientRpc, properties);
+      RaftPeerId leaderId, RaftClientRpc clientRpc, RaftProperties properties,
+      RetryPolicy retryPolicy) {
+    return new RaftClientImpl(clientId, group, leaderId, clientRpc, properties,
+        retryPolicy);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9419c7f..d6c9a27 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -20,6 +20,8 @@ package org.apache.ratis.client.impl;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
@@ -91,6 +93,7 @@ final class RaftClientImpl implements RaftClient {
   private final Collection<RaftPeer> peers;
   private final RaftGroupId groupId;
   private final TimeDuration retryInterval;
+  private final RetryPolicy retryPolicy;
 
   private volatile RaftPeerId leaderId;
 
@@ -101,7 +104,7 @@ final class RaftClientImpl implements RaftClient {
   private final Semaphore asyncRequestSemaphore;
 
   RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
-      RaftClientRpc clientRpc, RaftProperties properties) {
+      RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) {
     this.clientId = clientId;
     this.clientRpc = clientRpc;
     this.peers = new ConcurrentLinkedQueue<>(group.getPeers());
@@ -109,6 +112,8 @@ final class RaftClientImpl implements RaftClient {
     this.leaderId = leaderId != null? leaderId
         : !peers.isEmpty()? peers.iterator().next().getId(): null;
     this.retryInterval = RaftClientConfigKeys.Rpc.retryInterval(properties);
+    Preconditions.assertTrue(retryPolicy != null, "retry policy can't be null");
+    this.retryPolicy = retryPolicy;
 
     asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
     scheduler = TimeoutScheduler.newInstance(RaftClientConfigKeys.Async.schedulerThreads(properties));
@@ -258,20 +263,19 @@ final class RaftClientImpl implements RaftClient {
   private RaftClientReply sendRequestWithRetry(
       Supplier<RaftClientRequest> supplier)
       throws InterruptedIOException, StateMachineException, GroupMismatchException {
-    for(;;) {
+    for(int retryCount = 0;; retryCount++) {
       final RaftClientRequest request = supplier.get();
       final RaftClientReply reply = sendRequest(request);
       if (reply != null) {
         return reply;
       }
-
-      // sleep and then retry
+      if (!retryPolicy.shouldRetry(retryCount)) {
+        return null;
+      }
       try {
-        retryInterval.sleep();
-      } catch (InterruptedException ie) {
-        Thread.currentThread().interrupt();
-        throw IOUtils.toInterruptedIOException(
-            "Interrupted when sending " + request, ie);
+        retryPolicy.getSleepTime().sleep();
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException("retry policy=" + retryPolicy);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
new file mode 100644
index 0000000..4505c8d
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.retry;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A collection of {@link RetryPolicy} implementations
+ */
+public class RetryPolicies {
+  /**
+   * Keep retrying forever.
+   */
+  public static final RetryPolicy RETRY_FOREVER = new RetryForever();
+
+  /**
+   * Keep trying a limited number of times, waiting a fixed time between attempts,
+   * and then fail by re-throwing the exception.
+   */
+  public static final RetryPolicy retryUpToMaximumCountWithFixedSleep(
+      int maxRetries, TimeDuration sleepTime) {
+    return new RetryUpToMaximumCountWithFixedSleep(maxRetries, sleepTime);
+  }
+
+
+  static class RetryForever implements RetryPolicy {
+    @Override
+    public boolean shouldRetry(int retryCount) {
+      return true;
+    }
+
+    @Override
+    public TimeDuration getSleepTime() {
+      return TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Retry up to maxRetries.
+   * The actual sleep time of the n-th retry is f(n, sleepTime),
+   * where f is a function provided by the subclass implementation.
+   *
+   * The object of the subclasses should be immutable;
+   * otherwise, the subclass must override hashCode(), equals(..) and toString().
+   */
+  static abstract class RetryLimited implements RetryPolicy {
+    private final int maxRetries;
+    private final TimeDuration sleepTime;
+
+    private String myString;
+
+    RetryLimited(int maxRetries, TimeDuration sleepTime) {
+      if (maxRetries < 0) {
+        throw new IllegalArgumentException("maxRetries = " + maxRetries+" < 0");
+      }
+      if (sleepTime.isNegative()) {
+        throw new IllegalArgumentException(
+            "sleepTime = " + sleepTime.getDuration() + " < 0");
+      }
+
+      this.maxRetries = maxRetries;
+      this.sleepTime = sleepTime;
+    }
+
+    @Override
+    public TimeDuration getSleepTime() {
+      return sleepTime;
+    }
+
+    public int getMaxRetries() {
+      return maxRetries;
+    }
+
+    @Override
+    public boolean shouldRetry(int retryCount) {
+      if (retryCount >= maxRetries) {
+        return false;
+      } else {
+        return true;
+      }
+    }
+
+    protected String getReason() {
+      return constructReasonString(maxRetries);
+    }
+
+    @VisibleForTesting
+    public static String constructReasonString(int retries) {
+      return "retries get failed due to exceeded maximum allowed retries " +
+          "number: " + retries;
+    }
+
+    @Override
+    public int hashCode() {
+      return toString().hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object that) {
+      if (this == that) {
+        return true;
+      } else if (that == null || this.getClass() != that.getClass()) {
+        return false;
+      }
+      return this.toString().equals(that.toString());
+    }
+
+    @Override
+    public String toString() {
+      if (myString == null) {
+        myString = getClass().getSimpleName() + "(maxRetries=" + maxRetries
+            + ", sleepTime=" + sleepTime + ")";
+      }
+      return myString;
+    }
+  }
+
+  static class RetryUpToMaximumCountWithFixedSleep extends RetryLimited {
+    public RetryUpToMaximumCountWithFixedSleep(int maxRetries, TimeDuration sleepTime) {
+      super(maxRetries, sleepTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
new file mode 100644
index 0000000..fe4ee80
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.retry;
+
+import org.apache.ratis.util.TimeDuration;
+
+/**
+ * Policy abstract for retrying.
+ */
+public interface RetryPolicy {
+
+  /**
+   * Determines whether it is supposed to retry the connection if the operation
+   * fails for some reason.
+   *
+   * @param retryCount The number of times retried so far
+   * @return true if it has to make another attempt, otherwise, false
+   */
+  boolean shouldRetry(int retryCount);
+
+  /**
+   * Returns the time duration for sleep in between the retries.
+   */
+  TimeDuration getSleepTime();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java b/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java
new file mode 100644
index 0000000..d481003
--- /dev/null
+++ b/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+
+
+public class TestRetryPolicy {
+
+  @Test
+  public void testRetryMultipleTimesWithFixedSleep() {
+    RetryPolicy retryPolicy = RetryPolicies
+        .retryUpToMaximumCountWithFixedSleep(2,
+            TimeDuration.valueOf(1000L, TimeUnit.MILLISECONDS));
+     boolean shouldRetry = retryPolicy.shouldRetry(1);
+    Assert.assertTrue(shouldRetry);
+    Assert.assertTrue(1000 == retryPolicy.getSleepTime().getDuration());
+    Assert.assertFalse(retryPolicy.shouldRetry(2));
+  }
+}
\ No newline at end of file