You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/09/06 08:20:57 UTC
[ratis] branch master updated: RATIS-1696. Support linearizable read-only requests in leader (#735)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 117b33311 RATIS-1696. Support linearizable read-only requests in leader (#735)
117b33311 is described below
commit 117b333114daf37e3d7a31919e49f8fdc0d9e201
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Tue Sep 6 16:20:51 2022 +0800
RATIS-1696. Support linearizable read-only requests in leader (#735)
---
.../ratis/protocol/exceptions/ReadException.java | 27 +++
.../apache/ratis/grpc/server/GrpcLogAppender.java | 2 +-
.../apache/ratis/server/RaftServerConfigKeys.java | 32 +++
.../apache/ratis/server/leader/LeaderState.java | 2 +-
.../apache/ratis/server/impl/LeaderStateImpl.java | 7 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 48 +++-
.../org/apache/ratis/server/impl/ReadRequests.java | 109 +++++++--
.../org/apache/ratis/server/impl/ServerState.java | 6 +-
.../ratis/server/impl/StateMachineUpdater.java | 16 +-
.../ratis/server/leader/LogAppenderDefault.java | 2 +-
.../org/apache/ratis/ReadOnlyRequestTests.java | 261 +++++++++++++++++++++
.../ratis/grpc/TestReadOnlyRequestsWithGrpc.java | 25 ++
12 files changed, 503 insertions(+), 34 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java
new file mode 100644
index 000000000..32828418d
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+/**
+ * Signals that a read-only request could not be served,
+ * e.g. a read that timed out while waiting for the state machine to apply up to its read index.
+ */
+public class ReadException extends RaftException {
+  /** @param cause the underlying failure of the read, such as a timeout. */
+  public ReadException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index f83656cb0..5bb035bde 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -382,7 +382,7 @@ public class GrpcLogAppender extends LogAppenderBase {
default:
throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
}
- getLeaderState().onAppendEntriesReply(getFollower(), reply);
+ getLeaderState().onAppendEntriesReply(GrpcLogAppender.this, reply);
notifyLogAppender();
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index e24631378..625de2e1a 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -157,6 +157,38 @@ public interface RaftServerConfigKeys {
}
}
+  /** Configurations for read-only requests. */
+  interface Read {
+    String PREFIX = RaftServerConfigKeys.PREFIX
+        + "." + JavaUtils.getClassSimpleName(Read.class).toLowerCase();
+
+    /** The maximum time a read may wait for the state machine to apply up to its read index. */
+    String TIMEOUT_KEY = PREFIX + ".timeout";
+    TimeDuration TIMEOUT_DEFAULT = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+    static TimeDuration timeout(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(TIMEOUT_DEFAULT.getUnit()),
+          TIMEOUT_KEY, TIMEOUT_DEFAULT, getDefaultLog(), requirePositive());
+    }
+    static void setTimeout(RaftProperties properties, TimeDuration readOnlyTimeout) {
+      setTimeDuration(properties::setTimeDuration, TIMEOUT_KEY, readOnlyTimeout);
+    }
+
+    /** How read-only requests are served. */
+    enum Option {
+      /** Directly query the state machine. Efficient but may undermine linearizability. */
+      DEFAULT,
+
+      /** Use ReadIndex (see Raft paper section 6.4). Maintains linearizability. */
+      LINEARIZABLE
+    }
+
+    /** Like every other key in this interface, the option key must be namespaced under {@link #PREFIX}. */
+    String OPTION_KEY = PREFIX + ".option";
+    Option OPTION_DEFAULT = Option.DEFAULT;
+    static Option option(RaftProperties properties) {
+      return get(properties::getEnum, OPTION_KEY, OPTION_DEFAULT, getDefaultLog());
+    }
+    static void setOption(RaftProperties properties, Option option) {
+      set(properties::setEnum, OPTION_KEY, option);
+    }
+  }
+
interface Write {
String PREFIX = RaftServerConfigKeys.PREFIX + ".write";
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
index 32c7d6ce0..e906dd209 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -64,6 +64,6 @@ public interface LeaderState {
boolean isFollowerBootstrapping(FollowerInfo follower);
/** Received an {@link AppendEntriesReplyProto} */
- void onAppendEntriesReply(FollowerInfo follower, AppendEntriesReplyProto reply);
+ void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto reply);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 3cb83b1a8..65c586530 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -1087,7 +1087,7 @@ class LeaderStateImpl implements LeaderState {
final MemoizedSupplier<AppendEntriesListener> supplier = MemoizedSupplier.valueOf(
() -> new AppendEntriesListener(readIndex));
- final AppendEntriesListener listener = server.getState().getReadRequests().addAppendEntriesListener(
+ final AppendEntriesListener listener = server.getReadRequests().addAppendEntriesListener(
readIndex, key -> supplier.get());
// the readIndex is already acknowledged before
@@ -1097,7 +1097,6 @@ class LeaderStateImpl implements LeaderState {
if (supplier.isInitialized()) {
senders.forEach(sender -> {
- listener.init(sender);
try {
sender.triggerHeartbeat();
} catch (IOException e) {
@@ -1110,8 +1109,8 @@ class LeaderStateImpl implements LeaderState {
}
@Override
- public void onAppendEntriesReply(FollowerInfo follower, RaftProtos.AppendEntriesReplyProto reply) {
- server.getState().getReadRequests().onAppendEntriesReply(reply, this::hasMajority);
+ public void onAppendEntriesReply(LogAppender appender, RaftProtos.AppendEntriesReplyProto reply) {
+ server.getReadRequests().onAppendEntriesReply(appender, reply, this::hasMajority);
}
void replyPendingRequest(long logIndex, RaftClientReply reply) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 258e94d9f..edea92ced 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.SetConfigurationException;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
@@ -163,6 +164,7 @@ class RaftServerImpl implements RaftServer.Division,
private final RoleInfo role;
private final DataStreamMap dataStreamMap;
+ private final RaftServerConfigKeys.Read.Option readOption;
private final MemoizedSupplier<RaftClient> raftClient;
@@ -206,6 +208,7 @@ class RaftServerImpl implements RaftServer.Division,
this.state = new ServerState(id, group, stateMachine, this, option, properties);
this.retryCache = new RetryCacheImpl(properties);
this.dataStreamMap = new DataStreamMapImpl(id);
+ this.readOption = RaftServerConfigKeys.Read.option(properties);
this.jmxAdapter = new RaftServerJmxAdapter();
this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(
@@ -845,7 +848,7 @@ class RaftServerImpl implements RaftServer.Division,
if (type.is(TypeCase.READ)) {
// TODO: We might not be the leader anymore by the time this completes.
// See the RAFT paper section 8 (last part)
- replyFuture = processQueryFuture(stateMachine.query(request.getMessage()), request);
+ replyFuture = readAsync(request);
} else if (type.is(TypeCase.WATCH)) {
replyFuture = watchAsync(request);
} else if (type.is(TypeCase.MESSAGESTREAM)) {
@@ -907,6 +910,45 @@ class RaftServerImpl implements RaftServer.Division,
return processQueryFuture(stateMachine.queryStale(request.getMessage(), minIndex), request);
}
+  /** @return the {@link ReadRequests} held by this server's {@link ServerState}. */
+  ReadRequests getReadRequests() {
+    final ServerState serverState = getState();
+    return serverState.getReadRequests();
+  }
+
+  /** Serve a read-only request according to the configured {@link RaftServerConfigKeys.Read.Option}. */
+  private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request) {
+    if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
+      // Query the state machine directly: cheap, but linearizability is not guaranteed.
+      return queryStateMachine(request);
+    }
+    if (readOption != RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
+      throw new IllegalStateException("Unexpected read option: " + readOption);
+    }
+
+    // Linearizable read using ReadIndex (Raft paper section 6.4):
+    //   1. obtain the read index from the leader;
+    //   2. wait for the state machine to apply at least up to the read index;
+    //   3. query the state machine.
+    final LeaderStateImpl leader = role.getLeaderState().orElse(null);
+    if (leader == null) {
+      // TODO support follower linearizable read
+      return JavaUtils.completeExceptionally(generateNotLeaderException());
+    }
+    return leader.getReadIndex()
+        .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
+        .thenCompose(appliedIndex -> queryStateMachine(request))
+        .exceptionally(e -> readException2Reply(request, e));
+  }
+
+  /**
+   * Convert the failure of a read into an exception reply.
+   * Only {@link ReadException} and {@link StateMachineException} are converted;
+   * any other failure is rethrown wrapped in a {@link CompletionException}.
+   */
+  private RaftClientReply readException2Reply(RaftClientRequest request, Throwable e) {
+    final Throwable cause = JavaUtils.unwrapCompletionException(e);
+    if (cause instanceof ReadException) {
+      return newExceptionReply(request, (ReadException) cause);
+    } else if (cause instanceof StateMachineException) {
+      return newExceptionReply(request, (StateMachineException) cause);
+    }
+    throw new CompletionException(cause);
+  }
+
private CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) {
return role.getLeaderState()
.map(ls -> ls.streamAsync(request))
@@ -920,6 +962,10 @@ class RaftServerImpl implements RaftServer.Division,
.orElse(null);
}
+  /** Query the state machine directly, without ReadIndex, and turn the answer into a reply. */
+  CompletableFuture<RaftClientReply> queryStateMachine(RaftClientRequest request) {
+    final CompletableFuture<Message> queryFuture = stateMachine.query(request.getMessage());
+    return processQueryFuture(queryFuture, request);
+  }
+
CompletableFuture<RaftClientReply> processQueryFuture(
CompletableFuture<Message> queryFuture, RaftClientRequest request) {
return queryFuture.thenApply(r -> newReplyBuilder(request).setSuccess().setMessage(r).build())
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
index 76f974a03..bbd2a92ba 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
@@ -17,21 +17,31 @@
*/
package org.apache.ratis.server.impl;
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIndex;
-import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -90,21 +100,19 @@ class ReadRequests {
this.commitIndex = commitIndex;
}
- void init(LogAppender appender) {
- replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
- }
-
CompletableFuture<Long> getFuture() {
return future;
}
- boolean receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
+ boolean receive(LogAppender logAppender, AppendEntriesReplyProto proto,
+ Predicate<Predicate<RaftPeerId>> hasMajority) {
if (isCompletedNormally()) {
return true;
}
- final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
- final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
- if (!reply.receive(proto)) {
+
+ final HeartbeatAck reply = replies.computeIfAbsent(
+ logAppender.getFollowerId(), key -> new HeartbeatAck(logAppender));
+ if (reply.receive(proto)) {
if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
future.complete(commitIndex);
return true;
@@ -126,31 +134,80 @@ class ReadRequests {
return sorted.computeIfAbsent(commitIndex, constructor);
}
- synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply,
+ synchronized void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto reply,
Predicate<Predicate<RaftPeerId>> hasMajority) {
final long callId = reply.getServerReply().getCallId();
- for (;;) {
- final Map.Entry<Long, AppendEntriesListener> first = sorted.firstEntry();
- if (first == null || first.getKey() > callId) {
+
+ Iterator<Map.Entry<Long, AppendEntriesListener>> iterator = sorted.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, AppendEntriesListener> entry = iterator.next();
+ if (entry.getKey() > callId) {
return;
}
- final AppendEntriesListener listener = first.getValue();
+ final AppendEntriesListener listener = entry.getValue();
if (listener == null) {
continue;
}
- if (listener.receive(reply, hasMajority)) {
- final AppendEntriesListener removed = sorted.remove(callId);
- Preconditions.assertSame(listener, removed, "AppendEntriesListener");
+ if (listener.receive(appender, reply, hasMajority)) {
ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("{}: {}", ReadRequests.this, s));
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+  /** Reads waiting for the state machine to apply up to their read index, sorted by that index. */
+  static class ReadIndexQueue {
+    private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+    private final NavigableMap<Long, CompletableFuture<Long>> sorted = new ConcurrentSkipListMap<>();
+    private final TimeDuration readTimeout;
+
+    ReadIndexQueue(TimeDuration readTimeout) {
+      this.readTimeout = readTimeout;
+    }
+
+    /**
+     * @return a future to be completed once an index >= readIndex has been applied;
+     *         reads with the same read index share the same future.
+     *         The future fails with a {@link ReadException} if it is not completed within the read timeout.
+     */
+    CompletableFuture<Long> add(long readIndex) {
+      final MemoizedSupplier<CompletableFuture<Long>> supplier = MemoizedSupplier.valueOf(CompletableFuture::new);
+      final CompletableFuture<Long> f = sorted.computeIfAbsent(readIndex, i -> supplier.get());
+
+      if (supplier.isInitialized()) {
+        // Bind the timeout to this particular future (not just the index), see handleTimeout.
+        scheduler.onTimeout(readTimeout, () -> handleTimeout(readIndex, f),
+            LOG, () -> "Failed to handle read timeout for index " + readIndex);
+      }
+      return f;
+    }
+
+    private void handleTimeout(long readIndex, CompletableFuture<Long> future) {
+      // Remove only this future: the index may since have been completed and re-added by a newer read,
+      // whose future must not be failed by this stale timeout.
+      sorted.remove(readIndex, future);
+      // No-op if the future has already been completed by complete(..).
+      future.completeExceptionally(
+          new ReadException(new TimeoutIOException("Read timeout for index " + readIndex)));
+    }
+
+    /** Complete all the waiting futures whose read index is <= appliedIndex. */
+    void complete(Long appliedIndex) {
+      for (;;) {
+        // Use firstEntry() rather than isEmpty() + firstKey(): a concurrent handleTimeout(..)
+        // may empty the map in between, and firstKey() would then throw NoSuchElementException.
+        final Map.Entry<Long, CompletableFuture<Long>> first = sorted.firstEntry();
+        if (first == null || first.getKey() > appliedIndex) {
+          return;
+        }
+        sorted.remove(first.getKey(), first.getValue());
+        first.getValue().complete(appliedIndex);
       }
     }
   }
private final AppendEntriesListeners appendEntriesListeners = new AppendEntriesListeners();
private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+ private final ReadIndexQueue readIndexQueue;
+ private final StateMachine stateMachine;
+
+  /**
+   * @param properties supplies the read timeout, see {@link RaftServerConfigKeys.Read#timeout}.
+   * @param stateMachine consulted for its last applied index.
+   */
+  ReadRequests(RaftProperties properties, StateMachine stateMachine) {
+    final TimeDuration readTimeout = RaftServerConfigKeys.Read.timeout(properties);
+    this.readIndexQueue = new ReadIndexQueue(readTimeout);
+    this.stateMachine = stateMachine;
+  }
AppendEntriesListener addAppendEntriesListener(long commitIndex,
Function<Long, AppendEntriesListener> constructor) {
@@ -160,7 +217,19 @@ class ReadRequests {
return appendEntriesListeners.add(commitIndex, constructor);
}
- void onAppendEntriesReply(AppendEntriesReplyProto reply, Predicate<Predicate<RaftPeerId>> hasMajority) {
- appendEntriesListeners.onAppendEntriesReply(reply, hasMajority);
+  /**
+   * Pass an appendEntries reply, together with the appender that received it,
+   * to the registered {@link AppendEntriesListener}s.
+   */
+  void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto reply,
+      Predicate<Predicate<RaftPeerId>> hasMajority) {
+    appendEntriesListeners.onAppendEntriesReply(appender, reply, hasMajority);
+  }
+
+  /** @return a callback to be invoked with applied indices; it completes the reads waiting for them. */
+  Consumer<Long> getAppliedIndexConsumer() {
+    final ReadIndexQueue queue = readIndexQueue;
+    return queue::complete;
+  }
+
+  /**
+   * Wait for the state machine to apply at least up to the given read index.
+   *
+   * @param readIndex the index which must be applied before the read can be served.
+   * @return a future completed immediately if readIndex has already been applied;
+   *         otherwise, completed once an index >= readIndex is applied,
+   *         or failed with a {@link ReadException} if that does not happen within the read timeout.
+   */
+  CompletableFuture<Long> waitToAdvance(long readIndex) {
+    if (getLastAppliedIndex() >= readIndex) {
+      return CompletableFuture.completedFuture(readIndex);
+    }
+    final CompletableFuture<Long> future = readIndexQueue.add(readIndex);
+    // The index may have been applied, and its notification already delivered, between the check
+    // above and add(..). Re-check so that the read does not needlessly wait for a later apply or time out.
+    final long lastApplied = getLastAppliedIndex();
+    if (lastApplied >= readIndex) {
+      readIndexQueue.complete(lastApplied);
+    }
+    return future;
+  }
+
+  /** @return the state machine's last applied index, or {@link RaftLog#INVALID_LOG_INDEX} if it has none. */
+  private long getLastAppliedIndex() {
+    // getLastAppliedTermIndex() may be null; StateMachineUpdater treats it as nullable as well.
+    return Optional.ofNullable(stateMachine.getLastAppliedTermIndex())
+        .map(termIndex -> termIndex.getIndex())
+        .orElse(RaftLog.INVALID_LOG_INDEX);
   }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index d62e8f50e..d90cddf01 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -128,10 +128,10 @@ class ServerState {
.filter(i -> i >= 0)
.orElse(RaftLog.INVALID_LOG_INDEX);
this.log = JavaUtils.memoize(() -> initRaftLog(getSnapshotIndexFromStateMachine, prop));
+ this.readRequests = new ReadRequests(prop, stateMachine);
this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater(
- stateMachine, server, this, getLog().getSnapshotIndex(), prop));
-
- this.readRequests = new ReadRequests();
+ stateMachine, server, this, getLog().getSnapshotIndex(), prop,
+ this.readRequests.getAppliedIndexConsumer()));
}
void initialize(StateMachine stateMachine) throws IOException {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index bd989a389..9f1a8ca5e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -88,9 +88,12 @@ class StateMachineUpdater implements Runnable {
private final MemoizedSupplier<StateMachineMetrics> stateMachineMetrics;
+ private final Consumer<Long> appliedIndexConsumer;
+
StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
- ServerState serverState, long lastAppliedIndex, RaftProperties properties) {
+ ServerState serverState, long lastAppliedIndex, RaftProperties properties, Consumer<Long> appliedIndexConsumer) {
this.name = serverState.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass());
+ this.appliedIndexConsumer = appliedIndexConsumer;
this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
@@ -122,6 +125,7 @@ class StateMachineUpdater implements Runnable {
//wait for RaftServerImpl and ServerState constructors to complete
stateMachineMetrics.get();
updater.start();
+ notifyAppliedIndex(appliedIndex.get());
}
private void stop() {
@@ -218,6 +222,7 @@ class StateMachineUpdater implements Runnable {
final long i = snapshot.getIndex();
snapshotIndex.setUnconditionally(i, infoIndexChange);
appliedIndex.setUnconditionally(i, infoIndexChange);
+ notifyAppliedIndex(i);
state = State.RUNNING;
}
@@ -235,11 +240,12 @@ class StateMachineUpdater implements Runnable {
}
final CompletableFuture<Message> f = server.applyLogToStateMachine(next);
+ final long incremented = appliedIndex.incrementAndGet(debugIndexChange);
+ Preconditions.assertTrue(incremented == nextIndex);
if (f != null) {
futures.get().add(f);
+ f.thenAccept(m -> notifyAppliedIndex(incremented));
}
- final long incremented = appliedIndex.incrementAndGet(debugIndexChange);
- Preconditions.assertTrue(incremented == nextIndex);
} else {
LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
this, nextIndex, state);
@@ -320,6 +326,10 @@ class StateMachineUpdater implements Runnable {
return appliedIndex.get();
}
+  /** Pass an applied log index to the consumer given to the constructor. */
+  private void notifyAppliedIndex(long index) {
+    appliedIndexConsumer.accept(Long.valueOf(index));
+  }
+
long getStateMachineLastAppliedIndex() {
return Optional.ofNullable(stateMachine.getLastAppliedTermIndex())
.map(TermIndex::getIndex)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index ca654d1de..8f71f91fc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -193,7 +193,7 @@ class LogAppenderDefault extends LogAppenderBase {
break;
default: throw new IllegalArgumentException("Unable to process result " + reply.getResult());
}
- getLeaderState().onAppendEntriesReply(getFollower(), reply);
+ getLeaderState().onAppendEntriesReply(this, reply);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
new file mode 100644
index 000000000..5c7383000
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.Log4jUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
+  extends BaseTest
+  implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  {
+    Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+  }
+
+  static final int NUM_SERVERS = 3;
+
+  static final String INCREMENT = "INCREMENT";
+  static final String WAIT_AND_INCREMENT = "WAIT_AND_INCREMENT";
+  static final String TIMEOUT_INCREMENT = "TIMEOUT_INCREMENT";
+  static final String QUERY = "QUERY";
+  final Message incrementMessage = new RaftTestUtil.SimpleMessage(INCREMENT);
+  final Message waitAndIncrementMessage = new RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT);
+  final Message timeoutIncrement = new RaftTestUtil.SimpleMessage(TIMEOUT_INCREMENT);
+  final Message queryMessage = new RaftTestUtil.SimpleMessage(QUERY);
+
+  @Before
+  public void setup() {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        CounterStateMachine.class, StateMachine.class);
+
+    p.setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
+    // Shorter than the ~1500ms TIMEOUT_INCREMENT takes to apply; see testLinearizableReadTimeout.
+    p.setTimeDuration(RaftServerConfigKeys.Read.TIMEOUT_KEY, TimeDuration.valueOf(1, TimeUnit.SECONDS));
+    p.setTimeDuration(RaftServerConfigKeys.Rpc.FIRST_ELECTION_TIMEOUT_MIN_KEY,
+        TimeDuration.valueOf(150, TimeUnit.MILLISECONDS));
+    p.setTimeDuration(RaftServerConfigKeys.Rpc.FIRST_ELECTION_TIMEOUT_MAX_KEY,
+        TimeDuration.valueOf(300, TimeUnit.MILLISECONDS));
+    p.setTimeDuration(RaftServerConfigKeys.Rpc.TIMEOUT_MIN_KEY, TimeDuration.valueOf(3, TimeUnit.SECONDS));
+    p.setTimeDuration(RaftServerConfigKeys.Rpc.TIMEOUT_MAX_KEY, TimeDuration.valueOf(6, TimeUnit.SECONDS));
+    p.setTimeDuration(RaftServerConfigKeys.Rpc.REQUEST_TIMEOUT_KEY,
+        TimeDuration.valueOf(10, TimeUnit.SECONDS));
+
+    p.setTimeDuration(RaftClientConfigKeys.Rpc.REQUEST_TIMEOUT_KEY,
+        TimeDuration.valueOf(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testLinearizableRead() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testLinearizableReadImpl);
+  }
+
+  private void testLinearizableReadImpl(CLUSTER cluster) throws Exception {
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try (final RaftClient client = cluster.createClient(leaderId)) {
+        for (int i = 1; i <= 10; i++) {
+          final RaftClientReply writeReply = client.io().send(incrementMessage);
+          Assert.assertTrue(writeReply.isSuccess());
+          final RaftClientReply readReply = client.io().sendReadOnly(queryMessage);
+          Assert.assertEquals(i, retrieve(readReply));
+        }
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testLinearizableReadParallel() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testLinearizableReadParallelImpl);
+  }
+
+  /**
+   * A linearizable read issued while a slow write is being applied must observe that write.
+   * All assertions run on the test thread so that failures actually fail the test.
+   */
+  private void testLinearizableReadParallelImpl(CLUSTER cluster) throws Exception {
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try (RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
+        final RaftClientReply reply = client.io().send(incrementMessage);
+        Assert.assertTrue(reply.isSuccess());
+
+        // A stale read does not wait: it observes the current value.
+        final RaftClientReply staleValueBefore = client.io().sendStaleRead(queryMessage, 0, leaderId);
+        Assert.assertEquals(1, retrieve(staleValueBefore));
+
+        // WAIT_AND_INCREMENT takes about 500ms to apply.
+        final CompletableFuture<RaftClientReply> writeReply = client.async().send(waitAndIncrementMessage);
+        // Give the async write time to reach the leader and be committed,
+        // so that the read index obtained by the read below covers it.
+        Thread.sleep(100);
+
+        // The linearizable read waits for the state machine to apply WAIT_AND_INCREMENT.
+        final RaftClientReply linearizableReadValue = client.io().sendReadOnly(queryMessage);
+        Assert.assertEquals(2, retrieve(linearizableReadValue));
+        Assert.assertTrue(writeReply.get(10, TimeUnit.SECONDS).isSuccess());
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testLinearizableReadTimeout() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testLinearizableReadTimeoutImpl);
+  }
+
+  /**
+   * A linearizable read which cannot be served within the read timeout must fail.
+   * All assertions run on the test thread so that failures actually fail the test.
+   */
+  private void testLinearizableReadTimeoutImpl(CLUSTER cluster) throws Exception {
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try (RaftClient client = cluster.createClient(leaderId);
+           RaftClient noRetryClient = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
+        // TIMEOUT_INCREMENT takes about 1500ms to apply, longer than the 1s read timeout.
+        final CompletableFuture<RaftClientReply> writeReply = client.async().send(timeoutIncrement);
+        // Give the async write time to reach the leader and be committed.
+        Thread.sleep(100);
+
+        // The failure may surface either as an exception in the reply or as an exception thrown by the client.
+        RaftClientReply timeoutReply = null;
+        Exception readFailure = null;
+        try {
+          timeoutReply = noRetryClient.io().sendReadOnly(queryMessage);
+        } catch (Exception e) {
+          readFailure = e;
+        }
+        Assert.assertTrue("Expected the linearizable read to time out",
+            readFailure != null || timeoutReply.getException() != null);
+        Assert.assertTrue(writeReply.get(10, TimeUnit.SECONDS).isSuccess());
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private int retrieve(RaftClientReply reply) {
+    return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
+  }
+
+  /**
+   * CounterStateMachine supports 4 operations:
+   * 1. increment;
+   * 2. get (any query);
+   * 3. waitAndIncrement: increment after sleeping 500ms;
+   * 4. timeoutIncrement: increment after sleeping 1500ms.
+   */
+  static class CounterStateMachine extends BaseStateMachine {
+    private final AtomicLong counter = new AtomicLong(0L);
+
+    @Override
+    public CompletableFuture<Message> query(Message request) {
+      return CompletableFuture.completedFuture(
+          Message.valueOf(String.valueOf(counter.get())));
+    }
+
+    @Override
+    public CompletableFuture<Message> queryStale(Message request, long minIndex) {
+      return query(request);
+    }
+
+    private void sleepQuietly(int millis) {
+      try {
+        Thread.sleep(millis);
+      } catch (InterruptedException e) {
+        LOG.debug("{} be interrupted", Thread.currentThread());
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    private void increment() {
+      counter.incrementAndGet();
+    }
+
+    private void waitAndIncrement() {
+      sleepQuietly(500);
+      increment();
+    }
+
+    private void timeoutIncrement() {
+      sleepQuietly(1500);
+      increment();
+    }
+
+    @Override
+    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+      final long index = trx.getLogEntry().getIndex();
+      LOG.debug("apply trx with index={}", index);
+
+      final String command = trx.getLogEntry().getStateMachineLogEntry()
+          .getLogData().toString(StandardCharsets.UTF_8);
+
+      LOG.info("receive command: {}", command);
+      if (command.equals(INCREMENT)) {
+        increment();
+      } else if (command.equals(WAIT_AND_INCREMENT)) {
+        waitAndIncrement();
+      } else {
+        timeoutIncrement();
+      }
+
+      // Update the last applied index only after the command has taken effect:
+      // linearizable reads use it to decide whether the state machine can already be queried.
+      updateLastAppliedTermIndex(trx.getLogEntry().getTerm(), index);
+      return CompletableFuture.completedFuture(Message.valueOf("OK"));
+    }
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
new file mode 100644
index 000000000..b89523c61
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.ReadOnlyRequestTests;
+
+/** Runs {@link ReadOnlyRequestTests} on a {@link MiniRaftClusterWithGrpc}. */
+public class TestReadOnlyRequestsWithGrpc
+    extends ReadOnlyRequestTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+}