You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/03/17 03:34:01 UTC

[ratis] branch master updated: RATIS-1552. Fix StateMachineUpdater NN_NAKED_NOTIFY. (#623)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c846363  RATIS-1552. Fix StateMachineUpdater NN_NAKED_NOTIFY. (#623)
c846363 is described below

commit c8463634bd635e3b2bf78206a7e45892bb6ec20c
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Mar 17 11:33:55 2022 +0800

    RATIS-1552. Fix StateMachineUpdater NN_NAKED_NOTIFY. (#623)
---
 .../ratis/server/impl/StateMachineUpdater.java       | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 056f614..87aac06 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.server.impl;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.protocol.Message;
@@ -42,7 +41,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.LongStream;
@@ -77,13 +76,14 @@ class StateMachineUpdater implements Runnable {
   private final boolean purgeUptoSnapshotIndex;
 
   private final Thread updater;
+  private final AwaitForSignal awaitForSignal;
+
   private final RaftLogIndex appliedIndex;
   private final RaftLogIndex snapshotIndex;
   private final AtomicReference<Long> stopIndex = new AtomicReference<>();
   private volatile State state = State.RUNNING;
-  private final AtomicBoolean notified = new AtomicBoolean();
 
-  private SnapshotRetentionPolicy snapshotRetentionPolicy;
+  private final SnapshotRetentionPolicy snapshotRetentionPolicy;
   private StateMachineMetrics stateMachineMetrics = null;
 
   StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
@@ -111,6 +111,7 @@ class StateMachineUpdater implements Runnable {
     this.purgeUptoSnapshotIndex = RaftServerConfigKeys.Log.purgeUptoSnapshotIndex(properties);
 
     updater = new Daemon(this);
+    this.awaitForSignal = new AwaitForSignal(name);
   }
 
   void start() {
@@ -159,10 +160,8 @@ class StateMachineUpdater implements Runnable {
     notifyUpdater();
   }
 
-  @SuppressFBWarnings("NN_NAKED_NOTIFY")
-  synchronized void notifyUpdater() {
-    notified.set(true);
-    notifyAll();
+  void notifyUpdater() {
+    awaitForSignal.signal();
   }
 
   @Override
@@ -199,14 +198,13 @@ class StateMachineUpdater implements Runnable {
     }
   }
 
-  private synchronized void waitForCommit() throws InterruptedException {
+  private void waitForCommit() throws InterruptedException {
     // When a peer starts, the committed is initialized to 0.
     // It will be updated only after the leader contacts other peers.
     // Thus it is possible to have applied > committed initially.
     final long applied = getLastAppliedIndex();
     for(; applied >= raftLog.getLastCommittedIndex() && state == State.RUNNING && !shouldStop(); ) {
-      wait();
-      if (notified.getAndSet(false)) {
+      if (awaitForSignal.await(100, TimeUnit.MILLISECONDS)) {
         return;
       }
     }