You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/11 22:33:44 UTC
[incubator-ratis] branch master updated: RATIS-345. Watch requests should bypass sliding window.
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 5680cf5 RATIS-345. Watch requests should bypass sliding window.
5680cf5 is described below
commit 5680cf5fbb03362c65f0514adf104a5e29ea9b57
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Mar 11 12:16:22 2019 -0700
RATIS-345. Watch requests should bypass sliding window.
---
dev-support/run-test-repeatedly.sh | 4 +-
.../org/apache/ratis/client/RaftClientRpc.java | 7 +
.../apache/ratis/client/impl/RaftClientImpl.java | 107 +++++++-----
.../apache/ratis/client/impl/UnorderedAsync.java | 104 ++++++++++++
.../org/apache/ratis/protocol/RaftClientReply.java | 5 -
.../apache/ratis/protocol/TimeoutIOException.java | 4 +
.../org/apache/ratis/util/CollectionUtils.java | 4 +-
.../main/java/org/apache/ratis/util/JavaUtils.java | 9 +-
.../java/org/apache/ratis/util/PeerProxyMap.java | 8 +-
.../main/java/org/apache/ratis/grpc/GrpcUtil.java | 18 +-
.../grpc/client/GrpcClientProtocolClient.java | 122 ++++++++++----
.../grpc/client/GrpcClientProtocolService.java | 184 ++++++++++++++++-----
.../apache/ratis/grpc/client/GrpcClientRpc.java | 36 +++-
.../apache/ratis/grpc/server/GrpcLogAppender.java | 8 +-
ratis-proto/src/main/proto/Grpc.proto | 4 +
.../org/apache/ratis/server/impl/LeaderState.java | 2 +
.../apache/ratis/server/impl/ServerProtoUtils.java | 18 +-
.../org/apache/ratis/server/storage/RaftLog.java | 6 +-
.../ratis/server/storage/SegmentedRaftLog.java | 12 ++
.../java/org/apache/ratis/WatchRequestTests.java | 60 +++----
.../ratis/grpc/TestWatchRequestWithGrpc.java | 14 +-
21 files changed, 567 insertions(+), 169 deletions(-)
diff --git a/dev-support/run-test-repeatedly.sh b/dev-support/run-test-repeatedly.sh
index 2a666a9..dd3b874 100755
--- a/dev-support/run-test-repeatedly.sh
+++ b/dev-support/run-test-repeatedly.sh
@@ -27,6 +27,8 @@ TEST_NAME=`echo ${TEST_PATTERN} | cut -d# -f 1`
MVN="mvn"
set -ex
+mvn clean
+
for i in `seq 1 99`;
do
OUTDIR=${TEST_NAME}.${i}
@@ -35,7 +37,7 @@ do
echo
echo Running ${OUTDIR}
echo
- time ${MVN} test -DskipShade -Dtest=${TEST_PATTERN} 2>&1 | tee ${OUTF}
+ time ${MVN} test -Dtest=${TEST_PATTERN} 2>&1 | tee ${OUTF}
find */target/surefire-reports/ -name \*${TEST_NAME}\* | xargs -I{} cp {} ${OUTDIR}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index 8fb0987..abdfd41 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -21,6 +21,7 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.JavaUtils;
import java.io.Closeable;
import java.io.IOException;
@@ -33,6 +34,12 @@ public interface RaftClientRpc extends Closeable {
throw new UnsupportedOperationException(getClass() + " does not support this method.");
}
+ /** Async call to send a request. */
+ default CompletableFuture<RaftClientReply> sendRequestAsyncUnordered(RaftClientRequest request) {
+ throw new UnsupportedOperationException(getClass() + " does not support "
+ + JavaUtils.getCurrentStackTraceElement().getMethodName());
+ }
+
/** Send a request. */
RaftClientReply sendRequest(RaftClientRequest request) throws IOException;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index c14417d..b49fbbf 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -31,6 +31,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongFunction;
@@ -44,24 +45,39 @@ import static org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.
final class RaftClientImpl implements RaftClient {
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
- private static long nextCallId() {
+ static long nextCallId() {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
}
- static class PendingAsyncRequest implements SlidingWindow.Request<RaftClientReply> {
- private final long seqNum;
- private final LongFunction<RaftClientRequest> requestConstructor;
+ static class PendingClientRequest {
+ private final Supplier<RaftClientRequest> requestConstructor;
private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
- private volatile int attemptCount;
+ private final AtomicInteger attemptCount = new AtomicInteger();
- PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> requestConstructor) {
- this.seqNum = seqNum;
+ PendingClientRequest(Supplier<RaftClientRequest> requestConstructor) {
this.requestConstructor = requestConstructor;
}
RaftClientRequest newRequest() {
- attemptCount++;
- return requestConstructor.apply(seqNum);
+ attemptCount.incrementAndGet();
+ return requestConstructor.get();
+ }
+
+ CompletableFuture<RaftClientReply> getReplyFuture() {
+ return replyFuture;
+ }
+
+ int getAttemptCount() {
+ return attemptCount.get();
+ }
+ }
+
+ static class PendingAsyncRequest extends PendingClientRequest implements SlidingWindow.Request<RaftClientReply> {
+ private final long seqNum;
+
+ PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> requestConstructor) {
+ super(() -> requestConstructor.apply(seqNum));
+ this.seqNum = seqNum;
}
@Override
@@ -71,25 +87,17 @@ final class RaftClientImpl implements RaftClient {
@Override
public boolean hasReply() {
- return replyFuture.isDone();
+ return getReplyFuture().isDone();
}
@Override
public void setReply(RaftClientReply reply) {
- replyFuture.complete(reply);
+ getReplyFuture().complete(reply);
}
@Override
public void fail(Exception e) {
- replyFuture.completeExceptionally(e);
- }
-
- CompletableFuture<RaftClientReply> getReplyFuture() {
- return replyFuture;
- }
-
- int getAttemptCount() {
- return attemptCount;
+ getReplyFuture().completeExceptionally(e);
}
@Override
@@ -133,6 +141,14 @@ final class RaftClientImpl implements RaftClient {
return clientId;
}
+ RetryPolicy getRetryPolicy() {
+ return retryPolicy;
+ }
+
+ TimeoutScheduler getScheduler() {
+ return scheduler;
+ }
+
private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftClientRequest request) {
return getSlidingWindow(request.is(STALEREAD)? request.getServerId(): null);
}
@@ -159,7 +175,7 @@ final class RaftClientImpl implements RaftClient {
@Override
public CompletableFuture<RaftClientReply> sendWatchAsync(long index, ReplicationLevel replication) {
- return sendAsync(RaftClientRequest.watchRequestType(index, replication), null, null);
+ return UnorderedAsync.send(RaftClientRequest.watchRequestType(index, replication), this);
}
private CompletableFuture<RaftClientReply> sendAsync(
@@ -183,7 +199,7 @@ final class RaftClientImpl implements RaftClient {
).whenComplete((r, e) -> asyncRequestSemaphore.release());
}
- private RaftClientRequest newRaftClientRequest(
+ RaftClientRequest newRaftClientRequest(
RaftPeerId server, long callId, long seq, Message message, RaftClientRequest.Type type) {
return new RaftClientRequest(clientId, server != null? server: leaderId, groupId,
callId, seq, message, type);
@@ -288,10 +304,10 @@ final class RaftClientImpl implements RaftClient {
return;
}
if (reply == null) {
- LOG.debug("schedule attempt #{} with policy {} for {}", pending.getAttemptCount(), retryPolicy, request);
+ LOG.debug("schedule* attempt #{} with policy {} for {}", pending.getAttemptCount(), retryPolicy, request);
scheduler.onTimeout(retryPolicy.getSleepTime(),
() -> getSlidingWindow(request).retry(pending, this::sendRequestWithRetryAsync),
- LOG, () -> "Failed to retry " + request);
+ LOG, () -> "Failed* to retry " + request);
} else {
f.complete(reply);
}
@@ -323,7 +339,7 @@ final class RaftClientImpl implements RaftClient {
LOG.debug("{}: send* {}", clientId, request);
return clientRpc.sendRequestAsync(request).thenApply(reply -> {
LOG.debug("{}: receive* {}", clientId, reply);
- reply = handleNotLeaderException(request, reply);
+ reply = handleNotLeaderException(request, reply, true);
if (reply != null) {
getSlidingWindow(request).receiveReply(
request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
@@ -333,16 +349,16 @@ final class RaftClientImpl implements RaftClient {
return reply;
}).exceptionally(e -> {
if (LOG.isTraceEnabled()) {
- LOG.trace(clientId + ": Failed " + request, e);
+ LOG.trace(clientId + ": Failed* " + request, e);
} else {
- LOG.debug("{}: Failed {} with {}", clientId, request, e);
+ LOG.debug("{}: Failed* {} with {}", clientId, request, e);
}
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
if (!retryPolicy.shouldRetry(attemptCount)) {
handleAsyncRetryFailure(request, attemptCount);
} else {
- handleIOException(request, (IOException) e, null);
+ handleIOException(request, (IOException) e, null, true);
}
return null;
}
@@ -350,9 +366,14 @@ final class RaftClientImpl implements RaftClient {
});
}
- private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount) {
- final RaftRetryFailureException rfe = new RaftRetryFailureException(
+ static RaftRetryFailureException newRaftRetryFailureException(
+ RaftClientRequest request, int attemptCount, RetryPolicy retryPolicy) {
+ return new RaftRetryFailureException(
"Failed " + request + " for " + (attemptCount-1) + " attempts with " + retryPolicy);
+ }
+
+ private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount) {
+ final RaftRetryFailureException rfe = newRaftRetryFailureException(request, attemptCount, retryPolicy);
getSlidingWindow(request).fail(request.getSeqNum(), rfe);
}
@@ -365,10 +386,10 @@ final class RaftClientImpl implements RaftClient {
} catch (GroupMismatchException gme) {
throw gme;
} catch (IOException ioe) {
- handleIOException(request, ioe, null);
+ handleIOException(request, ioe, null, false);
}
LOG.debug("{}: receive {}", clientId, reply);
- reply = handleNotLeaderException(request, reply);
+ reply = handleNotLeaderException(request, reply, false);
reply = handleStateMachineException(reply, Function.identity());
return reply;
}
@@ -388,7 +409,8 @@ final class RaftClientImpl implements RaftClient {
* @return null if the reply is null or it has {@link NotLeaderException};
* otherwise return the same reply.
*/
- private RaftClientReply handleNotLeaderException(RaftClientRequest request, RaftClientReply reply) {
+ RaftClientReply handleNotLeaderException(RaftClientRequest request, RaftClientReply reply,
+ boolean resetSlidingWindow) {
if (reply == null) {
return null;
}
@@ -396,10 +418,15 @@ final class RaftClientImpl implements RaftClient {
if (nle == null) {
return reply;
}
+ return handleNotLeaderException(request, nle, resetSlidingWindow);
+ }
+
+ RaftClientReply handleNotLeaderException(RaftClientRequest request, NotLeaderException nle,
+ boolean resetSlidingWindow) {
refreshPeers(Arrays.asList(nle.getPeers()));
final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null
: nle.getSuggestedLeader().getId();
- handleIOException(request, nle, newLeader);
+ handleIOException(request, nle, newLeader, resetSlidingWindow);
return null;
}
@@ -412,25 +439,29 @@ final class RaftClientImpl implements RaftClient {
}
}
- private void handleIOException(RaftClientRequest request, IOException ioe,
- RaftPeerId newLeader) {
+ void handleIOException(RaftClientRequest request, IOException ioe,
+ RaftPeerId newLeader, boolean resetSlidingWindow) {
LOG.debug("{}: suggested new leader: {}. Failed {} with {}",
clientId, newLeader, request, ioe);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace", new Throwable("TRACE"));
}
- getSlidingWindow(request).resetFirstSeqNum();
+ if (resetSlidingWindow) {
+ getSlidingWindow(request).resetFirstSeqNum();
+ }
if (ioe instanceof LeaderNotReadyException) {
return;
}
final RaftPeerId oldLeader = request.getServerId();
- final boolean stillLeader = oldLeader.equals(leaderId);
+ final RaftPeerId curLeader = request.getServerId();
+ final boolean stillLeader = oldLeader.equals(curLeader);
if (newLeader == null && stillLeader) {
newLeader = CollectionUtils.random(oldLeader,
CollectionUtils.as(peers, RaftPeer::getId));
}
+ LOG.debug("{}: oldLeader={}, curLeader={}, newLeader{}", clientId, oldLeader, curLeader, newLeader);
final boolean changeLeader = newLeader != null && stillLeader;
if (changeLeader) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
new file mode 100644
index 0000000..d4ab14d
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupMismatchException;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** Send unordered asynchronous requests to a raft service. */
+public interface UnorderedAsync {
+ Logger LOG = LoggerFactory.getLogger(UnorderedAsync.class);
+
+ static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, RaftClientImpl client) {
+ final long callId = RaftClientImpl.nextCallId();
+ final PendingClientRequest pending = new PendingClientRequest(
+ () -> client.newRaftClientRequest(null, callId, -1L, null, type));
+ sendRequestWithRetry(pending, client);
+ return pending.getReplyFuture()
+ .thenApply(reply -> RaftClientImpl.handleStateMachineException(reply, CompletionException::new));
+ }
+
+ static void sendRequestWithRetry(PendingClientRequest pending, RaftClientImpl client) {
+ final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
+ if (f.isDone()) {
+ return;
+ }
+
+ final RaftClientRequest request = pending.newRequest();
+ final int attemptCount = pending.getAttemptCount();
+
+ final ClientId clientId = client.getId();
+ LOG.debug("{}: attempt #{} send~ {}", clientId, attemptCount, request);
+ client.getClientRpc().sendRequestAsyncUnordered(request).whenCompleteAsync((reply, e) -> {
+ try {
+ LOG.debug("{}: attempt #{} receive~ {}", clientId, attemptCount, reply);
+ reply = client.handleNotLeaderException(request, reply, false);
+ if (reply != null) {
+ f.complete(reply);
+ return;
+ }
+ final RetryPolicy retryPolicy = client.getRetryPolicy();
+ if (!retryPolicy.shouldRetry(attemptCount)) {
+ f.completeExceptionally(RaftClientImpl.newRaftRetryFailureException(request, attemptCount, retryPolicy));
+ return;
+ }
+
+ if (e != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(clientId + ": attempt #" + attemptCount + " failed~ " + request, e);
+ } else {
+ LOG.debug("{}: attempt #{} failed {} with {}", clientId, attemptCount, request, e);
+ }
+ e = JavaUtils.unwrapCompletionException(e);
+
+ if (e instanceof IOException) {
+ if (e instanceof NotLeaderException) {
+ client.handleNotLeaderException(request, (NotLeaderException) e, false);
+ } else if (!(e instanceof GroupMismatchException)) {
+ client.handleIOException(request, (IOException) e, null, false);
+ }
+ } else {
+ if (!client.getClientRpc().handleException(request.getServerId(), e, false)) {
+ f.completeExceptionally(e);
+ return;
+ }
+ }
+ }
+
+ LOG.debug("schedule retry for attempt #{}, policy={}, request={}", attemptCount, retryPolicy, request);
+ client.getScheduler().onTimeout(retryPolicy.getSleepTime(), () -> sendRequestWithRetry(pending, client),
+ LOG, () -> clientId + ": Failed~ to retry " + request);
+ } catch (Throwable t) {
+ LOG.error(clientId + ": XXX Failed " + request, t);
+ f.completeExceptionally(t);
+ }
+ });
+ }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 7a9574f..826eeee 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -92,11 +92,6 @@ public class RaftClientReply extends RaftClientMessage {
request.getCallId(), false, request.getMessage(), nre, nre.getLogIndex(), commitInfos);
}
- public RaftClientReply(RaftClientReply reply, NotReplicatedException nre) {
- this(reply.getClientId(), reply.getServerId(), reply.getRaftGroupId(),
- reply.getCallId(), false, reply.getMessage(), nre, reply.getLogIndex(), reply.getCommitInfos());
- }
-
/**
* Get the commit information for the entire group.
* The commit information may be unavailable for exception reply.
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java
index 6effb30..bfdff88 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java
@@ -25,6 +25,10 @@ import java.io.IOException;
public class TimeoutIOException extends IOException {
static final long serialVersionUID = 1L;
+ public TimeoutIOException(String message) {
+ super(message);
+ }
+
public TimeoutIOException(String message, Throwable throwable) {
super(message, throwable);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index cb49847..a215d3d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -62,7 +62,6 @@ public interface CollectionUtils {
static <T> T random(final T given, Iterable<T> iteration) {
Objects.requireNonNull(given, "given == null");
Objects.requireNonNull(iteration, "iteration == null");
- Preconditions.assertTrue(iteration.iterator().hasNext(), "iteration is empty");
final List<T> list = StreamSupport.stream(iteration.spliterator(), false)
.filter(e -> !given.equals(e))
@@ -92,10 +91,11 @@ public interface CollectionUtils {
return as(Arrays.asList(array), converter);
}
- static <K, V> void putNew(K key, V value, Map<K, V> map, Supplier<String> name) {
+ static <K, V> V putNew(K key, V value, Map<K, V> map, Supplier<String> name) {
final V returned = map.put(key, value);
Preconditions.assertTrue(returned == null,
() -> "Entry already exists for key " + key + " in map " + name.get());
+ return value;
}
static <K, V> void replaceExisting(K key, V oldValue, V newValue, Map<K, V> map, Supplier<String> name) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 916d27a..d004c3f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -27,8 +27,8 @@ import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.util.Collection;
import java.util.Date;
-import java.util.List;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
@@ -74,6 +74,11 @@ public interface JavaUtils {
return trace[3];
}
+ static StackTraceElement getCurrentStackTraceElement() {
+ final StackTraceElement[] trace = Thread.currentThread().getStackTrace();
+ return trace[2];
+ }
+
static <T extends Throwable> void runAsUnchecked(CheckedRunnable<T> runnable) {
runAsUnchecked(runnable, RuntimeException::new);
}
@@ -238,7 +243,7 @@ public interface JavaUtils {
return t instanceof CompletionException && t.getCause() != null? t.getCause(): t;
}
- static <T> CompletableFuture<Void> allOf(List<CompletableFuture<T>> futures) {
+ static <T> CompletableFuture<Void> allOf(Collection<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(futures.toArray(EMPTY_COMPLETABLE_FUTURE_ARRAY));
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 813fac7..b2720ff 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -121,9 +121,11 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
LOG.debug("{}: reset proxy for {}", name, id );
synchronized (resetLock) {
final PeerAndProxy pp = peers.remove(id);
- final RaftPeer peer = pp.getPeer();
- pp.close();
- computeIfAbsent(peer);
+ if (pp != null) {
+ final RaftPeer peer = pp.getPeer();
+ pp.close();
+ computeIfAbsent(peer);
+ }
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index 482a8de..ce524db 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -38,6 +38,8 @@ import java.util.function.Supplier;
public interface GrpcUtil {
Metadata.Key<String> EXCEPTION_TYPE_KEY =
Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
+ Metadata.Key<byte[]> EXCEPTION_OBJECT_KEY =
+ Metadata.Key.of("exception-object-bin", Metadata.BINARY_BYTE_MARSHALLER);
Metadata.Key<String> CALL_ID =
Metadata.Key.of("call-id", Metadata.ASCII_STRING_MARSHALLER);
@@ -50,6 +52,7 @@ public interface GrpcUtil {
Metadata trailers = new Metadata();
trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
+ trailers.put(EXCEPTION_OBJECT_KEY, IOUtils.object2Bytes(t));
if (callId > 0) {
trailers.put(CALL_ID, String.valueOf(callId));
}
@@ -74,8 +77,21 @@ public interface GrpcUtil {
static IOException tryUnwrapException(StatusRuntimeException se) {
final Metadata trailers = se.getTrailers();
+ if (trailers == null) {
+ return null;
+ }
+
+ final byte[] bytes = trailers.get(EXCEPTION_OBJECT_KEY);
+ if (bytes != null) {
+ try {
+ return IOUtils.bytes2Object(bytes, IOException.class);
+ } catch (Exception e) {
+ se.addSuppressed(e);
+ }
+ }
+
final Status status = se.getStatus();
- if (trailers != null && status != null) {
+ if (status != null) {
final String className = trailers.get(EXCEPTION_TYPE_KEY);
if (className != null) {
try {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 8e6503d..cfd4ce1 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -31,6 +31,7 @@ import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.apache.ratis.proto.grpc.AdminProtocolServiceGrpc;
@@ -66,6 +67,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
public class GrpcClientProtocolClient implements Closeable {
@@ -76,7 +78,7 @@ public class GrpcClientProtocolClient implements Closeable {
private final ManagedChannel channel;
private final TimeDuration requestTimeoutDuration;
- private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+ private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(3);
private final RaftClientProtocolServiceBlockingStub blockingStub;
private final RaftClientProtocolServiceStub asyncStub;
@@ -84,9 +86,9 @@ public class GrpcClientProtocolClient implements Closeable {
private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
- public GrpcClientProtocolClient(ClientId id, RaftPeer target,
- RaftProperties properties,
- GrpcTlsConfig tlsConf) {
+ private final AtomicReference<AsyncStreamObservers> unorderedStreamObservers = new AtomicReference<>();
+
+ GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties, GrpcTlsConfig tlsConf) {
this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
this.target = target;
final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
@@ -126,10 +128,8 @@ public class GrpcClientProtocolClient implements Closeable {
@Override
public void close() {
- final AsyncStreamObservers observers = appendStreamObservers.get();
- if (observers != null) {
- observers.close();
- }
+ Optional.ofNullable(appendStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
+ Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
channel.shutdownNow();
}
@@ -181,22 +181,79 @@ public class GrpcClientProtocolClient implements Closeable {
}
AsyncStreamObservers getAppendStreamObservers() {
- return appendStreamObservers.updateAndGet(a -> a != null? a : new AsyncStreamObservers());
+ return appendStreamObservers.updateAndGet(
+ a -> a != null? a : new AsyncStreamObservers(appendStreamObservers, this::append));
+ }
+
+ AsyncStreamObservers getUnorderedAsyncStreamObservers() {
+ return unorderedStreamObservers.updateAndGet(
+ a -> a != null? a : new AsyncStreamObservers(unorderedStreamObservers, asyncStub::unordered));
}
public RaftPeer getTarget() {
return target;
}
- class AsyncStreamObservers implements Closeable {
+ class ReplyMap {
+ private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> map
+ = new AtomicReference<>(new ConcurrentHashMap<>());
+
+ // synchronized to avoid putNew after getAndSetNull
+ synchronized CompletableFuture<RaftClientReply> putNew(long callId) {
+ return Optional.ofNullable(map.get())
+ .map(m -> CollectionUtils.putNew(callId, new CompletableFuture<>(), m, this::toString))
+ .orElse(null);
+ }
+
+ Optional<CompletableFuture<RaftClientReply>> remove(long callId) {
+ return Optional.ofNullable(map.get()).map(m -> m.remove(callId));
+ }
+
+ // synchronized to avoid putNew after getAndSetNull
+ synchronized Map<Long, CompletableFuture<RaftClientReply>> getAndSetNull() {
+ return map.getAndSet(null);
+ }
+
+ @Override
+ public String toString() {
+ return getName() + ":" + getClass().getSimpleName();
+ }
+ }
+
+ static class RequestStreamer {
+ private final AtomicReference<StreamObserver<RaftClientRequestProto>> streamObserver;
+
+ RequestStreamer(StreamObserver<RaftClientRequestProto> streamObserver) {
+ this.streamObserver = new AtomicReference<>(streamObserver);
+ }
+
+ synchronized boolean onNext(RaftClientRequestProto request) {
+ final StreamObserver<RaftClientRequestProto> s = streamObserver.get();
+ if (s != null) {
+ s.onNext(request);
+ return true;
+ }
+ return false;
+ }
+
+ synchronized void onCompleted() {
+ final StreamObserver<RaftClientRequestProto> s = streamObserver.getAndSet(null);
+ if (s != null) {
+ s.onCompleted();
+ }
+ }
+ }
+
+ class AsyncStreamObservers {
/** Request map: callId -> future */
- private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
+ private final ReplyMap replies = new ReplyMap();
private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
@Override
public void onNext(RaftClientReplyProto proto) {
final long callId = proto.getRpcReply().getCallId();
try {
final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto);
+ LOG.info("{}: receive {}", getName(), reply);
final NotLeaderException nle = reply.getNotLeaderException();
if (nle != null) {
completeReplyExceptionally(nle, NotLeaderException.class.getName());
@@ -219,46 +276,53 @@ public class GrpcClientProtocolClient implements Closeable {
completeReplyExceptionally(null, "completed");
}
};
- private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver);
+ private final RequestStreamer requestStreamer;
+ private final AtomicReference<AsyncStreamObservers> ref;
+
+ AsyncStreamObservers(AtomicReference<AsyncStreamObservers> ref,
+ Function<StreamObserver<RaftClientReplyProto>, StreamObserver<RaftClientRequestProto>> f) {
+ this.requestStreamer = new RequestStreamer(f.apply(replyStreamObserver));
+ this.ref = ref;
+ }
CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
- final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
- if (map == null) {
+ final CompletableFuture<RaftClientReply> f = replies.putNew(request.getCallId());
+ if (f == null) {
return JavaUtils.completeExceptionally(new AlreadyClosedException(getName() + " is closed."));
}
- final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
- CollectionUtils.putNew(request.getCallId(), f, map,
- () -> getName() + ":" + getClass().getSimpleName());
try {
- requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
- scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG,
- () -> "Timeout check failed for client request: " + request);
+ if (!requestStreamer.onNext(ClientProtoUtils.toRaftClientRequestProto(request))) {
+ throw new AlreadyClosedException(getName() + ": the stream is closed.");
+ }
} catch(Throwable t) {
handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
+ return f;
}
+
+ LOG.info("schedule " + requestTimeoutDuration + " timeout check for " + request);
+ scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG,
+ () -> "Timeout check failed for client request: " + request);
return f;
}
private void timeoutCheck(RaftClientRequest request) {
handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
- new IOException("Request timeout " + requestTimeoutDuration + ": " + request)));
+ new TimeoutIOException("Request timeout " + requestTimeoutDuration + ": " + request)));
}
private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {
- Optional.ofNullable(replies.get())
- .map(replyMap -> replyMap.remove(callId))
- .ifPresent(handler);
+ replies.remove(callId).ifPresent(handler);
}
- @Override
- public void close() {
- requestStreamObserver.onCompleted();
+ private void close() {
+ requestStreamer.onCompleted();
completeReplyExceptionally(null, "close");
}
private void completeReplyExceptionally(Throwable t, String event) {
- appendStreamObservers.compareAndSet(this, null);
- final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSet(null);
+ ref.compareAndSet(this, null);
+
+ final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSetNull();
if (map == null) {
return;
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index a099a0a..db082a2 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -32,9 +32,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Supplier;
public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
@@ -104,65 +108,169 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
return new AppendRequestStreamObserver(responseObserver);
}
+ @Override
+ public StreamObserver<RaftClientRequestProto> unordered(StreamObserver<RaftClientReplyProto> responseObserver) {
+ return new UnorderedRequestStreamObserver(responseObserver);
+ }
+
private final AtomicInteger streamCount = new AtomicInteger();
- private class AppendRequestStreamObserver implements
- StreamObserver<RaftClientRequestProto> {
- private final String name = getId() + "-" + streamCount.getAndIncrement();
+ private abstract class RequestStreamObserver implements StreamObserver<RaftClientRequestProto> {
+ private final String name = getId() + "-" + getClass().getSimpleName() + streamCount.getAndIncrement();
private final StreamObserver<RaftClientReplyProto> responseObserver;
- private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
- = new SlidingWindow.Server<>(name, COMPLETED);
- private final AtomicBoolean isClosed;
+ private final AtomicBoolean isClosed = new AtomicBoolean();
- AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
- LOG.debug("new AppendRequestStreamObserver {}", name);
- this.responseObserver = ro;
- this.isClosed = new AtomicBoolean(false);
+ RequestStreamObserver(StreamObserver<RaftClientReplyProto> responseObserver) {
+ LOG.debug("new {}", name);
+ this.responseObserver = responseObserver;
}
- void processClientRequestAsync(PendingAppend pending) {
+ String getName() {
+ return name;
+ }
+
+ synchronized void responseNext(RaftClientReplyProto reply) {
+ responseObserver.onNext(reply);
+ }
+
+ synchronized void responseCompleted() {
+ responseObserver.onCompleted();
+ }
+
+ synchronized void responseError(Throwable t) {
+ responseObserver.onError(t);
+ }
+
+
+ boolean setClose() {
+ return isClosed.compareAndSet(false, true);
+ }
+
+ CompletableFuture<Void> processClientRequest(RaftClientRequest request, Consumer<RaftClientReply> replyHandler) {
try {
- protocol.submitClientRequestAsync(pending.getRequest()
- ).thenAcceptAsync(reply -> slidingWindow.receiveReply(
- pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync)
+ return protocol.submitClientRequestAsync(request
+ ).thenAcceptAsync(replyHandler
).exceptionally(exception -> {
// TODO: the exception may be from either raft or state machine.
// Currently we skip all the following responses when getting an
// exception from the state machine.
- responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest());
+ responseError(exception, () -> "processClientRequest for " + request);
return null;
});
} catch (IOException e) {
- throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e);
+ throw new CompletionException("Failed processClientRequest for " + request + " in " + name, e);
}
}
+ abstract void processClientRequest(RaftClientRequest request);
+
@Override
public void onNext(RaftClientRequestProto request) {
try {
final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request);
- final PendingAppend p = new PendingAppend(r);
- slidingWindow.receivedRequest(p, this::processClientRequestAsync);
+ processClientRequest(r);
} catch (Throwable e) {
- responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request));
+ responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request) + " in " + name);
}
}
- private void sendReply(PendingAppend ready) {
- Preconditions.assertTrue(ready.hasReply());
- if (ready == COMPLETED) {
- close();
- } else {
- LOG.debug("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply());
- responseObserver.onNext(
- ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
+ @Override
+ public void onError(Throwable t) {
+ // for now we just log a msg
+ GrpcUtil.warn(LOG, () -> name + ": onError", t);
+ }
+
+
+ boolean responseError(Throwable t, Supplier<String> message) {
+ if (setClose()) {
+ t = JavaUtils.unwrapCompletionException(t);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(name + ": Failed " + message.get(), t);
}
+ responseError(GrpcUtil.wrapException(t));
+ return true;
+ }
+ return false;
+ }
+ }
+
+ private class UnorderedRequestStreamObserver extends RequestStreamObserver {
+ /** Map: callId -> futures (seqNum is not set for unordered requests) */
+ private final Map<Long, CompletableFuture<Void>> futures = new HashMap<>();
+
+ UnorderedRequestStreamObserver(StreamObserver<RaftClientReplyProto> responseObserver) {
+ super(responseObserver);
+ }
+
+ @Override
+ void processClientRequest(RaftClientRequest request) {
+ final CompletableFuture<Void> f = processClientRequest(request, reply -> {
+ if (!reply.isSuccess()) {
+ LOG.info("Failed " + request + ", reply=" + reply);
+ }
+ final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply);
+ responseNext(proto);
+ });
+ final long callId = request.getCallId();
+ put(callId, f);
+ f.thenAccept(dummy -> remove(callId));
+ }
+
+ private synchronized void put(long callId, CompletableFuture<Void> f) {
+ futures.put(callId, f);
+ }
+ private synchronized void remove(long callId) {
+ futures.remove(callId);
+ }
+
+ private synchronized CompletableFuture<Void> allOfFutures() {
+ return JavaUtils.allOf(futures.values());
+ }
+
+ @Override
+ public void onCompleted() {
+ allOfFutures().thenAccept(dummy -> {
+ if (setClose()) {
+ LOG.debug("{}: close", getName());
+ responseCompleted();
+ }
+ });
+ }
+ }
+
+ private class AppendRequestStreamObserver extends RequestStreamObserver {
+ private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
+ = new SlidingWindow.Server<>(getName(), COMPLETED);
+
+ AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> responseObserver) {
+ super(responseObserver);
+ }
+
+ void processClientRequest(PendingAppend pending) {
+ final long seq = pending.getSeqNum();
+ processClientRequest(pending.getRequest(),
+ reply -> slidingWindow.receiveReply(seq, reply, this::sendReply, this::processClientRequest));
+ }
+
+ @Override
+ void processClientRequest(RaftClientRequest r) {
+ slidingWindow.receivedRequest(new PendingAppend(r), this::processClientRequest);
+ }
+
+ private void sendReply(PendingAppend ready) {
+ Preconditions.assertTrue(ready.hasReply());
+ if (ready == COMPLETED) {
+ close();
+ } else {
+ LOG.debug("{}: sendReply seq={}, {}", getName(), ready.getSeqNum(), ready.getReply());
+ responseNext(ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
+ }
}
@Override
public void onError(Throwable t) {
// for now we just log a msg
- GrpcUtil.warn(LOG, () -> name + ": onError", t);
+ GrpcUtil.warn(LOG, () -> getName() + ": onError", t);
slidingWindow.close();
}
@@ -174,22 +282,20 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
}
private void close() {
- if (isClosed.compareAndSet(false, true)) {
- LOG.debug("{}: close", name);
- responseObserver.onCompleted();
+ if (setClose()) {
+ LOG.debug("{}: close", getName());
+ responseCompleted();
slidingWindow.close();
}
}
- void responseError(Throwable t, Supplier<String> message) {
- if (isClosed.compareAndSet(false, true)) {
- t = JavaUtils.unwrapCompletionException(t);
- if (LOG.isDebugEnabled()) {
- LOG.debug(name + ": Failed " + message.get(), t);
- }
- responseObserver.onError(GrpcUtil.wrapException(t));
+ @Override
+ boolean responseError(Throwable t, Supplier<String> message) {
+ if (super.responseError(t, message)) {
slidingWindow.close();
+ return true;
}
+ return false;
}
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 25fcc85..a63c2af 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,6 +24,7 @@ import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
import org.apache.ratis.proto.RaftProtos.GroupListRequestProto;
@@ -57,10 +58,6 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
this.tlsConfig = tlsConfig;
}
- public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
- this(clientId, properties, null);
- }
-
@Override
public CompletableFuture<RaftClientReply> sendRequestAsync(
RaftClientRequest request) {
@@ -75,6 +72,19 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
}
@Override
+ public CompletableFuture<RaftClientReply> sendRequestAsyncUnordered(RaftClientRequest request) {
+ final RaftPeerId serverId = request.getServerId();
+ try {
+ final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId);
+ // Reuse the same grpc stream for all async calls.
+ return proxy.getUnorderedAsyncStreamObservers().onNext(request);
+ } catch (Throwable t) {
+ LOG.error(clientId + ": XXX Failed " + request, t);
+ return JavaUtils.completeExceptionally(t);
+ }
+ }
+
+ @Override
public RaftClientReply sendRequest(RaftClientRequest request)
throws IOException {
final RaftPeerId serverId = request.getServerId();
@@ -149,4 +159,20 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
}
return proto;
}
+
+ @Override
+ public boolean handleException(RaftPeerId serverId, Throwable e, boolean reconnect) {
+ final Throwable cause = e.getCause();
+ if (e instanceof IOException && cause instanceof StatusRuntimeException) {
+ if (!((StatusRuntimeException) cause).getStatus().isOk()) {
+ reconnect = true;
+ }
+ } else if (e instanceof IllegalArgumentException) {
+ if (e.getMessage().contains("null frame before EOS")) {
+ reconnect = true;
+ }
+ }
+ LOG.debug("{}->{}: reconnect? {}, e={}, cause={}", clientId, serverId, reconnect, e, cause);
+ return super.handleException(serverId, e, reconnect);
+ }
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index ed96468..b2d2e45 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -180,7 +180,7 @@ public class GrpcLogAppender extends LogAppender {
private void timeoutAppendRequest(AppendEntriesRequestProto request) {
AppendEntriesRequestProto pendingRequest = pendingRequests.remove(request.getServerRequest().getCallId());
if (pendingRequest != null) {
- LOG.warn( "{}: appendEntries Timeout, request={}", this, ProtoUtils.toString(pendingRequest.getServerRequest()));
+ LOG.warn( "{}: appendEntries Timeout, request={}", this, ServerProtoUtils.toString(pendingRequest));
}
}
@@ -258,8 +258,8 @@ public class GrpcLogAppender extends LogAppender {
@Override
public void onCompleted() {
- LOG.info("{} stops appending log entries to follower {}", server.getId(),
- follower);
+ LOG.info("{}: follower {} response Completed", server.getId(), follower);
+ resetClient(null);
}
}
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index 950a73e..f177609 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -31,6 +31,10 @@ service RaftClientProtocolService {
// A client-to-server stream RPC to append data
rpc append(stream ratis.common.RaftClientRequestProto)
returns (stream ratis.common.RaftClientReplyProto) {}
+
+ // A client-to-server stream RPC for unordered async requests
+ rpc unordered(stream ratis.common.RaftClientRequestProto)
+ returns (stream ratis.common.RaftClientReplyProto) {}
}
service RaftServerProtocolService {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 5e9dbc6..8ece134 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -308,6 +308,8 @@ public class LeaderState {
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof NotReplicatedException) {
return new RaftClientReply(request, (NotReplicatedException)e, server.getCommitInfos());
+ } else if (e instanceof NotLeaderException) {
+ return new RaftClientReply(request, (NotLeaderException)e, server.getCommitInfos());
} else {
throw new CompletionException(e);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 512fd18..29dee9b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -61,6 +61,10 @@ public interface ServerProtoUtils {
return TermIndex.toString(entry.getTerm(), entry.getIndex());
}
+ static String toTermIndexString(TermIndexProto proto) {
+ return TermIndex.toString(proto.getTerm(), proto.getIndex());
+ }
+
static String toLogEntryString(LogEntryProto entry) {
if (entry == null) {
return null;
@@ -86,7 +90,17 @@ public interface ServerProtoUtils {
: "" + Arrays.stream(entries).map(ServerProtoUtils::toLogEntryString)
.collect(Collectors.toList());
}
-
+ static String toShortString(List<LogEntryProto> entries) {
+ return entries.size() == 0? "<empty>"
+ : "size=" + entries.size() + ", first=" + toLogEntryString(entries.get(0));
+ }
+ static String toString(AppendEntriesRequestProto proto) {
+ return ProtoUtils.toString(proto.getServerRequest()) + "-t" + proto.getLeaderTerm()
+ + ", previous=" + toTermIndexString(proto.getPreviousLog())
+ + ", leaderCommit=" + proto.getLeaderCommit()
+ + ", initializing? " + proto.getInitializing()
+ + ", entries: " + toShortString(proto.getEntriesList());
+ }
static String toString(AppendEntriesReplyProto reply) {
return toString(reply.getServerReply()) + "," + reply.getResult()
+ ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm()
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 155122e..b1971e5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -88,6 +88,10 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
state.assertOpen();
}
+ public boolean isOpened() {
+ return state.isOpened();
+ }
+
/**
* Update the last committed index.
* @param majorityIndex the index that has achieved majority.
@@ -352,7 +356,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
@Override
public String toString() {
- return getName() + ":" + state;
+ return getName() + ":" + state + ":c" + getLastCommittedIndex();
}
public static class Metadata {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 588b819..5d151c6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -413,4 +413,16 @@ public class SegmentedRaftLog extends RaftLog {
RaftLogCache getRaftLogCache() {
return cache;
}
+
+ @Override
+ public String toString() {
+ try(AutoCloseableLock readLock = readLock()) {
+ if (isOpened()) {
+ return super.toString() + ",f" + getLatestFlushedIndex()
+ + ",i" + Optional.ofNullable(getLastEntryTermIndex()).map(TermIndex::getIndex).orElse(0L);
+ } else {
+ return super.toString();
+ }
+ }
+ }
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 8d143eb..63aad09 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -72,23 +72,13 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
static class TestParameters {
final int numMessages;
final RaftClient writeClient;
- final RaftClient watchMajorityClient;
- final RaftClient watchAllClient;
- final RaftClient watchMajorityCommittedClient;
- final RaftClient watchAllCommittedClient;
final MiniRaftCluster cluster;
final Logger log;
TestParameters(int numMessages, RaftClient writeClient,
- RaftClient watchMajorityClient, RaftClient watchAllClient,
- RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient,
MiniRaftCluster cluster, Logger log) {
this.numMessages = numMessages;
this.writeClient = writeClient;
- this.watchMajorityClient = watchMajorityClient;
- this.watchAllClient = watchAllClient;
- this.watchMajorityCommittedClient = watchMajorityCommittedClient;
- this.watchAllCommittedClient = watchAllCommittedClient;
this.cluster = cluster;
this.log = log;
}
@@ -106,10 +96,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
final long logIndex = reply.getLogIndex();
log.info("SEND_WATCH: message={}, logIndex={}", message, logIndex);
watchFuture.complete(new WatchReplies(logIndex,
- watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY),
- watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL),
- watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED),
- watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED),
+ writeClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY),
+ writeClient.sendWatchAsync(logIndex, ReplicationLevel.ALL),
+ writeClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED),
+ writeClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED),
log));
});
}
@@ -121,18 +111,13 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
}
}
- static void runTest(CheckedConsumer<TestParameters, Exception> testCase, MiniRaftCluster cluster, Logger LOG) throws Exception {
- try(final RaftClient writeClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
- final RaftClient watchMajorityClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
- final RaftClient watchAllClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
- final RaftClient watchMajorityCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
- final RaftClient watchAllCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
+ static void runTest(CheckedConsumer<TestParameters, Exception> testCase, MiniRaftCluster cluster, Logger LOG)
+ throws Exception {
+ try(final RaftClient client = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
final int[] numMessages = {1, 10, 100};
for(int i = 0; i < 5; i++) {
final int n = numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)];
- final TestParameters p = new TestParameters(
- n, writeClient, watchMajorityClient, watchAllClient,
- watchMajorityCommittedClient, watchAllCommittedClient, cluster, LOG);
+ final TestParameters p = new TestParameters(n, client, cluster, LOG);
LOG.info("{}) {}, {}", i, p, cluster.printServers());
testCase.accept(p);
}
@@ -159,26 +144,30 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
}
RaftClientReply getMajority() throws Exception {
- final RaftClientReply reply = majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
- log.info("watchMajorityReply({}) = {}", logIndex, reply);
- return reply;
+ return get(majority, "majority");
}
RaftClientReply getMajorityCommitted() throws Exception {
- final RaftClientReply reply = majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
- log.info("watchMajorityCommittedReply({}) = {}", logIndex, reply);
- return reply;
+ return get(majorityCommitted, "majorityCommitted");
}
RaftClientReply getAll() throws Exception {
- final RaftClientReply reply = all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
- log.info("watchAllReply({}) = {}", logIndex, reply);
- return reply;
+ return get(all, "all");
}
RaftClientReply getAllCommitted() throws Exception {
- final RaftClientReply reply = allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
- log.info("watchAllCommittedReply({}) = {}", logIndex, reply);
+ return get(allCommitted, "allCommitted");
+ }
+
+ RaftClientReply get(CompletableFuture<RaftClientReply> f, String name) throws Exception {
+ final RaftClientReply reply;
+ try {
+ reply = f.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("Failed to get {}({})", name, logIndex);
+ throw e;
+ }
+ log.info("{}-Watch({}) returns {}", name, logIndex, reply);
return reply;
}
}
@@ -324,11 +313,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
Assert.assertEquals(numMessages, replies.size());
Assert.assertEquals(numMessages, watches.size());
- // since only one follower is blocked, requests can be committed MAJORITY but neither ALL nor ALL_COMMITTED.
+ // since only one follower is blocked from committing, requests can complete at MAJORITY and ALL but not at ALL_COMMITTED.
checkMajority(replies, watches, LOG);
TimeUnit.SECONDS.sleep(1);
- assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all));
assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted));
// Now change leader
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
index 7b9061b..77e2eda 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,9 +17,21 @@
*/
package org.apache.ratis.grpc;
+import org.apache.log4j.Level;
import org.apache.ratis.WatchRequestTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.impl.UnorderedAsync;
+import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.grpc.client.GrpcClientRpc;
+import org.apache.ratis.util.LogUtils;
public class TestWatchRequestWithGrpc
extends WatchRequestTests<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
+ {
+ LogUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL);
+ LogUtils.setLogLevel(GrpcClientRpc.LOG, Level.ALL);
+ LogUtils.setLogLevel(UnorderedAsync.LOG, Level.ALL);
+ LogUtils.setLogLevel(RaftClient.LOG, Level.ALL);
+ }
}
\ No newline at end of file