You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/09/06 02:52:51 UTC
[ratis] branch master updated: RATIS-1697. StateMachineUpdater.getStateMachineLastAppliedIndex should handle null. (#736)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 06846f092 RATIS-1697. StateMachineUpdater.getStateMachineLastAppliedIndex should handle null. (#736)
06846f092 is described below
commit 06846f09209e978348299d3814e61ae93566b8b0
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Sep 6 10:52:46 2022 +0800
RATIS-1697. StateMachineUpdater.getStateMachineLastAppliedIndex should handle null. (#736)
* RATIS-1697. StateMachineUpdater.getStateMachineLastAppliedIndex should handle null.
* Fix test failures.
---
.../ratis/server/impl/StateMachineUpdater.java | 28 +++++++++++-----------
1 file changed, 14 insertions(+), 14 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 1d9b85e79..bd989a389 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -22,6 +22,7 @@ import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -52,7 +53,7 @@ import org.apache.ratis.thirdparty.com.codahale.metrics.Timer;
* This class tracks the log entries that have been committed in a quorum and
* applies them to the state machine. We let a separate thread do this work
* asynchronously so that this will not block normal raft protocol.
- *
+ * <p>
* If the auto log compaction is enabled, the state machine updater thread will
* trigger a snapshot of the state machine by calling
* {@link StateMachine#takeSnapshot} when the log size exceeds a limit.
@@ -84,7 +85,8 @@ class StateMachineUpdater implements Runnable {
private volatile State state = State.RUNNING;
private final SnapshotRetentionPolicy snapshotRetentionPolicy;
- private StateMachineMetrics stateMachineMetrics = null;
+
+ private final MemoizedSupplier<StateMachineMetrics> stateMachineMetrics;
StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
ServerState serverState, long lastAppliedIndex, RaftProperties properties) {
@@ -112,27 +114,23 @@ class StateMachineUpdater implements Runnable {
updater = new Daemon(this);
this.awaitForSignal = new AwaitForSignal(name);
+ this.stateMachineMetrics = MemoizedSupplier.valueOf(
+ () -> StateMachineMetrics.getStateMachineMetrics(server, appliedIndex, stateMachine));
}
void start() {
//wait for RaftServerImpl and ServerState constructors to complete
- initializeMetrics();
+ stateMachineMetrics.get();
updater.start();
}
- private void initializeMetrics() {
- if (stateMachineMetrics == null) {
- stateMachineMetrics =
- StateMachineMetrics.getStateMachineMetrics(
- server, appliedIndex, stateMachine);
- }
- }
-
private void stop() {
state = State.STOP;
try {
stateMachine.close();
- stateMachineMetrics.unregister();
+ if (stateMachineMetrics.isInitialized()) {
+ stateMachineMetrics.get().unregister();
+ }
} catch(Throwable t) {
LOG.warn(name + ": Failed to close " + JavaUtils.getClassSimpleName(stateMachine.getClass())
+ " " + stateMachine, t);
@@ -266,7 +264,7 @@ class StateMachineUpdater implements Runnable {
private void takeSnapshot() {
final long i;
try {
- Timer.Context takeSnapshotTimerContext = stateMachineMetrics.getTakeSnapshotTimer().time();
+ Timer.Context takeSnapshotTimerContext = stateMachineMetrics.get().getTakeSnapshotTimer().time();
i = stateMachine.takeSnapshot();
takeSnapshotTimerContext.stop();
server.getSnapshotRequestHandler().completeTakingSnapshot(i);
@@ -323,6 +321,8 @@ class StateMachineUpdater implements Runnable {
}
long getStateMachineLastAppliedIndex() {
- return stateMachine.getLastAppliedTermIndex().getIndex();
+ return Optional.ofNullable(stateMachine.getLastAppliedTermIndex())
+ .map(TermIndex::getIndex)
+ .orElse(RaftLog.INVALID_LOG_INDEX);
}
}