You are viewing a plain text version of this content. The canonical link for it (the "here" hyperlink in the original page) was not preserved in this plain-text copy.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/02/18 14:16:04 UTC
incubator-ratis git commit: RATIS-207. Implement stale read.
Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 4104860b9 -> 8fd74ede4
RATIS-207. Implement stale read. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8fd74ede
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8fd74ede
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8fd74ede
Branch: refs/heads/master
Commit: 8fd74ede411196a2b762b6ed741eccb30248e81c
Parents: 4104860
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Sun Feb 18 19:45:14 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Sun Feb 18 19:45:14 2018 +0530
----------------------------------------------------------------------
.../org/apache/ratis/client/RaftClient.java | 6 ++
.../ratis/client/impl/ClientProtoUtils.java | 9 +-
.../ratis/client/impl/RaftClientImpl.java | 70 ++++++++++-----
.../ratis/protocol/RaftClientRequest.java | 43 ++++++++--
.../ratis/protocol/StaleReadException.java | 27 ++++++
.../ratis/protocol/StateMachineException.java | 4 +
.../test/java/org/apache/ratis/BaseTest.java | 55 ++++++++++--
.../ratis/grpc/client/AppendStreamer.java | 2 +-
ratis-proto-shaded/src/main/proto/Raft.proto | 11 ++-
.../ratis/server/impl/RaftServerImpl.java | 33 +++++++-
.../apache/ratis/statemachine/StateMachine.java | 13 +++
.../statemachine/impl/BaseStateMachine.java | 25 +++++-
.../java/org/apache/ratis/RaftAsyncTests.java | 89 +++++++++++++++++++-
.../org/apache/ratis/RaftExceptionBaseTest.java | 13 ++-
.../SimpleStateMachine4Testing.java | 6 +-
15 files changed, 355 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 89fb8f4..84fec9e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -51,6 +51,9 @@ public interface RaftClient extends Closeable {
/** Async call to send the given readonly message to the raft service. */
CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message);
+ /** Async call to send the given stale-read message to the given server (not the raft service). */
+ CompletableFuture<RaftClientReply> sendStaleReadAsync(Message message, long minIndex, RaftPeerId server);
+
/**
* Send the given message to the raft service.
* The message may change the state of the service.
@@ -61,6 +64,9 @@ public interface RaftClient extends Closeable {
/** Send the given readonly message to the raft service. */
RaftClientReply sendReadOnly(Message message) throws IOException;
+ /** Send the given stale-read message to the given server (not the raft service). */
+ RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server) throws IOException;
+
/** Send set configuration request to the raft service. */
RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 9148633..de28e18 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -75,26 +75,27 @@ public interface ClientProtoUtils {
ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
request.getCallId(),
request.getSeqNum(),
- toMessage(p.getMessage()), p.getReadOnly());
+ p.getType(), toMessage(p.getMessage()), p.getMinIndex());
}
static RaftClientRequestProto toRaftClientRequestProto(
RaftClientRequest request) {
return RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(request))
+ .setType(request.getType())
.setMessage(toClientMessageEntryProtoBuilder(request.getMessage()))
- .setReadOnly(request.isReadOnly())
+ .setMinIndex(request.getMinIndex())
.build();
}
static RaftClientRequestProto toRaftClientRequestProto(
ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
- long seqNum, ByteString content, boolean readOnly) {
+ long seqNum, ByteString content) {
return RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(
clientId, serverId, groupId, callId, seqNum))
+ .setType(RaftClientRequestProto.Type.WRITE)
.setMessage(toClientMessageEntryProtoBuilder(content))
- .setReadOnly(readOnly)
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 6ee415d..6f3b8e0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.util.*;
import java.io.IOException;
@@ -36,6 +37,10 @@ import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.stream.Stream;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.READ;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.STALE_READ;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.WRITE;
+
/** A client who sends requests to a raft service. */
final class RaftClientImpl implements RaftClient {
private static final AtomicLong callIdCounter = new AtomicLong();
@@ -91,7 +96,9 @@ final class RaftClientImpl implements RaftClient {
private volatile RaftPeerId leaderId;
- private final SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> slidingWindow;
+ /** Map: id -> {@link SlidingWindow}, in order to support async calls to the RAFT service or individual servers. */
+ private final ConcurrentMap<String, SlidingWindow.Client<PendingAsyncRequest, RaftClientReply>>
+ slidingWindows = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler;
private final Semaphore asyncRequestSemaphore;
@@ -107,7 +114,6 @@ final class RaftClientImpl implements RaftClient {
asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties));
- slidingWindow = new SlidingWindow.Client<>(getId());
clientRpc.addServers(peers);
}
@@ -116,18 +122,32 @@ final class RaftClientImpl implements RaftClient {
return clientId;
}
+ private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftClientRequest request) {
+ return getSlidingWindow(request.isStaleRead()? request.getServerId(): null);
+ }
+
+ private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftPeerId target) {
+ final String id = target != null? target.toString(): "RAFT";
+ return slidingWindows.computeIfAbsent(id, key -> new SlidingWindow.Client<>(getId() + "->" + key));
+ }
+
@Override
public CompletableFuture<RaftClientReply> sendAsync(Message message) {
- return sendAsync(message, false);
+ return sendAsync(WRITE, message, 0L, null);
}
@Override
public CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message) {
- return sendAsync(message, true);
+ return sendAsync(READ, message, 0L, null);
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply> sendStaleReadAsync(Message message, long minIndex, RaftPeerId server) {
+ return sendAsync(STALE_READ, message, minIndex, server);
}
- private CompletableFuture<RaftClientReply> sendAsync(Message message,
- boolean readOnly) {
+ private CompletableFuture<RaftClientReply> sendAsync(
+ RaftClientRequestProto.Type type, Message message, long minIndex, RaftPeerId server) {
Objects.requireNonNull(message, "message == null");
try {
asyncRequestSemaphore.acquire();
@@ -137,29 +157,43 @@ final class RaftClientImpl implements RaftClient {
}
final long callId = nextCallId();
final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum,
- seq -> new RaftClientRequest(clientId, leaderId, groupId, callId, seq, message, readOnly));
- return slidingWindow.submitNewRequest(constructor, this::sendRequestWithRetryAsync
+ seq -> newRaftClientRequest(server, callId, seq, type, message, minIndex));
+ return getSlidingWindow(server).submitNewRequest(constructor, this::sendRequestWithRetryAsync
).getReplyFuture(
).thenApply(reply -> handleStateMachineException(reply, CompletionException::new)
).whenComplete((r, e) -> asyncRequestSemaphore.release());
}
+ private RaftClientRequest newRaftClientRequest(
+ RaftPeerId server, long callId, long seq,
+ RaftClientRequestProto.Type type, Message message, long minIndex) {
+ return new RaftClientRequest(clientId, server != null? server: leaderId, groupId,
+ callId, seq, type, message, minIndex);
+ }
+
@Override
public RaftClientReply send(Message message) throws IOException {
- return send(message, false);
+ return send(WRITE, message, 0L, null);
}
@Override
public RaftClientReply sendReadOnly(Message message) throws IOException {
- return send(message, true);
+ return send(READ, message, 0L, null);
+ }
+
+ @Override
+ public RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server)
+ throws IOException {
+ return send(STALE_READ, message, minIndex, server);
}
- private RaftClientReply send(Message message, boolean readOnly) throws IOException {
+ private RaftClientReply send(RaftClientRequestProto.Type type, Message message, long minIndex, RaftPeerId server)
+ throws IOException {
Objects.requireNonNull(message, "message == null");
final long callId = nextCallId();
- return sendRequestWithRetry(() -> new RaftClientRequest(
- clientId, leaderId, groupId, callId, 0L, message, readOnly));
+ return sendRequestWithRetry(() -> newRaftClientRequest(
+ server, callId, 0L, type, message, minIndex));
}
@Override
@@ -207,7 +241,7 @@ final class RaftClientImpl implements RaftClient {
return sendRequestAsync(request).thenCompose(reply -> {
if (reply == null) {
final TimeUnit unit = retryInterval.getUnit();
- scheduler.schedule(() -> slidingWindow.retry(pending, this::sendRequestWithRetryAsync),
+ scheduler.schedule(() -> getSlidingWindow(request).retry(pending, this::sendRequestWithRetryAsync),
retryInterval.toLong(unit), unit);
} else {
f.complete(reply);
@@ -244,15 +278,13 @@ final class RaftClientImpl implements RaftClient {
LOG.debug("{}: receive* {}", clientId, reply);
reply = handleNotLeaderException(request, reply);
if (reply != null) {
- slidingWindow.receiveReply(
+ getSlidingWindow(request).receiveReply(
request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
}
return reply;
}).exceptionally(e -> {
LOG.debug("{}: Failed {} with {}", clientId, request, e);
- if (e instanceof CompletionException) {
- e = e.getCause();
- }
+ e = JavaUtils.unwrapCompletionException(e);
if (e instanceof GroupMismatchException) {
throw new CompletionException(e);
} else if (e instanceof IOException) {
@@ -328,7 +360,7 @@ final class RaftClientImpl implements RaftClient {
LOG.trace("Stack trace", new Throwable("TRACE"));
}
- slidingWindow.resetFirstSeqNum();
+ getSlidingWindow(request).resetFirstSeqNum();
if (ioe instanceof LeaderNotReadyException) {
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index c924ef8..d20b158 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -17,6 +17,13 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.util.Preconditions;
+
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.READ;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.STALE_READ;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.WRITE;
+
/**
* Request from client to server
*/
@@ -24,26 +31,32 @@ public class RaftClientRequest extends RaftClientMessage {
private final long callId;
private final long seqNum;
+ private final RaftClientRequestProto.Type type;
private final Message message;
- private final boolean readOnly;
+
+ private final long minIndex;
public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
RaftGroupId groupId, long callId, Message message) {
- this(clientId, serverId, groupId, callId, 0L, message, false);
+ this(clientId, serverId, groupId, callId, 0L, WRITE, message, 0L);
}
public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
RaftGroupId groupId, long callId, long seqNum, Message message) {
- this(clientId, serverId, groupId, callId, seqNum, message, false);
+ this(clientId, serverId, groupId, callId, seqNum, WRITE, message, 0L);
}
- public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId, long callId, long seqNum, Message message, boolean readOnly) {
+ public RaftClientRequest(
+ ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
+ long callId, long seqNum, RaftClientRequestProto.Type type, Message message, long minIndex) {
super(clientId, serverId, groupId);
this.callId = callId;
this.seqNum = seqNum;
+ this.type = type;
this.message = message;
- this.readOnly = readOnly;
+ this.minIndex = minIndex;
+
+ Preconditions.assertTrue(minIndex >= 0, "minIndex < 0");
}
@Override
@@ -63,13 +76,27 @@ public class RaftClientRequest extends RaftClientMessage {
return message;
}
+ public RaftClientRequestProto.Type getType() {
+ return type;
+ }
+
public boolean isReadOnly() {
- return readOnly;
+ return getType() != WRITE;
+ }
+
+ public boolean isStaleRead() {
+ return getType() == STALE_READ;
+ }
+
+ /** @return the minimum required commit index for processing the request. */
+ public long getMinIndex() {
+ return minIndex;
}
@Override
public String toString() {
return super.toString() + ", cid=" + callId + ", seq=" + seqNum + " "
- + (isReadOnly()? "RO": "RW") + ", " + getMessage();
+ + (!isReadOnly()? "RW": isStaleRead()? "StaleRead(" + getMinIndex() + ")": "RO")
+ + ", " + getMessage();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-common/src/main/java/org/apache/ratis/protocol/StaleReadException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StaleReadException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StaleReadException.java
new file mode 100644
index 0000000..0240479
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StaleReadException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * This exception indicates the failure of a stale-read.
+ */
+public class StaleReadException extends RaftException {
+ public StaleReadException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
index 49a64ef..56f1160 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
@@ -25,4 +25,8 @@ public class StateMachineException extends RaftException {
public StateMachineException(String msg) {
super(msg);
}
+
+ public StateMachineException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 54aad77..e487841 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -19,10 +19,7 @@ package org.apache.ratis;
import org.apache.log4j.Level;
import org.apache.ratis.conf.ConfUtils;
-import org.apache.ratis.util.CheckedRunnable;
-import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.*;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.TestName;
@@ -32,6 +29,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
@@ -78,20 +77,58 @@ public abstract class BaseTest {
return new File(getClassTestDir(), testName.getMethodName());
}
+ public static void assertThrowable(
+ String description, Throwable t,
+ Class<? extends Throwable> exceptedThrowableClass, Logger log,
+ Class<? extends Throwable>... exceptedCauseClasses) {
+ if (log != null) {
+ log.info("The test \"" + description + "\" throws " + t.getClass().getSimpleName(), t);
+ }
+ Assert.assertEquals(exceptedThrowableClass, t.getClass());
+
+ for (Class<? extends Throwable> expectedCause : exceptedCauseClasses) {
+ final Throwable previous = t;
+ t = Objects.requireNonNull(previous.getCause(),
+ () -> "previous.getCause() == null for previous=" + previous);
+ Assert.assertEquals(expectedCause, t.getClass());
+ }
+ }
+
public static void testFailureCase(
String description, CheckedRunnable<?> testCode,
- Class<? extends Throwable> exceptedThrowableClass, Logger log) {
+ Class<? extends Throwable> exceptedThrowableClass, Logger log,
+ Class<? extends Throwable>... exceptedCauseClasses) {
try {
testCode.run();
Assert.fail("The test \"" + description + "\" does not throw anything.");
} catch (Throwable t) {
- log.info("The test \"" + description + "\" throws " + t.getClass().getSimpleName(), t);
- Assert.assertEquals(exceptedThrowableClass, t.getClass());
+ assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses);
}
}
public void testFailureCase(
- String description, CheckedRunnable<?> testCode, Class<? extends Throwable> exceptedThrowableClass) {
- testFailureCase(description, testCode, exceptedThrowableClass, LOG);
+ String description, CheckedRunnable<?> testCode,
+ Class<? extends Throwable> exceptedThrowableClass,
+ Class<? extends Throwable>... exceptedCauseClasses) {
+ testFailureCase(description, testCode, exceptedThrowableClass, LOG, exceptedCauseClasses);
+ }
+
+ public static void testFailureCaseAsync(
+ String description, Supplier<CompletableFuture<?>> testCode,
+ Class<? extends Throwable> exceptedThrowableClass, Logger log,
+ Class<? extends Throwable>... exceptedCauseClasses) {
+ try {
+ testCode.get().join();
+ Assert.fail("The test \"" + description + "\" does not throw anything.");
+ } catch (Throwable t) {
+ t = JavaUtils.unwrapCompletionException(t);
+ assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses);
+ }
+ }
+
+ public void testFailureCaseAsync(
+ String description, Supplier<CompletableFuture<?>> testCode, Class<? extends Throwable> exceptedThrowableClass,
+ Class<? extends Throwable>... exceptedCauseClasses) {
+ testFailureCaseAsync(description, testCode, exceptedThrowableClass, LOG, exceptedCauseClasses);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 08e8376..ff3ed28 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -157,7 +157,7 @@ public class AppendStreamer implements Closeable {
if (isRunning()) {
// wrap the current buffer into a RaftClientRequestProto
final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto(
- clientId, leaderId, groupId, seqNum, seqNum, content, false);
+ clientId, leaderId, groupId, seqNum, seqNum, content);
if (request.getSerializedSize() > maxMessageSize.getSizeInt()) {
throw new IOException("msg size:" + request.getSerializedSize() +
" exceeds maximum:" + maxMessageSize.getSizeInt());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index fa823ab..d5f6d97 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -172,9 +172,16 @@ message ClientMessageEntryProto {
// normal client request
message RaftClientRequestProto {
+ enum Type {
+ WRITE = 0;
+ READ = 1;
+ STALE_READ = 2;
+ }
+
RaftRpcRequestProto rpcRequest = 1;
- ClientMessageEntryProto message = 2;
- bool readOnly = 3;
+ Type type = 2;
+ ClientMessageEntryProto message = 3;
+ uint64 minIndex = 4;
}
message NotLeaderExceptionProto {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 37cd16f..441e390 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -474,7 +474,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException {
+ assertLifeCycleState(RUNNING);
LOG.debug("{}: receive client request({})", getId(), request);
+ if (request.isStaleRead()) {
+ return staleReadAsync(request);
+ }
+
// first check the server's leader state
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
if (reply != null) {
@@ -486,8 +491,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
if (request.isReadOnly()) {
// TODO: We might not be the leader anymore by the time this completes.
// See the RAFT paper section 8 (last part)
- return stateMachine.query(request.getMessage())
- .thenApply(r -> new RaftClientReply(request, r, getCommitInfos()));
+ return processQueryFuture(stateMachine.query(request.getMessage()), request);
}
// query the retry cache
@@ -513,6 +517,31 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return appendTransaction(request, context, cacheEntry);
}
+ private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest request) {
+ final long minIndex = request.getMinIndex();
+ final long commitIndex = state.getLog().getLastCommittedIndex();
+ LOG.debug("{}: minIndex={}, commitIndex={}", getId(), minIndex, commitIndex);
+ if (commitIndex < minIndex) {
+ final StaleReadException e = new StaleReadException(
+ "Unable to serve stale-read due to server commit index = " + commitIndex + " < min = " + minIndex);
+ return CompletableFuture.completedFuture(
+ new RaftClientReply(request, new StateMachineException(getId(), e), getCommitInfos()));
+ }
+ return processQueryFuture(getStateMachine().queryStale(request.getMessage(), minIndex), request);
+ }
+
+ CompletableFuture<RaftClientReply> processQueryFuture(
+ CompletableFuture<Message> queryFuture, RaftClientRequest request) {
+ return queryFuture.thenApply(r -> new RaftClientReply(request, r, getCommitInfos()))
+ .exceptionally(e -> {
+ e = JavaUtils.unwrapCompletionException(e);
+ if (e instanceof StateMachineException) {
+ return new RaftClientReply(request, (StateMachineException)e, getCommitInfos());
+ }
+ throw new CompletionException(e);
+ });
+ }
+
@Override
public RaftClientReply submitClientRequest(RaftClientRequest request)
throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index e2aaf29..7d35796 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -119,6 +119,19 @@ public interface StateMachine extends Closeable {
CompletableFuture<Message> query(Message request);
/**
+ * Query the state machine, provided minIndex <= commit index.
+ * The request must be read-only.
+ * Since the commit index of this server may lag behind the Raft service,
+ * the returned result may possibly be stale.
+ *
+ * When minIndex > {@link #getLastAppliedTermIndex()},
+ * the state machine may choose to either
+ * (1) return exceptionally, or
+ * (2) wait until minIndex <= {@link #getLastAppliedTermIndex()} before running the query.
+ */
+ CompletableFuture<Message> queryStale(Message request, long minIndex);
+
+ /**
* Validate/pre-process the incoming update request in the state machine.
* @return the content to be written to the log entry. Null means the request
* should be rejected.
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 201eff7..b87143f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -36,6 +36,8 @@ import org.apache.ratis.util.Preconditions;
import java.io.IOException;
import java.util.Collection;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
@@ -51,6 +53,8 @@ public class BaseStateMachine implements StateMachine {
private final AtomicReference<TermIndex> lastAppliedTermIndex = new AtomicReference<>();
+ private final SortedMap<Long, CompletableFuture<Void>> transactionFutures = new TreeMap<>();
+
public RaftPeerId getId() {
return id;
}
@@ -131,6 +135,12 @@ public class BaseStateMachine implements StateMachine {
}
return true;
}
+
+ synchronized (transactionFutures) {
+ for(long i; !transactionFutures.isEmpty() && (i = transactionFutures.firstKey()) <= index; ) {
+ transactionFutures.remove(i).complete(null);
+ }
+ }
return false;
}
@@ -158,8 +168,21 @@ public class BaseStateMachine implements StateMachine {
}
@Override
+ public CompletableFuture<Message> queryStale(Message request, long minIndex) {
+ if (getLastAppliedTermIndex().getIndex() < minIndex) {
+ synchronized (transactionFutures) {
+ if (getLastAppliedTermIndex().getIndex() < minIndex) {
+ return transactionFutures.computeIfAbsent(minIndex, key -> new CompletableFuture<>())
+ .thenCompose(v -> query(request));
+ }
+ }
+ }
+ return query(request);
+ }
+
+ @Override
public CompletableFuture<Message> query(Message request) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index f72ea5a..a1835c0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -22,15 +22,24 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.*;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.junit.*;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,7 +57,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
public static final int NUM_SERVERS = 3;
@Before
- public void setup() throws IOException {
+ public void setup() {
properties = new RaftProperties();
properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
@@ -158,4 +167,80 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG);
cluster.shutdown();
}
+
+  /**
+   * Tests {@link RaftClient#sendStaleReadAsync} against the follower with the largest
+   * commit index.
+   *
+   * <p>The test writes {@code numMessages} messages asynchronously and waits for all of them
+   * to succeed. From the commit infos carried by the last write reply it picks the non-leader
+   * server with the maximum commit index. It then checks that a stale read for a query past
+   * the written messages fails (StateMachineException / IndexOutOfBoundsException are passed
+   * to testFailureCaseAsync). For every query index in [1, followerCommitIndex), the stale
+   * read at that follower must return the same content as sendReadOnlyAsync.
+   */
+  @Test
+  public void testStaleReadAsync() throws Exception {
+    final int numMessages = 10;
+    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+
+    try (RaftClient client = cluster.createClient()) {
+      cluster.start();
+      RaftTestUtil.waitForLeader(cluster);
+
+      // submit some messages
+      final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
+      for (int i = 0; i < numMessages; i++) {
+        final String s = "m" + i;
+        LOG.info("sendAsync " + s);
+        futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage(s)));
+      }
+      Assert.assertEquals(numMessages, futures.size());
+      RaftClientReply lastWriteReply = null;
+      for (CompletableFuture<RaftClientReply> f : futures) {
+        lastWriteReply = f.join();
+        Assert.assertTrue("write failed: " + lastWriteReply, lastWriteReply.isSuccess());
+      }
+      futures.clear();
+
+      // Use a follower with the max commit index
+      final RaftPeerId leader = lastWriteReply.getServerId();
+      LOG.info("leader = " + leader);
+      final Collection<CommitInfoProto> commitInfos = lastWriteReply.getCommitInfos();
+      LOG.info("commitInfos = " + commitInfos);
+      // orElseThrow instead of an unchecked get(): a missing follower fails with context.
+      final CommitInfoProto followerCommitInfo = commitInfos.stream()
+          .filter(info -> !RaftPeerId.valueOf(info.getServer().getId()).equals(leader))
+          .max(Comparator.comparingLong(CommitInfoProto::getCommitIndex))
+          .orElseThrow(() -> new AssertionError("No follower found in commitInfos " + commitInfos));
+      final RaftPeerId follower = RaftPeerId.valueOf(followerCommitInfo.getServer().getId());
+      final long followerCommitIndex = followerCommitInfo.getCommitIndex();
+      LOG.info("max follower = " + follower);
+
+      // test a failure case
+      testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index",
+          () -> client.sendStaleReadAsync(
+              new RaftTestUtil.SimpleMessage("" + (numMessages + 1)),
+              followerCommitIndex, follower),
+          StateMachineException.class, IndexOutOfBoundsException.class);
+
+      // test sendStaleReadAsync
+      for (int i = 1; i < followerCommitIndex; i++) {
+        final int query = i;
+        LOG.info("sendStaleReadAsync, query=" + query);
+        final Message message = new RaftTestUtil.SimpleMessage("" + query);
+        final CompletableFuture<RaftClientReply> readFuture = client.sendReadOnlyAsync(message);
+        final CompletableFuture<RaftClientReply> staleReadFuture = client.sendStaleReadAsync(
+            message, followerCommitIndex, follower);
+
+        futures.add(readFuture.thenApply(r -> getMessageContent(r))
+            .thenCombine(staleReadFuture.thenApply(r -> getMessageContent(r)), (expected, computed) -> {
+              try {
+                LOG.info("query " + query + " returns "
+                    + LogEntryProto.parseFrom(expected).getSmLogEntry().getData().toStringUtf8());
+              } catch (InvalidProtocolBufferException e) {
+                throw new CompletionException(e);
+              }
+
+              Assert.assertEquals("log entry mismatch for query=" + query, expected, computed);
+              return null;
+            })
+        );
+      }
+      JavaUtils.allOf(futures).join();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Asserts that the given reply is successful and returns the content of its message.
+   *
+   * @param reply the reply to check.
+   * @return the content of {@code reply.getMessage()}.
+   */
+  static ByteString getMessageContent(RaftClientReply reply) {
+    // Include the reply in the failure message: this helper runs inside future callbacks,
+    // where a bare AssertionError (wrapped in a CompletionException) carries no context.
+    Assert.assertTrue("Unexpected failed reply: " + reply, reply.isSuccess());
+    return reply.getMessage().getContent();
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index f90012e..4562cb8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -45,7 +45,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
}
- public static final int NUM_PEERS = 5;
+ public static final int NUM_PEERS = 3;
private CLUSTER cluster;
@@ -191,4 +191,15 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
GroupMismatchException.class);
}
}
+
+  /**
+   * Verifies that sendStaleRead(..) to a follower fails when the requested minimum index is
+   * very large (1_000_000_000). NOTE(review): this presumably exceeds the commit index of every
+   * server in this test. testFailureCase is given StateMachineException and StaleReadException
+   * as the expected exception types.
+   */
+  @Test
+  public void testStaleReadException() throws Exception {
+    RaftTestUtil.waitForLeader(cluster);
+    try (RaftClient client = cluster.createClient()) {
+      final long hugeMinIndex = 1_000_000_000L;
+      final RaftPeerId followerId = cluster.getFollowers()
+          .iterator()
+          .next()
+          .getId();
+      testFailureCase("sendStaleRead(..) with a large commit index",
+          () -> client.sendStaleRead(Message.EMPTY, hugeMinIndex, followerId),
+          StateMachineException.class,
+          StaleReadException.class);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 261b9ef..4bf75e1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -23,6 +23,7 @@ import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
@@ -227,11 +228,12 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
final long index = bytes.isEmpty()? getLastAppliedTermIndex().getIndex()
: Long.parseLong(bytes.toStringUtf8());
LOG.info("query log index " + index);
- final LogEntryProto entry = list.get(Math.toIntExact(index));
+ final LogEntryProto entry = list.get(Math.toIntExact(index - 1));
return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
} catch (Exception e) {
LOG.warn("Failed request " + request, e);
- return JavaUtils.completeExceptionally(e);
+ return JavaUtils.completeExceptionally(new StateMachineException(
+ "Failed request " + request, e));
}
}