You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/09/11 18:07:44 UTC
incubator-ratis git commit: RATIS-310. Add support for Retry Policy
in Ratis. Contributed by Shashikant Banerjee
Repository: incubator-ratis
Updated Branches:
refs/heads/master 09b099c71 -> 2ce10dc1a
RATIS-310. Add support for Retry Policy in Ratis. Contributed by Shashikant Banerjee
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2ce10dc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2ce10dc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2ce10dc1
Branch: refs/heads/master
Commit: 2ce10dc1a88b7657c7d63e02774459afd8724c4d
Parents: 09b099c
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Tue Sep 11 11:04:22 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Tue Sep 11 11:04:22 2018 -0700
----------------------------------------------------------------------
.../org/apache/ratis/client/RaftClient.java | 13 +-
.../ratis/client/impl/ClientImplUtils.java | 8 +-
.../ratis/client/impl/RaftClientImpl.java | 22 +--
.../org/apache/ratis/retry/RetryPolicies.java | 141 +++++++++++++++++++
.../org/apache/ratis/retry/RetryPolicy.java | 40 ++++++
.../java/org/apache/ratis/TestRetryPolicy.java | 42 ++++++
6 files changed, 254 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index ead9155..d28d50a 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -22,6 +22,8 @@ import org.apache.ratis.client.impl.ClientImplUtils;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.slf4j.Logger;
@@ -111,6 +113,7 @@ public interface RaftClient extends Closeable {
private RaftPeerId leaderId;
private RaftProperties properties;
private Parameters parameters;
+ private RetryPolicy retryPolicy;
private Builder() {}
@@ -126,11 +129,13 @@ public interface RaftClient extends Closeable {
clientRpc = factory.newRaftClientRpc(clientId, properties);
}
}
+ retryPolicy =
+ retryPolicy == null ? RetryPolicies.RETRY_FOREVER : retryPolicy;
return ClientImplUtils.newRaftClient(clientId,
Objects.requireNonNull(group, "The 'group' field is not initialized."),
leaderId,
Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."),
- properties);
+ properties, retryPolicy);
}
/** Set {@link RaftClient} ID. */
@@ -168,5 +173,11 @@ public interface RaftClient extends Closeable {
this.parameters = parameters;
return this;
}
+
+ /** Set {@link RetryPolicy}; when left unset or null, {@link RetryPolicies#RETRY_FOREVER} is substituted before the client is created. */
+ public Builder setRetryPolicy(RetryPolicy retryPolicy) {
+ this.retryPolicy = retryPolicy;
+ return this;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index d813650..f2564a9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -21,6 +21,8 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeerId;
@@ -28,7 +30,9 @@ import org.apache.ratis.protocol.RaftPeerId;
/** Client utilities for internal use. */
public class ClientImplUtils {
public static RaftClient newRaftClient(ClientId clientId, RaftGroup group,
- RaftPeerId leaderId, RaftClientRpc clientRpc, RaftProperties properties) {
- return new RaftClientImpl(clientId, group, leaderId, clientRpc, properties);
+ RaftPeerId leaderId, RaftClientRpc clientRpc, RaftProperties properties,
+ RetryPolicy retryPolicy) { // new trailing parameter: the retry policy for the created client
+ return new RaftClientImpl(clientId, group, leaderId, clientRpc, properties,
+ retryPolicy); // forwarded unchanged; the RaftClientImpl constructor asserts it is non-null
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9419c7f..d6c9a27 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -20,6 +20,8 @@ package org.apache.ratis.client.impl;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
@@ -91,6 +93,7 @@ final class RaftClientImpl implements RaftClient {
private final Collection<RaftPeer> peers;
private final RaftGroupId groupId;
private final TimeDuration retryInterval;
+ private final RetryPolicy retryPolicy;
private volatile RaftPeerId leaderId;
@@ -101,7 +104,7 @@ final class RaftClientImpl implements RaftClient {
private final Semaphore asyncRequestSemaphore;
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
- RaftClientRpc clientRpc, RaftProperties properties) {
+ RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) {
this.clientId = clientId;
this.clientRpc = clientRpc;
this.peers = new ConcurrentLinkedQueue<>(group.getPeers());
@@ -109,6 +112,8 @@ final class RaftClientImpl implements RaftClient {
this.leaderId = leaderId != null? leaderId
: !peers.isEmpty()? peers.iterator().next().getId(): null;
this.retryInterval = RaftClientConfigKeys.Rpc.retryInterval(properties);
+ Preconditions.assertTrue(retryPolicy != null, "retry policy can't be null");
+ this.retryPolicy = retryPolicy;
asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
scheduler = TimeoutScheduler.newInstance(RaftClientConfigKeys.Async.schedulerThreads(properties));
@@ -258,20 +263,19 @@ final class RaftClientImpl implements RaftClient {
private RaftClientReply sendRequestWithRetry(
Supplier<RaftClientRequest> supplier)
throws InterruptedIOException, StateMachineException, GroupMismatchException {
- for(;;) {
+ for(int retryCount = 0;; retryCount++) { // retryCount = retries already performed (0 when the first attempt has just failed)
final RaftClientRequest request = supplier.get();
final RaftClientReply reply = sendRequest(request);
if (reply != null) {
return reply;
}
-
- // sleep and then retry
+ if (!retryPolicy.shouldRetry(retryCount)) {
+ return null; // the policy declined another attempt: the caller receives null instead of a reply
+ }
try {
- retryInterval.sleep();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw IOUtils.toInterruptedIOException(
- "Interrupted when sending " + request, ie);
+ retryPolicy.getSleepTime().sleep();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("retry policy=" + retryPolicy); // NOTE(review): unlike the removed code, this neither re-sets the thread's interrupt flag nor chains e as the cause
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
new file mode 100644
index 0000000..4505c8d
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.retry;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A collection of {@link RetryPolicy} implementations, exposed through a constant and a static factory method.
+ */
+public class RetryPolicies {
+ /**
+ * Keep retrying forever: shouldRetry(..) always returns true and the sleep time is a zero-length duration.
+ */
+ public static final RetryPolicy RETRY_FOREVER = new RetryForever();
+
+ /**
+ * Keep trying a limited number of times, waiting a fixed time between attempts:
+ * shouldRetry(retryCount) turns false once retryCount reaches maxRetries. Negative arguments raise IllegalArgumentException.
+ */
+ public static final RetryPolicy retryUpToMaximumCountWithFixedSleep(
+ int maxRetries, TimeDuration sleepTime) {
+ return new RetryUpToMaximumCountWithFixedSleep(maxRetries, sleepTime);
+ }
+
+
+ static class RetryForever implements RetryPolicy {
+ @Override
+ public boolean shouldRetry(int retryCount) {
+ return true; // unconditional: retryCount is ignored
+ }
+
+ @Override
+ public TimeDuration getSleepTime() {
+ return TimeDuration.valueOf(0, TimeUnit.MILLISECONDS); // zero-length duration; a new instance is created on every call
+ }
+ }
+
+ /**
+ * Retry up to maxRetries.
+ * shouldRetry(retryCount) returns true only while retryCount is less than maxRetries,
+ * and getSleepTime() returns the sleepTime given to the constructor unchanged.
+ *
+ * The object of the subclasses should be immutable;
+ * otherwise, the subclass must override hashCode(), equals(..) and toString().
+ */
+ static abstract class RetryLimited implements RetryPolicy {
+ private final int maxRetries;
+ private final TimeDuration sleepTime;
+
+ private String myString; // lazily built and cached by toString()
+
+ RetryLimited(int maxRetries, TimeDuration sleepTime) {
+ if (maxRetries < 0) {
+ throw new IllegalArgumentException("maxRetries = " + maxRetries+" < 0");
+ }
+ if (sleepTime.isNegative()) { // a null sleepTime fails here with a NullPointerException
+ throw new IllegalArgumentException(
+ "sleepTime = " + sleepTime.getDuration() + " < 0");
+ }
+
+ this.maxRetries = maxRetries;
+ this.sleepTime = sleepTime;
+ }
+
+ @Override
+ public TimeDuration getSleepTime() {
+ return sleepTime;
+ }
+
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ @Override
+ public boolean shouldRetry(int retryCount) {
+ if (retryCount >= maxRetries) { // the allowed number of retries has been used up
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ protected String getReason() { // failure message mentioning this policy's maxRetries
+ return constructReasonString(maxRetries);
+ }
+
+ @VisibleForTesting
+ public static String constructReasonString(int retries) {
+ return "retries get failed due to exceeded maximum allowed retries " +
+ "number: " + retries;
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode(); // derived from toString() so it stays consistent with equals(..)
+ }
+
+ @Override
+ public boolean equals(final Object that) {
+ if (this == that) {
+ return true;
+ } else if (that == null || this.getClass() != that.getClass()) {
+ return false;
+ }
+ return this.toString().equals(that.toString()); // same class: equal exactly when the string forms match
+ }
+
+ @Override
+ public String toString() {
+ if (myString == null) { // first call: build the string once and reuse it afterwards
+ myString = getClass().getSimpleName() + "(maxRetries=" + maxRetries
+ + ", sleepTime=" + sleepTime + ")";
+ }
+ return myString;
+ }
+ }
+
+ static class RetryUpToMaximumCountWithFixedSleep extends RetryLimited {
+ public RetryUpToMaximumCountWithFixedSleep(int maxRetries, TimeDuration sleepTime) {
+ super(maxRetries, sleepTime); // adds no behavior of its own; validation and logic live in RetryLimited
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
new file mode 100644
index 0000000..fe4ee80
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.retry;
+
+import org.apache.ratis.util.TimeDuration;
+
+/**
+ * Policy abstraction for retrying: decides whether another attempt is made and how long to sleep between attempts.
+ */
+public interface RetryPolicy {
+
+ /**
+ * Determines whether the caller should make another attempt after the
+ * operation has failed for some reason.
+ *
+ * @param retryCount The number of times retried so far
+ * @return true if it has to make another attempt, otherwise, false
+ */
+ boolean shouldRetry(int retryCount);
+
+ /**
+ * Returns the time duration to sleep in between the retries.
+ */
+ TimeDuration getSleepTime();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2ce10dc1/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java b/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java
new file mode 100644
index 0000000..d481003
--- /dev/null
+++ b/ratis-common/src/test/java/org/apache/ratis/TestRetryPolicy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+
+
+public class TestRetryPolicy {
+
+ @Test
+ public void testRetryMultipleTimesWithFixedSleep() { // checks the fixed-sleep policy with maxRetries = 2
+ RetryPolicy retryPolicy = RetryPolicies
+ .retryUpToMaximumCountWithFixedSleep(2,
+ TimeDuration.valueOf(1000L, TimeUnit.MILLISECONDS));
+ boolean shouldRetry = retryPolicy.shouldRetry(1); // 1 is below maxRetries, so a retry is allowed
+ Assert.assertTrue(shouldRetry);
+ Assert.assertTrue(1000 == retryPolicy.getSleepTime().getDuration()); // the configured sleep time is reported back
+ Assert.assertFalse(retryPolicy.shouldRetry(2)); // 2 reaches maxRetries, so no further retry
+ }
+}
\ No newline at end of file