You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/12/28 06:44:53 UTC
incubator-ratis git commit: RATIS-140. Raft client should reuse the
gRPC stream for all async calls.
Repository: incubator-ratis
Updated Branches:
refs/heads/master f7f97e050 -> 7872f3296
RATIS-140. Raft client should reuse the gRPC stream for all async calls.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/7872f329
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/7872f329
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/7872f329
Branch: refs/heads/master
Commit: 7872f32962b5fdb5229be6bfc36eb73ec01ff79f
Parents: f7f97e0
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu Dec 28 14:44:04 2017 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Thu Dec 28 14:44:04 2017 +0800
----------------------------------------------------------------------
.../ratis/client/RaftClientConfigKeys.java | 2 +-
.../ratis/client/impl/ClientProtoUtils.java | 6 +-
.../ratis/client/impl/RaftClientImpl.java | 96 +++--
.../org/apache/ratis/util/SlidingWindow.java | 403 +++++++++++++++++++
.../apache/ratis/grpc/client/GrpcClientRpc.java | 9 +-
.../grpc/client/RaftClientProtocolClient.java | 96 ++++-
.../grpc/client/RaftClientProtocolService.java | 137 +++----
.../java/org/apache/ratis/RaftAsyncTests.java | 3 +-
.../java/org/apache/ratis/RaftTestUtil.java | 5 -
9 files changed, 648 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index 03f12cb..bb76910 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -59,7 +59,7 @@ public interface RaftClientConfigKeys {
static int schedulerThreads(RaftProperties properties) {
return getInt(properties::getInt, SCHEDULER_THREADS_KEY,
- SCHEDULER_THREADS_DEFAULT);
+ SCHEDULER_THREADS_DEFAULT, requireMin(1));
}
static void setSchedulerThreads(RaftProperties properties, int schedulerThreads) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 97439ac..a7aaf54 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -209,10 +209,14 @@ public interface ClientProtoUtils {
}
static Message toMessage(final ClientMessageEntryProto p) {
+ return toMessage(p.getContent());
+ }
+
+ static Message toMessage(final ByteString bytes) {
return new Message() {
@Override
public ByteString getContent() {
- return p.getContent();
+ return bytes;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index ba1a107..6ee415d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -21,18 +21,18 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.util.IOUtils;
-import org.apache.ratis.util.CollectionUtils;
-import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.util.*;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
+import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -44,6 +44,45 @@ final class RaftClientImpl implements RaftClient {
return callIdCounter.getAndIncrement() & Long.MAX_VALUE;
}
+ static class PendingAsyncRequest implements SlidingWindow.Request<RaftClientReply> {
+ private final long seqNum;
+ private final LongFunction<RaftClientRequest> requestConstructor;
+ private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+
+ PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> requestConstructor) {
+ this.seqNum = seqNum;
+ this.requestConstructor = requestConstructor;
+ }
+
+ RaftClientRequest newRequest() {
+ return requestConstructor.apply(seqNum);
+ }
+
+ @Override
+ public long getSeqNum() {
+ return seqNum;
+ }
+
+ @Override
+ public boolean hasReply() {
+ return replyFuture.isDone();
+ }
+
+ @Override
+ public void setReply(RaftClientReply reply) {
+ replyFuture.complete(reply);
+ }
+
+ CompletableFuture<RaftClientReply> getReplyFuture() {
+ return replyFuture;
+ }
+
+ @Override
+ public String toString() {
+ return "[seq=" + getSeqNum() + "]";
+ }
+ }
+
private final ClientId clientId;
private final RaftClientRpc clientRpc;
private final Collection<RaftPeer> peers;
@@ -52,7 +91,7 @@ final class RaftClientImpl implements RaftClient {
private volatile RaftPeerId leaderId;
- private final AtomicLong asyncSeqNum = new AtomicLong();
+ private final SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> slidingWindow;
private final ScheduledExecutorService scheduler;
private final Semaphore asyncRequestSemaphore;
@@ -68,13 +107,10 @@ final class RaftClientImpl implements RaftClient {
asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties));
+ slidingWindow = new SlidingWindow.Client<>(getId());
clientRpc.addServers(peers);
}
- private long nextSeqNum() {
- return asyncSeqNum.getAndIncrement() & Long.MAX_VALUE;
- }
-
@Override
public ClientId getId() {
return clientId;
@@ -100,9 +136,10 @@ final class RaftClientImpl implements RaftClient {
"Interrupted when sending " + message, e));
}
final long callId = nextCallId();
- final long seqNum = nextSeqNum();
- return sendRequestWithRetryAsync(
- () -> new RaftClientRequest(clientId, leaderId, groupId, callId, seqNum, message, readOnly)
+ final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum,
+ seq -> new RaftClientRequest(clientId, leaderId, groupId, callId, seq, message, readOnly));
+ return slidingWindow.submitNewRequest(constructor, this::sendRequestWithRetryAsync
+ ).getReplyFuture(
).thenApply(reply -> handleStateMachineException(reply, CompletionException::new)
).whenComplete((r, e) -> asyncRequestSemaphore.release());
}
@@ -164,13 +201,14 @@ final class RaftClientImpl implements RaftClient {
}
private CompletableFuture<RaftClientReply> sendRequestWithRetryAsync(
- Supplier<RaftClientRequest> supplier) {
- return sendRequestAsync(supplier.get()).thenComposeAsync(reply -> {
- final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
+ PendingAsyncRequest pending) {
+ final RaftClientRequest request = pending.newRequest();
+ final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
+ return sendRequestAsync(request).thenCompose(reply -> {
if (reply == null) {
final TimeUnit unit = retryInterval.getUnit();
- scheduler.schedule(() -> sendRequestWithRetryAsync(supplier)
- .thenApply(r -> f.complete(r)), retryInterval.toLong(unit), unit);
+ scheduler.schedule(() -> slidingWindow.retry(pending, this::sendRequestWithRetryAsync),
+ retryInterval.toLong(unit), unit);
} else {
f.complete(reply);
}
@@ -204,14 +242,23 @@ final class RaftClientImpl implements RaftClient {
LOG.debug("{}: send* {}", clientId, request);
return clientRpc.sendRequestAsync(request).thenApply(reply -> {
LOG.debug("{}: receive* {}", clientId, reply);
- return handleNotLeaderException(request, reply);
+ reply = handleNotLeaderException(request, reply);
+ if (reply != null) {
+ slidingWindow.receiveReply(
+ request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
+ }
+ return reply;
}).exceptionally(e -> {
LOG.debug("{}: Failed {} with {}", clientId, request, e);
- final Throwable cause = e.getCause();
- if (cause instanceof GroupMismatchException) {
- return new RaftClientReply(request, (RaftException) cause);
- } else if (cause instanceof IOException) {
- handleIOException(request, (IOException) cause, null);
+ if (e instanceof CompletionException) {
+ e = e.getCause();
+ }
+ if (e instanceof GroupMismatchException) {
+ throw new CompletionException(e);
+ } else if (e instanceof IOException) {
+ handleIOException(request, (IOException)e, null);
+ } else {
+ throw new CompletionException(e);
}
return null;
});
@@ -281,6 +328,7 @@ final class RaftClientImpl implements RaftClient {
LOG.trace("Stack trace", new Throwable("TRACE"));
}
+ slidingWindow.resetFirstSeqNum();
if (ioe instanceof LeaderNotReadyException) {
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
new file mode 100644
index 0000000..6ded6f7
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.LongFunction;
+
+/**
+ * A single-client-to-multiple-server sliding window.
+ * The client talks to only one server at any time.
+ * When the current server fails, the client fails over to another server.
+ */
+public interface SlidingWindow {
+ Logger LOG = LoggerFactory.getLogger(SlidingWindow.class);
+
+ interface Request<REPLY> {
+ long getSeqNum();
+
+ void setReply(REPLY reply);
+
+ boolean hasReply();
+ }
+
+ /** A seqNum-to-request map, sorted by seqNum. */
+ class RequestMap<REQUEST extends Request<REPLY>, REPLY> implements Iterable<REQUEST> {
+ private final Object name;
+ /** Request map: seqNum -> request */
+ private final SortedMap<Long, REQUEST> requests = new TreeMap<>();
+
+ RequestMap(Object name) {
+ this.name = name;
+ if (LOG.isDebugEnabled()) {
+ JavaUtils.runRepeatedly(() -> log(), 5, 10, TimeUnit.SECONDS);
+ }
+ }
+
+ Object getName() {
+ return name;
+ }
+
+ boolean isEmpty() {
+ return requests.isEmpty();
+ }
+
+ /**
+ * If the request with the given seqNum is non-replied, return it.
+ * Otherwise, return null.
+ *
+ * A request is non-replied if
+ * (1) it is in the request map, and
+ * (2) it does not have a reply.
+ */
+ REQUEST getNonRepliedRequest(long seqNum, String op) {
+ final REQUEST request = requests.get(seqNum);
+ if (request == null) {
+ LOG.debug("{}: {}, seq={} not found in {}", getName(), op, seqNum, this);
+ return null;
+ }
+ if (request.hasReply()) {
+ LOG.debug("{}: {}, seq={} already has replied in {}", getName(), op, seqNum, this);
+ return null;
+ }
+ return request;
+ }
+
+ long firstSeqNum() {
+ return requests.firstKey();
+ }
+
+ long lastSeqNum() {
+ return requests.lastKey();
+ }
+
+ /** Iterate the requests in the order of seqNum. */
+ @Override
+ public Iterator<REQUEST> iterator() {
+ return requests.values().iterator();
+ }
+
+ void putNewRequest(REQUEST request) {
+ final long seqNum = request.getSeqNum();
+ CollectionUtils.putNew(seqNum, request, requests, () -> getName() + ":requests");
+ }
+
+ /**
+ * Set reply for the request with the given seqNum if it is non-replied.
+ * Otherwise, do nothing.
+ *
+ * @return true iff this method does set the reply for the request.
+ */
+ boolean setReply(long seqNum, REPLY reply, String op) {
+ final REQUEST request = getNonRepliedRequest(seqNum, op);
+ if (request == null) {
+ LOG.debug("{}: DUPLICATED reply {} for seq={} in {}", getName(), reply, seqNum, this);
+ return false;
+ }
+
+ LOG.debug("{}: set reply {} for seq={} in {}", getName(), reply, seqNum, this);
+ request.setReply(reply);
+ return true;
+ }
+
+ synchronized void clear() {
+ LOG.debug("close {}", this);
+ requests.clear();
+ }
+
+ synchronized void log() {
+ LOG.debug(this.toString());
+ for(REQUEST r : requests.values()) {
+ LOG.debug(" {}: hasReply? {}", r.getSeqNum(), r.hasReply());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return getName() + ": requests" + asString(requests);
+ }
+
+ private static String asString(SortedMap<Long, ?> map) {
+ return map.isEmpty()? "[]": "[" + map.firstKey() + ".." + map.lastKey() + "]";
+ }
+ }
+
+ /**
+ * Client side sliding window.
+ * A client may
+ * (1) allocate seqNum for new requests;
+ * (2) send requests/retries to the server;
+ * (3) receive replies/exceptions from the server;
+ * (4) return the replies/exceptions to client.
+ *
+ * Depending on the replies/exceptions, the client may retry the requests
+ * to the same or a different server.
+ */
+ class Client<REQUEST extends Request<REPLY>, REPLY> {
+ /** The requests in the sliding window. */
+ private final RequestMap<REQUEST, REPLY> requests;
+ /** Delayed requests. */
+ private final SortedMap<Long, Long> delayedRequests = new TreeMap<>();
+
+ /** The seqNum for the next new request. */
+ private long nextSeqNum = 0;
+ /** The seqNum of the first request. */
+ private long firstSeqNum = -1;
+ /** Is the first request replied? */
+ private boolean firstReplied;
+
+ public Client(Object name) {
+ this.requests = new RequestMap<REQUEST, REPLY>(name) {
+ @Override
+ synchronized void log() {
+ LOG.debug(toString());
+ for (REQUEST r : requests) {
+ LOG.debug(" {}: {}", r.getSeqNum(), r.hasReply() ? "replied"
+ : delayedRequests.containsKey(r.getSeqNum()) ? "delayed" : "submitted");
+ }
+ }
+ };
+ }
+
+ @Override
+ public synchronized String toString() {
+ return requests + ", nextSeqNum=" + nextSeqNum
+ + ", firstSubmitted=" + firstSeqNum + ", replied? " + firstReplied
+ + ", delayed=" + delayedRequests.keySet();
+ }
+
+ /**
+ * A new request arrives, create it with {@link #nextSeqNum}
+ * and then try sending it to the server.
+ *
+ * @param requestConstructor use seqNum to create a new request.
+ * @return the new request.
+ */
+ public synchronized REQUEST submitNewRequest(
+ LongFunction<REQUEST> requestConstructor, Consumer<REQUEST> sendMethod) {
+ if (!requests.isEmpty()) {
+ Preconditions.assertTrue(nextSeqNum == requests.lastSeqNum() + 1,
+ () -> "nextSeqNum=" + nextSeqNum + " but " + this);
+ }
+
+ final long seqNum = nextSeqNum++;
+ final REQUEST r = requestConstructor.apply(seqNum);
+ requests.putNewRequest(r);
+
+ final boolean submitted = sendOrDelayRequest(r, sendMethod);
+ LOG.debug("{}: submitting a new request {} in {}? {}",
+ requests.getName(), r, this, submitted? "submitted": "delayed");
+ return r;
+ }
+
+ private boolean sendOrDelayRequest(REQUEST request, Consumer<REQUEST> sendMethod) {
+ final long seqNum = request.getSeqNum();
+ Preconditions.assertTrue(requests.getNonRepliedRequest(seqNum, "sendOrDelayRequest") == request);
+
+ if (firstReplied) {
+ // already received the reply for the first request, submit any request.
+ sendMethod.accept(request);
+ return true;
+ }
+
+ if (firstSeqNum == -1 && seqNum == requests.firstSeqNum()) {
+ // first request is not yet submitted and this is the first request, submit it.
+ LOG.debug("{}: detect firstSubmitted {} in {}", requests.getName(), request, this);
+ firstSeqNum = seqNum;
+ sendMethod.accept(request);
+ return true;
+ }
+
+ // delay other requests
+ CollectionUtils.putNew(seqNum, seqNum, delayedRequests, () -> requests.getName() + ":delayedRequests");
+ return false;
+ }
+
+ /** Receive a retry from an existing request (may be out-of-order). */
+ public synchronized void retry(REQUEST request, Consumer<REQUEST> sendMethod) {
+ if (requests.getNonRepliedRequest(request.getSeqNum(), "retry") != request) {
+ // out-dated or invalid retry
+ LOG.debug("{}: Ignore retry {} in {}", requests.getName(), request, this);
+ return;
+ }
+ final boolean submitted = sendOrDelayRequest(request, sendMethod);
+ LOG.debug("{}: submitting a retry {} in {}? {}",
+ requests.getName(), request, this, submitted? "submitted": "delayed");
+ }
+
+ private void removeRepliedFromHead() {
+ for (final Iterator<REQUEST> i = requests.iterator(); i.hasNext(); i.remove()) {
+ final REQUEST r = i.next();
+ if (!r.hasReply()) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Receive a reply with the given seqNum (may be out-of-order).
+ * It may trigger the client to send delayed requests.
+ */
+ public synchronized void receiveReply(
+ long seqNum, REPLY reply, Consumer<REQUEST> sendMethod) {
+ if (!requests.setReply(seqNum, reply, "receiveReply")) {
+ return; // request already replied
+ }
+ if (seqNum == firstSeqNum) {
+ firstReplied = true; // received the reply for the first submitted request
+ }
+ removeRepliedFromHead();
+ trySendDelayed(sendMethod);
+ }
+
+ private void trySendDelayed(Consumer<REQUEST> sendMethod) {
+ if (firstReplied) {
+ // after first received, all other requests can be submitted (out-of-order)
+ if (!delayedRequests.isEmpty()) {
+ for (Long seqNum : delayedRequests.keySet()) {
+ sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed"));
+ }
+ delayedRequests.clear();
+ }
+ } else {
+ // Otherwise, submit the first only if it is a delayed request
+ final Iterator<REQUEST> i = requests.iterator();
+ if (i.hasNext()) {
+ final REQUEST r = i.next();
+ final Long delayed = delayedRequests.remove(r.getSeqNum());
+ if (delayed != null) {
+ sendOrDelayRequest(r, sendMethod);
+ }
+ }
+ }
+ }
+
+ /** Reset {@link #firstSeqNum} and the first-replied flag; called when the stream has an error. */
+ public synchronized void resetFirstSeqNum() {
+ firstSeqNum = -1;
+ firstReplied = false;
+ LOG.debug("After resetFirstSeqNum: {}", this);
+ }
+ }
+
+ /**
+ * Server side sliding window.
+ * A server may
+ * (1) receive requests from client;
+ * (2) submit the requests for processing;
+ * (3) receive replies from the processing unit;
+ * (4) send replies to the client.
+ */
+ class Server<REQUEST extends Request<REPLY>, REPLY> implements Closeable {
+ /** The requests in the sliding window. */
+ private final RequestMap<REQUEST, REPLY> requests;
+ /** The end of requests */
+ private final REQUEST end;
+
+ private long nextToProcess = -1;
+
+ public Server(Object name, REQUEST end) {
+ this.requests = new RequestMap<>(name);
+ this.end = end;
+ Preconditions.assertTrue(end.getSeqNum() == Long.MAX_VALUE);
+ }
+
+ @Override
+ public synchronized String toString() {
+ return requests + ", nextToProcess=" + nextToProcess;
+ }
+
+ /** A request (or a retry) arrives (may be out-of-order except for the first request). */
+ public synchronized void receivedRequest(REQUEST request, Consumer<REQUEST> processingMethod) {
+ final long seqNum = request.getSeqNum();
+ if (nextToProcess == -1) {
+ nextToProcess = seqNum;
+ LOG.debug("{}: got seq={} (first request), set nextToProcess in {}", requests.getName(), seqNum, this);
+ } else {
+ LOG.debug("{}: got seq={} in {}", requests.getName(), seqNum, this);
+ }
+ requests.putNewRequest(request);
+ processRequestsFromHead(processingMethod);
+ }
+
+    /**
+     * Submit, in seqNum order, every received request that has not been submitted yet.
+     *
+     * Requests stay in the map until their replies have been sent, so the head of the map
+     * may hold requests that were already submitted (seqNum < nextToProcess).
+     * Those are skipped instead of ending the scan; otherwise a newly received request
+     * would not be submitted until every earlier request had been replied and removed,
+     * i.e. the window would degrade to processing one request at a time.
+     * The scan stops at the first gap (seqNum > nextToProcess) since the missing
+     * request has to be processed first.
+     *
+     * NOTE(review): processingMethod is invoked while iterating the map; it is assumed
+     * not to call back into receiveReply synchronously -- confirm against callers.
+     *
+     * @param processingMethod the method used to submit a request for processing.
+     */
+    private void processRequestsFromHead(Consumer<REQUEST> processingMethod) {
+      for(REQUEST r : requests) {
+        final long seqNum = r.getSeqNum();
+        if (seqNum > nextToProcess) {
+          // gap: the request with seqNum == nextToProcess has not arrived yet.
+          return;
+        }
+        if (seqNum == nextToProcess) {
+          processingMethod.accept(r);
+          nextToProcess++;
+        }
+        // seqNum < nextToProcess: already submitted and waiting for its reply; skip it.
+      }
+    }
+
+ /**
+ * Receives a reply for the given seqNum (may be out-of-order) from the processor.
+ * It may trigger sending replies to client or processing more requests.
+ */
+ public synchronized void receiveReply(
+ long seqNum, REPLY reply, Consumer<REQUEST> replyMethod, Consumer<REQUEST> processingMethod) {
+ if (!requests.setReply(seqNum, reply, "receiveReply")) {
+ return; // request already replied
+ }
+ sendRepliesFromHead(replyMethod);
+ processRequestsFromHead(processingMethod);
+ }
+
+ private void sendRepliesFromHead(
+ Consumer<REQUEST> replyMethod
+ ) {
+ for(final Iterator<REQUEST> i = requests.iterator(); i.hasNext(); i.remove()) {
+ final REQUEST r = i.next();
+ if (!r.hasReply()) {
+ return;
+ }
+ replyMethod.accept(r);
+ if (r == end) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Signal the end of requests.
+ * @return true if no more outstanding requests.
+ */
+ public synchronized boolean endOfRequests() {
+ if (requests.isEmpty()) {
+ return true;
+ } else {
+ LOG.debug("{}: put end-of-request in {}", requests.getName(), this);
+ requests.putNewRequest(end);
+ return false;
+ }
+ }
+
+ @Override
+ public void close() {
+ requests.clear();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index c5c188e..ea1f204 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -58,7 +58,9 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
RaftClientRequest request) {
final RaftPeerId serverId = request.getServerId();
try {
- return sendRequestAsync(request, getProxies().getProxy(serverId));
+ final RaftClientProtocolClient proxy = getProxies().getProxy(serverId);
+ // Reuse the same grpc stream for all async calls.
+ return proxy.getAppendStreamObservers().onNext(request);
} catch (IOException e) {
return JavaUtils.completeExceptionally(e);
}
@@ -83,7 +85,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
return ClientProtoUtils.toServerInformationReply(
proxy.serverInformation(proto));
} else {
- final CompletableFuture<RaftClientReply> f = sendRequestAsync(request, proxy);
+ final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy);
// TODO: timeout support
try {
return f.get();
@@ -96,12 +98,13 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
}
}
- private CompletableFuture<RaftClientReply> sendRequestAsync(
+ private CompletableFuture<RaftClientReply> sendRequest(
RaftClientRequest request, RaftClientProtocolClient proxy) throws IOException {
final RaftClientRequestProto requestProto =
toRaftClientRequestProto(request);
final CompletableFuture<RaftClientReplyProto> replyFuture =
new CompletableFuture<>();
+ // create a new grpc stream for each non-async call.
final StreamObserver<RaftClientRequestProto> requestObserver =
proxy.append(new StreamObserver<RaftClientReplyProto>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index ace90f2..0b05475 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -17,9 +17,9 @@
*/
package org.apache.ratis.grpc.client;
+import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.*;
import org.apache.ratis.shaded.io.grpc.ManagedChannel;
import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder;
import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
@@ -31,12 +31,17 @@ import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
import org.apache.ratis.util.CheckedSupplier;
+import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
public class RaftClientProtocolClient implements Closeable {
@@ -49,6 +54,8 @@ public class RaftClientProtocolClient implements Closeable {
private final RaftClientProtocolServiceStub asyncStub;
private final AdminProtocolServiceBlockingStub adminBlockingStub;
+ private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
+
public RaftClientProtocolClient(ClientId id, RaftPeer target) {
this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
this.target = target;
@@ -65,6 +72,10 @@ public class RaftClientProtocolClient implements Closeable {
@Override
public void close() {
+ final AsyncStreamObservers observers = appendStreamObservers.get();
+ if (observers != null) {
+ observers.close();
+ }
channel.shutdownNow();
}
@@ -98,7 +109,88 @@ public class RaftClientProtocolClient implements Closeable {
return asyncStub.append(responseHandler);
}
+ AsyncStreamObservers getAppendStreamObservers() {
+ return appendStreamObservers.updateAndGet(a -> a != null? a : new AsyncStreamObservers());
+ }
+
public RaftPeer getTarget() {
return target;
}
+
+ class AsyncStreamObservers implements Closeable {
+ /** Request map: callId -> future */
+ private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
+ private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
+      /**
+       * Handle a reply received from the shared async stream.
+       *
+       * A reply carrying a NotLeaderException fails all the pending requests of this
+       * stream through {@code completeReplyExceptionally}.
+       * Otherwise, the future registered under the call id of the reply is removed
+       * from the map and completed with the reply.
+       * A reply whose call id has no pending future (e.g. a duplicated reply) is logged
+       * and ignored so that no NullPointerException escapes from this callback.
+       *
+       * @param proto the reply received from the server.
+       */
+      @Override
+      public void onNext(RaftClientReplyProto proto) {
+        final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
+        if (map == null) {
+          LOG.warn("replyStreamObserver onNext map == null");
+          return;
+        }
+        final long callId = proto.getRpcReply().getCallId();
+        try {
+          final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto);
+          final NotLeaderException nle = reply.getNotLeaderException();
+          if (nle != null) {
+            completeReplyExceptionally(nle, NotLeaderException.class.getName());
+            return;
+          }
+          final CompletableFuture<RaftClientReply> f = map.remove(callId);
+          if (f == null) {
+            // unknown or already-completed call id: nothing to complete.
+            LOG.warn("{}: no pending async request for cid={}, ignore reply {}", getName(), callId, reply);
+            return;
+          }
+          f.complete(reply);
+        } catch (Throwable t) {
+          // remove (not get) so that the failed future does not stay in the map.
+          final CompletableFuture<RaftClientReply> f = map.remove(callId);
+          if (f != null) {
+            f.completeExceptionally(t);
+          } else {
+            LOG.warn("{}: failed to handle the reply for unknown cid={}", getName(), callId, t);
+          }
+        }
+      }
+
+ @Override
+ public void onError(Throwable t) {
+ final IOException ioe = RaftGrpcUtil.unwrapIOException(t);
+ completeReplyExceptionally(ioe, "onError");
+ }
+
+ @Override
+ public void onCompleted() {
+ completeReplyExceptionally(null, "completed");
+ }
+ };
+ private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver);
+
+ CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
+ final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
+ if (map == null) {
+ return JavaUtils.completeExceptionally(new IOException("Already closed."));
+ }
+ final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
+ CollectionUtils.putNew(request.getCallId(), f, map,
+ () -> getName() + ":" + getClass().getSimpleName());
+ try {
+ requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
+ } catch(Throwable t) {
+ f.completeExceptionally(t);
+ }
+ return f;
+ }
+
+ @Override
+ public void close() {
+ requestStreamObserver.onCompleted();
+ completeReplyExceptionally(null, "close");
+ }
+
+ private void completeReplyExceptionally(Throwable t, String event) {
+ appendStreamObservers.compareAndSet(this, null);
+ final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSet(null);
+ if (map == null) {
+ return;
+ }
+ for (Map.Entry<Long, CompletableFuture<RaftClientReply>> entry : map.entrySet()) {
+ final CompletableFuture<RaftClientReply> f = entry.getValue();
+ if (!f.isDone()) {
+ f.completeExceptionally(t != null? t
+ : new IOException(getName() + ": Stream " + event
+ + ": no reply for async request cid=" + entry.getKey()));
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
index f3ebe0f..6d19920 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
@@ -25,17 +25,21 @@ import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SlidingWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import java.io.IOException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase {
- static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class);
+ public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class);
- private static class PendingAppend implements Comparable<PendingAppend> {
+ private static class PendingAppend implements SlidingWindow.Request<RaftClientReply> {
private final RaftClientRequest request;
private volatile RaftClientReply reply;
@@ -43,25 +47,27 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
this.request = request;
}
- boolean isReady() {
+ @Override
+ public boolean hasReply() {
return reply != null || this == COMPLETED;
}
- void setReply(RaftClientReply reply) {
+ @Override
+ public void setReply(RaftClientReply reply) {
this.reply = reply;
}
- RaftClientRequest getRequest() {
- return request;
+ RaftClientReply getReply() {
+ return reply;
}
- long getSeqNum() {
- return request != null? request.getSeqNum(): Long.MAX_VALUE;
+ RaftClientRequest getRequest() {
+ return request;
}
@Override
- public int compareTo(PendingAppend that) {
- return Long.compare(this.getSeqNum(), that.getSeqNum());
+ public long getSeqNum() {
+ return request != null? request.getSeqNum(): Long.MAX_VALUE;
}
@Override
@@ -97,97 +103,84 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
return new AppendRequestStreamObserver(responseObserver);
}
+ private final AtomicInteger streamCount = new AtomicInteger();
+
private class AppendRequestStreamObserver implements
StreamObserver<RaftClientRequestProto> {
- private final List<PendingAppend> pendingList = new LinkedList<>();
+ private final String name = getId() + "-" + streamCount.getAndIncrement();
private final StreamObserver<RaftClientReplyProto> responseObserver;
+ private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
+ = new SlidingWindow.Server<>(name, COMPLETED);
AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
+ LOG.debug("new AppendRequestStreamObserver {}", name);
this.responseObserver = ro;
}
+ void processClientRequestAsync(PendingAppend pending) {
+ try {
+ protocol.submitClientRequestAsync(pending.getRequest()
+ ).thenAcceptAsync(reply -> slidingWindow.receiveReply(
+ pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync)
+ ).exceptionally(exception -> {
+ // TODO: the exception may be from either raft or state machine.
+ // Currently we skip all the following responses when getting an
+ // exception from the state machine.
+ responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest());
+ return null;
+ });
+ } catch (IOException e) {
+ throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e);
+ }
+ }
+
@Override
public void onNext(RaftClientRequestProto request) {
try {
final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request);
final PendingAppend p = new PendingAppend(r);
- final long replySeq = p.getSeqNum();
- synchronized (pendingList) {
- pendingList.add(p);
- }
-
- protocol.submitClientRequestAsync(r
- ).whenCompleteAsync((reply, exception) -> {
- if (exception != null) {
- // TODO: the exception may be from either raft or state machine.
- // Currently we skip all the following responses when getting an
- // exception from the state machine.
- responseObserver.onError(RaftGrpcUtil.wrapException(exception));
- } else {
- synchronized (pendingList) {
- Preconditions.assertTrue(!pendingList.isEmpty(),
- "PendingList is empty when handling onNext for seqNum %s", replySeq);
- final long headSeqNum = pendingList.get(0).getSeqNum();
- // stream seqNum is consecutive
- final PendingAppend pendingForReply = pendingList.get(
- (int) (replySeq - headSeqNum));
- Preconditions.assertTrue(pendingForReply != null &&
- pendingForReply.getSeqNum() == replySeq,
- "pending for reply is: %s, the pending list: %s",
- pendingForReply, pendingList);
- pendingForReply.setReply(reply);
-
- if (headSeqNum == replySeq) {
- Collection<PendingAppend> readySet = new ArrayList<>();
- // if this is head, we send back all the ready responses
- Iterator<PendingAppend> iter = pendingList.iterator();
- PendingAppend pending;
- while (iter.hasNext() && ((pending = iter.next()).isReady())) {
- readySet.add(pending);
- iter.remove();
- }
- sendReadyReplies(readySet);
- }
- }
- }
- });
+ slidingWindow.receivedRequest(p, this::processClientRequestAsync);
} catch (Throwable e) {
- LOG.info("{} got exception when handling client append request {}: {}",
- getId(), request.getRpcRequest(), e);
- responseObserver.onError(RaftGrpcUtil.wrapException(e));
+ responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request));
}
}
- private void sendReadyReplies(Collection<PendingAppend> readySet) {
- readySet.forEach(ready -> {
- Preconditions.assertTrue(ready.isReady());
+ private void sendReply(PendingAppend ready) {
+ Preconditions.assertTrue(ready.hasReply());
if (ready == COMPLETED) {
- responseObserver.onCompleted();
+ close();
} else {
+ LOG.debug("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply());
responseObserver.onNext(
- ClientProtoUtils.toRaftClientReplyProto(ready.reply));
+ ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
}
- });
}
@Override
public void onError(Throwable t) {
// for now we just log a msg
- LOG.warn("{} onError: client Append cancelled", getId(), t);
- synchronized (pendingList) {
- pendingList.clear();
- }
+ LOG.warn(name + ": onError", t);
+ slidingWindow.close();
}
@Override
public void onCompleted() {
- synchronized (pendingList) {
- if (pendingList.isEmpty()) {
- responseObserver.onCompleted();
- } else {
- pendingList.add(COMPLETED);
- }
+ if (slidingWindow.endOfRequests()) {
+ close();
}
}
+
+ private void close() {
+ LOG.debug("{}: close", name);
+ responseObserver.onCompleted();
+ slidingWindow.close();
+ }
+
+ void responseError(Throwable t, Supplier<String> message) {
+ t = JavaUtils.unwrapCompletionException(t);
+ LOG.warn(name + ": Failed " + message.get(), t);
+ responseObserver.onError(RaftGrpcUtil.wrapException(t));
+ slidingWindow.close();
+ }
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index b8bc636..e5f41b7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -140,10 +140,11 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
@Test
public void testBasicAppendEntriesAsync() throws Exception {
LOG.info("Running testBasicAppendEntriesAsync");
+ RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
cluster.start();
waitForLeader(cluster);
- RaftBasicTests.runTestBasicAppendEntries(true, 10, cluster, LOG);
+ RaftBasicTests.runTestBasicAppendEntries(true, 1000, cluster, LOG);
cluster.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index c8dfc0d..c55445a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -189,11 +189,6 @@ public interface RaftTestUtil {
}
}
- if (async) {
- Collections.sort(entries, Comparator
- .comparing(e -> e.getSmLogEntry().getData().toStringUtf8()));
- }
-
long logIndex = 0;
Assert.assertEquals(expectedMessages.length, entries.size());
for (int i = 0; i < expectedMessages.length; i++) {