You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/03/17 03:34:01 UTC
[ratis] branch master updated: RATIS-1552. Fix StateMachineUpdater NN_NAKED_NOTIFY. (#623)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c846363 RATIS-1552. Fix StateMachineUpdater NN_NAKED_NOTIFY. (#623)
c846363 is described below
commit c8463634bd635e3b2bf78206a7e45892bb6ec20c
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Mar 17 11:33:55 2022 +0800
RATIS-1552. Fix StateMachineUpdater NN_NAKED_NOTIFY. (#623)
---
.../ratis/server/impl/StateMachineUpdater.java | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 056f614..87aac06 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.server.impl;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.protocol.Message;
@@ -42,7 +41,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.LongStream;
@@ -77,13 +76,14 @@ class StateMachineUpdater implements Runnable {
private final boolean purgeUptoSnapshotIndex;
private final Thread updater;
+ private final AwaitForSignal awaitForSignal;
+
private final RaftLogIndex appliedIndex;
private final RaftLogIndex snapshotIndex;
private final AtomicReference<Long> stopIndex = new AtomicReference<>();
private volatile State state = State.RUNNING;
- private final AtomicBoolean notified = new AtomicBoolean();
- private SnapshotRetentionPolicy snapshotRetentionPolicy;
+ private final SnapshotRetentionPolicy snapshotRetentionPolicy;
private StateMachineMetrics stateMachineMetrics = null;
StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
@@ -111,6 +111,7 @@ class StateMachineUpdater implements Runnable {
this.purgeUptoSnapshotIndex = RaftServerConfigKeys.Log.purgeUptoSnapshotIndex(properties);
updater = new Daemon(this);
+ this.awaitForSignal = new AwaitForSignal(name);
}
void start() {
@@ -159,10 +160,8 @@ class StateMachineUpdater implements Runnable {
notifyUpdater();
}
- @SuppressFBWarnings("NN_NAKED_NOTIFY")
- synchronized void notifyUpdater() {
- notified.set(true);
- notifyAll();
+ void notifyUpdater() {
+ awaitForSignal.signal();
}
@Override
@@ -199,14 +198,13 @@ class StateMachineUpdater implements Runnable {
}
}
- private synchronized void waitForCommit() throws InterruptedException {
+ private void waitForCommit() throws InterruptedException {
// When a peer starts, the committed is initialized to 0.
// It will be updated only after the leader contacts other peers.
// Thus it is possible to have applied > committed initially.
final long applied = getLastAppliedIndex();
for(; applied >= raftLog.getLastCommittedIndex() && state == State.RUNNING && !shouldStop(); ) {
- wait();
- if (notified.getAndSet(false)) {
+ if (awaitForSignal.await(100, TimeUnit.MILLISECONDS)) {
return;
}
}