You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/12/11 21:22:06 UTC
incubator-ratis git commit: RATIS-453. When retry fails on an async
call, it should fail all the following calls in the sliding window.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 3b0be0287 -> 00274fa39
RATIS-453. When retry fails on an async call, it should fail all the following calls in the sliding window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/00274fa3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/00274fa3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/00274fa3
Branch: refs/heads/master
Commit: 00274fa39073e359433005c96097efaa24024702
Parents: 3b0be02
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Tue Dec 11 13:21:41 2018 -0800
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Tue Dec 11 13:21:41 2018 -0800
----------------------------------------------------------------------
.../ratis/client/impl/RaftClientImpl.java | 62 ++++++-----
.../ratis/protocol/AlreadyClosedException.java | 6 +-
.../org/apache/ratis/retry/RetryPolicies.java | 4 +-
.../org/apache/ratis/retry/RetryPolicy.java | 5 +-
.../org/apache/ratis/util/SlidingWindow.java | 41 ++++++-
.../function/CheckedFunctionWithTimeout.java | 2 +-
.../ratis/util/function/FunctionUtils.java | 34 ++++++
.../test/java/org/apache/ratis/BaseTest.java | 54 +++++-----
.../grpc/client/GrpcClientProtocolClient.java | 3 +-
.../java/org/apache/ratis/MiniRaftCluster.java | 9 +-
.../java/org/apache/ratis/RaftAsyncTests.java | 106 +++++++++++++++----
.../java/org/apache/ratis/RaftTestUtil.java | 12 +++
12 files changed, 252 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 4c73d45..58206bd 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -20,11 +20,12 @@ package org.apache.ratis.client.impl;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
-import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.*;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.util.*;
+import org.apache.ratis.util.function.FunctionUtils;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -78,11 +79,16 @@ final class RaftClientImpl implements RaftClient {
replyFuture.complete(reply);
}
+ @Override
+ public void fail(Exception e) {
+ replyFuture.completeExceptionally(e);
+ }
+
CompletableFuture<RaftClientReply> getReplyFuture() {
return replyFuture;
}
- public int getAttemptCount() {
+ int getAttemptCount() {
return attemptCount;
}
@@ -164,9 +170,10 @@ final class RaftClientImpl implements RaftClient {
try {
asyncRequestSemaphore.acquire();
} catch (InterruptedException e) {
- throw new CompletionException(IOUtils.toInterruptedIOException(
+ return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
"Interrupted when sending " + type + ", message=" + message, e));
}
+
final long callId = nextCallId();
final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum,
seq -> newRaftClientRequest(server, callId, seq, message, type));
@@ -268,11 +275,17 @@ final class RaftClientImpl implements RaftClient {
peersInNewConf.filter(p -> !peers.contains(p))::iterator);
}
- private CompletableFuture<RaftClientReply> sendRequestWithRetryAsync(
- PendingAsyncRequest pending) {
- final RaftClientRequest request = pending.newRequest();
+ private void sendRequestWithRetryAsync(PendingAsyncRequest pending) {
final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
- return sendRequestAsync(request, pending.getAttemptCount()).thenCompose(reply -> {
+ if (f.isDone()) {
+ return;
+ }
+
+ final RaftClientRequest request = pending.newRequest();
+ sendRequestAsync(request, pending.getAttemptCount()).thenAccept(reply -> {
+ if (f.isDone()) {
+ return;
+ }
if (reply == null) {
LOG.debug("schedule attempt #{} with policy {} for {}", pending.getAttemptCount(), retryPolicy, request);
scheduler.onTimeout(retryPolicy.getSleepTime(),
@@ -281,8 +294,7 @@ final class RaftClientImpl implements RaftClient {
} else {
f.complete(reply);
}
- return f;
- });
+ }).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
}
private RaftClientReply sendRequestWithRetry(
@@ -315,7 +327,7 @@ final class RaftClientImpl implements RaftClient {
getSlidingWindow(request).receiveReply(
request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
} else if (!retryPolicy.shouldRetry(attemptCount)) {
- return handleAsyncRetry(request, attemptCount);
+ handleAsyncRetryFailure(request, attemptCount);
}
return reply;
}).exceptionally(e -> {
@@ -325,30 +337,22 @@ final class RaftClientImpl implements RaftClient {
LOG.debug("{}: Failed {} with {}", clientId, request, e);
}
e = JavaUtils.unwrapCompletionException(e);
- if (e instanceof GroupMismatchException) {
- throw new CompletionException(e);
- } else if (e instanceof IOException) {
- // once the retryLimit is hit, just remove the request from the
- // sliding window and throw an exception. The exception thrown here will
- // make sure its not retried any more with sendRequestWithRetryAsync call.
+ if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
if (!retryPolicy.shouldRetry(attemptCount)) {
- return handleAsyncRetry(request, attemptCount);
+ handleAsyncRetryFailure(request, attemptCount);
+ } else {
+ handleIOException(request, (IOException) e, null);
}
- handleIOException(request, (IOException)e, null);
- } else {
- throw new CompletionException(e);
+ return null;
}
- return null;
+ throw new CompletionException(e);
});
}
- private RaftClientReply handleAsyncRetry(RaftClientRequest request, int attemptCount) {
- RaftClientReply reply = new RaftClientReply(request,
- new RaftRetryFailureException(
- "Failed " + request + " for " + attemptCount + " attempts with " + retryPolicy), null);
- getSlidingWindow(request).receiveReply(
- request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
- return reply;
+ private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount) {
+ final RaftRetryFailureException rfe = new RaftRetryFailureException(
+ "Failed " + request + " for " + (attemptCount-1) + " attempts with " + retryPolicy);
+ getSlidingWindow(request).fail(request.getSeqNum(), rfe);
}
private RaftClientReply sendRequest(RaftClientRequest request)
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
index 85888a0..f69173f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,4 +24,8 @@ public class AlreadyClosedException extends RaftException {
public AlreadyClosedException(String message) {
super(message);
}
+
+ public AlreadyClosedException(String message, Throwable t) {
+ super(message, t);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
index b405f81..e5cdeaa 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -45,7 +45,7 @@ public interface RetryPolicies {
* Keep trying a limited number of times, waiting a fixed time between attempts,
* and then fail by re-throwing the exception.
*/
- static RetryPolicy retryUpToMaximumCountWithFixedSleep(int maxAttempts, TimeDuration sleepTime) {
+ static RetryLimited retryUpToMaximumCountWithFixedSleep(int maxAttempts, TimeDuration sleepTime) {
return new RetryLimited(maxAttempts, sleepTime);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
index 771e524..ba90435 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
* Policy abstract for retrying.
*/
public interface RetryPolicy {
+ TimeDuration ZERO_MILLIS = TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
/**
* Determines whether it is supposed to retry the connection if the operation
@@ -39,6 +40,6 @@ public interface RetryPolicy {
* Returns the time duration for sleep in between the retries.
*/
default TimeDuration getSleepTime() {
- return TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
+ return ZERO_MILLIS;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index ca622dd..a616f07 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.util;
+import org.apache.ratis.protocol.AlreadyClosedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,9 @@ public interface SlidingWindow {
void setReply(REPLY reply);
boolean hasReply();
+
+ default void fail(Exception e) {
+ }
}
/** A seqNum-to-request map, sorted by seqNum. */
@@ -169,6 +173,8 @@ public interface SlidingWindow {
private long firstSeqNum = -1;
/** Is the first request replied? */
private boolean firstReplied;
+ /** The exception, if there is any. */
+ private Exception exception;
public Client(Object name) {
this.requests = new RequestMap<REQUEST, REPLY>(name) {
@@ -206,6 +212,12 @@ public interface SlidingWindow {
final long seqNum = nextSeqNum++;
final REQUEST r = requestConstructor.apply(seqNum);
+
+ if (exception != null) {
+ alreadyClosed(r, exception);
+ return r;
+ }
+
requests.putNewRequest(r);
final boolean submitted = sendOrDelayRequest(r, sendMethod);
@@ -302,6 +314,33 @@ public interface SlidingWindow {
firstReplied = false;
LOG.debug("After resetFirstSeqNum: {}", this);
}
+
+ /** Fail all requests starting from the given seqNum. */
+ public synchronized void fail(final long startingSeqNum, Exception e) {
+ exception = e;
+
+ boolean handled = false;
+ for(long i = startingSeqNum; i <= requests.lastSeqNum(); i++) {
+ final REQUEST request = requests.getNonRepliedRequest(i, "fail");
+ if (request != null) {
+ if (request.getSeqNum() == startingSeqNum) {
+ request.fail(e);
+ } else {
+ alreadyClosed(request, e);
+ }
+ handled = true;
+ }
+ }
+
+ if (handled) {
+ removeRepliedFromHead();
+ }
+ }
+
+ private void alreadyClosed(REQUEST request, Exception e) {
+ request.fail(new AlreadyClosedException(SlidingWindow.class.getSimpleName() + "$" + getClass().getSimpleName()
+ + " " + requests.getName() + " is closed.", e));
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
index fddfab2..48b6b9f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeoutException;
@FunctionalInterface
public interface CheckedFunctionWithTimeout<INPUT, OUTPUT, THROWABLE extends Throwable> {
/**
- * The same as {@link org.apache.ratis.util.CheckedFunction#apply(Object)}
+ * The same as {@link CheckedFunction#apply(Object)}
* except that this method has a timeout parameter and throws {@link TimeoutException}.
*/
OUTPUT apply(INPUT input, TimeDuration timeout) throws TimeoutException, THROWABLE;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java
new file mode 100644
index 0000000..0e982cb
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public interface FunctionUtils {
+ /**
+ * Convert the given consumer to a function with any output type
+ * such that the returned function always returns null.
+ */
+ static <INPUT, OUTPUT> Function<INPUT, OUTPUT> consumerAsNullFunction(Consumer<INPUT> consumer) {
+ return input -> {
+ consumer.accept(input);
+ return null;
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 3612d21..f7015b7 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -22,6 +22,7 @@ import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedRunnable;
import org.junit.Assert;
import org.junit.Rule;
@@ -35,11 +36,14 @@ import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public abstract class BaseTest {
public final Logger LOG = LoggerFactory.getLogger(getClass());
+ public static final TimeDuration HUNDRED_MILLIS = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+
{
LogUtils.setLogLevel(ConfUtils.LOG, Level.WARN);
LogUtils.setLogLevel(FileUtils.LOG, Level.TRACE);
@@ -83,14 +87,14 @@ public abstract class BaseTest {
@SafeVarargs
public static void assertThrowable(
String description, Throwable t,
- Class<? extends Throwable> exceptedThrowableClass, Logger log,
- Class<? extends Throwable>... exceptedCauseClasses) {
+ Class<? extends Throwable> expectedThrowableClass, Logger log,
+ Class<? extends Throwable>... expectedCauseClasses) {
if (log != null) {
log.info("The test \"" + description + "\" throws " + t.getClass().getSimpleName(), t);
}
- Assert.assertEquals(exceptedThrowableClass, t.getClass());
+ Assert.assertEquals(expectedThrowableClass, t.getClass());
- for (Class<? extends Throwable> expectedCause : exceptedCauseClasses) {
+ for (Class<? extends Throwable> expectedCause : expectedCauseClasses) {
final Throwable previous = t;
t = Objects.requireNonNull(previous.getCause(),
() -> "previous.getCause() == null for previous=" + previous);
@@ -99,48 +103,46 @@ public abstract class BaseTest {
}
@SafeVarargs
- public static void testFailureCase(
+ public static Throwable testFailureCase(
String description, CheckedRunnable<?> testCode,
- Class<? extends Throwable> exceptedThrowableClass, Logger log,
- Class<? extends Throwable>... exceptedCauseClasses) {
- boolean caught = false;
+ Class<? extends Throwable> expectedThrowableClass, Logger log,
+ Class<? extends Throwable>... expectedCauseClasses) {
try {
testCode.run();
} catch (Throwable t) {
- caught = true;
- assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses);
- }
- if (!caught) {
- Assert.fail("The test \"" + description + "\" does not throw anything.");
+ assertThrowable(description, t, expectedThrowableClass, log, expectedCauseClasses);
+ return t;
}
+ throw new AssertionError("The test \"" + description + "\" does not throw anything.");
}
@SafeVarargs
- public final void testFailureCase(
+ public final Throwable testFailureCase(
String description, CheckedRunnable<?> testCode,
- Class<? extends Throwable> exceptedThrowableClass,
- Class<? extends Throwable>... exceptedCauseClasses) {
- testFailureCase(description, testCode, exceptedThrowableClass, LOG, exceptedCauseClasses);
+ Class<? extends Throwable> expectedThrowableClass,
+ Class<? extends Throwable>... expectedCauseClasses) {
+ return testFailureCase(description, testCode, expectedThrowableClass, LOG, expectedCauseClasses);
}
@SafeVarargs
- public static void testFailureCaseAsync(
+ public static Throwable testFailureCaseAsync(
String description, Supplier<CompletableFuture<?>> testCode,
- Class<? extends Throwable> exceptedThrowableClass, Logger log,
- Class<? extends Throwable>... exceptedCauseClasses) {
+ Class<? extends Throwable> expectedThrowableClass, Logger log,
+ Class<? extends Throwable>... expectedCauseClasses) {
try {
testCode.get().join();
- Assert.fail("The test \"" + description + "\" does not throw anything.");
} catch (Throwable t) {
t = JavaUtils.unwrapCompletionException(t);
- assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses);
+ assertThrowable(description, t, expectedThrowableClass, log, expectedCauseClasses);
+ return t;
}
+ throw new AssertionError("The test \"" + description + "\" does not throw anything.");
}
@SafeVarargs
- public final void testFailureCaseAsync(
- String description, Supplier<CompletableFuture<?>> testCode, Class<? extends Throwable> exceptedThrowableClass,
- Class<? extends Throwable>... exceptedCauseClasses) {
- testFailureCaseAsync(description, testCode, exceptedThrowableClass, LOG, exceptedCauseClasses);
+ public final Throwable testFailureCaseAsync(
+ String description, Supplier<CompletableFuture<?>> testCode, Class<? extends Throwable> expectedThrowableClass,
+ Class<? extends Throwable>... expectedCauseClasses) {
+ return testFailureCaseAsync(description, testCode, expectedThrowableClass, LOG, expectedCauseClasses);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index cf239b6..8a1b111 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -35,6 +35,7 @@ import org.apache.ratis.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolService
import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc;
import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
+import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
@@ -201,7 +202,7 @@ public class GrpcClientProtocolClient implements Closeable {
CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
if (map == null) {
- return JavaUtils.completeExceptionally(new IOException("Already closed."));
+ return JavaUtils.completeExceptionally(new AlreadyClosedException(getName() + " is closed."));
}
final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
CollectionUtils.putNew(request.getCallId(), f, map,
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index f1f33e1..31faf35 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -103,11 +103,18 @@ public abstract class MiniRaftCluster implements Closeable {
}
default void runWithNewCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
+ runWithNewCluster(numServers, true, testCase);
+ }
+
+ default void runWithNewCluster(int numServers, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase)
+ throws Exception {
final StackTraceElement caller = JavaUtils.getCallerStackTraceElement();
LOG.info("Running " + caller.getMethodName());
final CLUSTER cluster = newCluster(numServers);
try {
- cluster.start();
+ if (startCluster) {
+ cluster.start();
+ }
testCase.accept(cluster);
} catch(Throwable t) {
LOG.error("Failed " + caller + ": " + cluster.printServers(), t);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 0719976..3821f5f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -25,6 +25,7 @@ import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
@@ -32,6 +33,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicies.RetryLimited;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
@@ -42,6 +44,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferExce
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedRunnable;
import org.junit.Assert;
import org.junit.Test;
@@ -52,6 +55,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -95,31 +99,89 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
}
}
+ static void assertRaftRetryFailureException(RaftRetryFailureException rfe, RetryPolicy retryPolicy, String name) {
+ Assert.assertNotNull(name + " does not have RaftRetryFailureException", rfe);
+ Assert.assertTrue(name + ": unexpected error message, rfe=" + rfe + ", retryPolicy=" + retryPolicy,
+ rfe.getMessage().contains(retryPolicy.toString()));
+ }
+
@Test
- public void testRequestAsyncWithRetryPolicy() throws Exception {
- runWithNewCluster(NUM_SERVERS, this::runTestRequestAsyncWithRetryPolicy);
+ public void testRequestAsyncWithRetryFailure() throws Exception {
+ runWithNewCluster(1, false, cluster -> runTestRequestAsyncWithRetryFailure(false, cluster));
}
- void runTestRequestAsyncWithRetryPolicy(CLUSTER cluster) throws Exception {
- final RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
- 3, TimeDuration.valueOf(1, TimeUnit.SECONDS));
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
-
- try(final RaftClient writeClient = cluster.createClient(leader.getId(), retryPolicy)) {
- // blockStartTransaction of the leader so that no transaction can be committed MAJORITY
- LOG.info("block leader {}", leader.getId());
- SimpleStateMachine4Testing.get(leader).blockStartTransaction();
- final SimpleMessage[] messages = SimpleMessage.create(2);
- final RaftClientReply reply = writeClient.sendAsync(messages[0]).get();
- RaftRetryFailureException rfe = reply.getRetryFailureException();
- Assert.assertNotNull(rfe);
- Assert.assertTrue(rfe.getMessage().contains(retryPolicy.toString()));
-
- // unblock leader so that the next transaction can be committed.
- SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
- // make sure the the next request succeeds. This will ensure the first
- // request completed
- writeClient.sendAsync(messages[1]).get();
+ @Test
+ public void testRequestAsyncWithRetryFailureAfterInitialMessages() throws Exception {
+ runWithNewCluster(1, true, cluster -> runTestRequestAsyncWithRetryFailure(true, cluster));
+ }
+
+ void runTestRequestAsyncWithRetryFailure(boolean initialMessages, CLUSTER cluster) throws Exception {
+ final RetryLimited retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, HUNDRED_MILLIS);
+
+ try(final RaftClient client = cluster.createClient(null, retryPolicy)) {
+ RaftPeerId leader = null;
+ if (initialMessages) {
+ // cluster is already started, send a few success messages
+ leader = RaftTestUtil.waitForLeader(cluster).getId();
+ final SimpleMessage[] messages = SimpleMessage.create(10, "initial-");
+ final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
+ for (int i = 0; i < messages.length; i++) {
+ replies.add(client.sendAsync(messages[i]));
+ }
+ for (int i = 0; i < messages.length; i++) {
+ RaftTestUtil.assertSuccessReply(replies.get(i));
+ }
+
+ // kill the only server
+ cluster.killServer(leader);
+ }
+
+ // now, either the cluster is not yet started or the server is killed.
+ final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
+ {
+ final SimpleMessage[] messages = SimpleMessage.create(10);
+ int i = 0;
+ // send half of the calls without starting the cluster
+ for (; i < messages.length/2; i++) {
+ replies.add(client.sendAsync(messages[i]));
+ }
+
+ // sleep most of the retry time
+ retryPolicy.getSleepTime().apply(t -> t * (retryPolicy.getMaxAttempts() - 1)).sleep();
+
+ // send another half of the calls without starting the cluster
+ for (; i < messages.length; i++) {
+ replies.add(client.sendAsync(messages[i]));
+ }
+ Assert.assertEquals(messages.length, replies.size());
+ }
+
+ // sleep again so that the first half calls will fail retries.
+ // the second half still have retry time remaining.
+ retryPolicy.getSleepTime().apply(t -> t*2).sleep();
+
+ if (leader != null) {
+ cluster.restartServer(leader, false);
+ } else {
+ cluster.start();
+ }
+
+ // all the calls should fail for ordering guarantee
+ for(int i = 0; i < replies.size(); i++) {
+ final CheckedRunnable<Exception> getReply = replies.get(i)::get;
+ final String name = "retry-failure-" + i;
+ if (i == 0) {
+ final Throwable t = testFailureCase(name, getReply,
+ ExecutionException.class, RaftRetryFailureException.class);
+ assertRaftRetryFailureException((RaftRetryFailureException) t.getCause(), retryPolicy, name);
+ } else {
+ testFailureCase(name, getReply,
+ ExecutionException.class, AlreadyClosedException.class, RaftRetryFailureException.class);
+ }
+ }
+
+ testFailureCaseAsync("last-request", () -> client.sendAsync(new SimpleMessage("last")),
+ AlreadyClosedException.class, RaftRetryFailureException.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index a96b917..6f0a20a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -23,6 +23,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
@@ -51,6 +52,8 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
@@ -424,4 +427,13 @@ public interface RaftTestUtil {
}
return null;
}
+
+ static void assertSuccessReply(CompletableFuture<RaftClientReply> reply) throws Exception {
+ assertSuccessReply(reply.get(10, TimeUnit.SECONDS));
+ }
+
+ static void assertSuccessReply(RaftClientReply reply) {
+ Assert.assertNotNull("reply == null", reply);
+ Assert.assertTrue("reply is not success: " + reply, reply.isSuccess());
+ }
}