You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/01 01:22:16 UTC

[ratis] branch branch-2_tmp created (now 8247d3c6b)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a change to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git


      at 8247d3c6b RATIS-1793. Enforce raft.server.log.appender.wait-time.min. (#832)

This branch includes the following new commits:

     new fb176cfea RATIS-1750. Add snapshot section in dev guide (#788)
     new 3923d64bf RATIS-1751. Race condition between LeaderStateImpl & ServerState. (#789)
     new e01c8e579 RATIS-1761. If LeaderStateImpl is not running, it should not restart a LogAppender. (#799)
     new e40a5f8aa RATIS-1763. Purging logs in an ordered manner. (#801)
     new 3e1419c72 RATIS-1767. Initialize MatchIndex to RaftLog.INVALID_LOG_INDEX. (#805)
     new 070a3aa1a RATIS-1768. Fix stepDown command don't work issue (#806)
     new 272eeae2a RATIS-1766. Add descriptions to metrics entries (#804)
     new f18bce302 RATIS-1772. Refactor the startLeaderElection code in LeaderStateImpl. (#811)
     new ed481ff95 RATIS-1784. Ignore .vscode in source repo. (#822)
     new 4a0496b18 RATIS-1783. MAX_OP_SIZE is not configurable on raft log read. (#823)
     new c2e3bcca0 RATIS-1543. Log from GrpcLogAppender is confusing. (#821)
     new e47fcb962 RATIS-1786. Reset the digester of the follower at the beginning of each file transfer during a InstallSnapshot to avoid snapshot transfer failure (#825)
     new f348cac8b RATIS-1785. Use SingleThreadExecutor to manage the lifetime of single thread (#824)
     new 74830239e RATIS-1788. Improve the JvmPauseMonitor log messages. (#826)
     new beb6a5dfd RATIS-1791. Intermittent failure in ServerRestartTests#testRestartFollower (#827)
     new 8247d3c6b RATIS-1793. Enforce raft.server.log.appender.wait-time.min. (#832)

The 16 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ratis] 11/16: RATIS-1543. Log from GrpcLogAppender is confusing. (#821)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit c2e3bcca06ab3b566959133358a7f4399bd613bb
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Feb 17 01:22:28 2023 -0800

    RATIS-1543. Log from GrpcLogAppender is confusing. (#821)
    
    (cherry picked from commit ca9fdd29e15b81464fd20d73a1d070a050bae30c)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 23 +++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 125dd7dfa..7c742d91d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -123,13 +123,14 @@ public class GrpcLogAppender extends LogAppenderBase {
       firstResponseReceived = false;
       // clear the pending requests queue and reset the next index of follower
       pendingRequests.clear();
+      final FollowerInfo f = getFollower();
       final long nextIndex = 1 + Optional.ofNullable(request)
           .map(AppendEntriesRequest::getPreviousLog)
           .map(TermIndex::getIndex)
-          .orElseGet(getFollower()::getMatchIndex);
-      if (onError && getFollower().getMatchIndex() == 0 && request == null) {
-        LOG.warn("{}: Leader has not got in touch with Follower {} yet, " +
-          "just keep nextIndex unchanged and retry.", this, getFollower());
+          .orElseGet(f::getMatchIndex);
+      if (onError && f.getMatchIndex() == 0 && request == null) {
+        LOG.warn("{}: Follower failed when matchIndex == 0, " +
+          " keep nextIndex ({}) unchanged and retry.", this, f.getNextIndex());
         return;
       }
       getFollower().decreaseNextIndex(nextIndex);
@@ -324,8 +325,10 @@ public class GrpcLogAppender extends LogAppenderBase {
     }
   }
 
-  private void increaseNextIndex(final long installedSnapshotIndex) {
-    getFollower().updateNextIndex(installedSnapshotIndex + 1);
+  private void increaseNextIndex(final long installedSnapshotIndex, Object reason) {
+    final long newNextIndex = installedSnapshotIndex + 1;
+    LOG.info("{}: updateNextIndex {} for {}", this, newNextIndex, reason);
+    getFollower().updateNextIndex(newNextIndex);
   }
 
   /**
@@ -381,12 +384,14 @@ public class GrpcLogAppender extends LogAppenderBase {
           break;
         case NOT_LEADER:
           grpcServerMetrics.onRequestNotLeader(getFollowerId().toString());
+          LOG.warn("{}: received {} reply with term {}", this, reply.getResult(), reply.getTerm());
           if (onFollowerTerm(reply.getTerm())) {
             return;
           }
           break;
         case INCONSISTENCY:
           grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
+          LOG.warn("{}: received {} reply with nextIndex {}", this, reply.getResult(), reply.getNextIndex());
           updateNextIndex(reply.getNextIndex());
           break;
         default:
@@ -518,7 +523,7 @@ public class GrpcLogAppender extends LogAppenderBase {
           getFollower().setSnapshotIndex(followerSnapshotIndex);
           getFollower().setAttemptedToInstallSnapshot();
           getLeaderState().onFollowerCommitIndex(getFollower(), followerSnapshotIndex);
-          increaseNextIndex(followerSnapshotIndex);
+          increaseNextIndex(followerSnapshotIndex, reply.getResult());
           removePending(reply);
           break;
         case NOT_LEADER:
@@ -535,7 +540,7 @@ public class GrpcLogAppender extends LogAppenderBase {
           getFollower().setSnapshotIndex(followerSnapshotIndex);
           getFollower().setAttemptedToInstallSnapshot();
           getLeaderState().onFollowerCommitIndex(getFollower(), followerSnapshotIndex);
-          increaseNextIndex(followerSnapshotIndex);
+          increaseNextIndex(followerSnapshotIndex, reply.getResult());
           onFollowerCatchup(followerSnapshotIndex);
           removePending(reply);
           break;
@@ -547,7 +552,7 @@ public class GrpcLogAppender extends LogAppenderBase {
           removePending(reply);
           break;
         case UNRECOGNIZED:
-          LOG.error("Unrecongnized the reply result {}: Leader is {}, follower is {}",
+          LOG.error("Unrecognized the reply result {}: Leader is {}, follower is {}",
               reply.getResult(), getServer().getId(), getFollowerId());
           break;
         default:


[ratis] 15/16: RATIS-1791. Intermittent failure in ServerRestartTests#testRestartFollower (#827)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit beb6a5dfd3a59cbc78103490917e7ad473baa6f4
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Sat Feb 25 04:04:01 2023 +0800

    RATIS-1791. Intermittent failure in ServerRestartTests#testRestartFollower (#827)
    
    (cherry picked from commit 3a19630a8fe17e055f820b712450123201641421)
---
 .../test/java/org/apache/ratis/server/ServerRestartTests.java  | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 2c0e18167..31e673099 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -119,18 +119,22 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
     final RaftLog followerLog = follower.getRaftLog();
     final long followerLastIndex = followerLog.getLastEntryTermIndex().getIndex();
     Assert.assertTrue(followerLastIndex >= leaderLastIndex);
+    final long leaderFinalIndex = cluster.getLeader().getRaftLog().getLastEntryTermIndex().getIndex();
+    Assert.assertEquals(leaderFinalIndex, followerLastIndex);
 
     final File followerOpenLogFile = getOpenLogFile(follower);
     final File leaderOpenLogFile = getOpenLogFile(cluster.getDivision(leaderId));
 
     // shutdown all servers
-    for(RaftServer s : cluster.getServers()) {
-      s.close();
+    // shutdown followers first, so there won't be any new leader elected
+    for (RaftServer.Division d : cluster.getFollowers()) {
+      d.close();
     }
+    cluster.getDivision(leaderId).close();
 
     // truncate log and
     assertTruncatedLog(followerId, followerOpenLogFile, followerLastIndex, cluster);
-    assertTruncatedLog(leaderId, leaderOpenLogFile, leaderLastIndex, cluster);
+    assertTruncatedLog(leaderId, leaderOpenLogFile, leaderFinalIndex, cluster);
 
     // restart and write something.
     cluster.restart(false);


[ratis] 14/16: RATIS-1788. Improve the JvmPauseMonitor log messages. (#826)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 74830239efd359216406459cc50047d22e2c39f8
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Feb 23 00:32:46 2023 -0800

    RATIS-1788. Improve the JvmPauseMonitor log messages. (#826)
    
    (cherry picked from commit 655db36c68a4c46a59150b548e76f2e92c33bf84)
---
 .../java/org/apache/ratis/util/JvmPauseMonitor.java    | 18 ++++++++----------
 .../main/java/org/apache/ratis/util/TimeDuration.java  | 18 +++++++++++++++++-
 .../org/apache/ratis/server/impl/RaftServerProxy.java  |  4 ++++
 .../java/org/apache/ratis/util/TestTimeDuration.java   | 18 ++++++++++--------
 4 files changed, 39 insertions(+), 19 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
index 1fcfc4d6a..532bd2255 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JvmPauseMonitor.java
@@ -65,28 +65,26 @@ public class JvmPauseMonitor {
   }
 
   static String toString(Map<String, GcInfo> beforeSleep, TimeDuration extraSleepTime, Map<String, GcInfo> afterSleep) {
-    final StringBuilder b = new StringBuilder("Detected pause in JVM or host machine (eg GC): pause of approximately ")
-        .append(extraSleepTime)
-        .append('.');
-
-    boolean detected = false;
+    final StringBuilder b = new StringBuilder();
+    long ms = 0;
     for(Map.Entry<String, GcInfo> before: beforeSleep.entrySet()) {
       final String name = before.getKey();
       final GcInfo after = afterSleep.get(name);
       if (after != null) {
         final GcInfo diff = after.subtract(before.getValue());
         if (diff.count != 0) {
+          ms += diff.timeMs;
           b.append(System.lineSeparator()).append("GC pool '").append(name)
               .append("' had collection(s): ").append(diff);
-          detected = true;
         }
       }
     }
 
-    if (!detected) {
-      b.append(" No GCs detected.");
-    }
-    return b.toString();
+    final String gc = b.length() == 0? " without any GCs."
+        : " with " + TimeDuration.valueOf(ms, TimeUnit.MILLISECONDS).toString(TimeUnit.SECONDS, 3)
+        + " GC time." + b;
+    return "Detected pause in JVM or host machine approximately "
+        + extraSleepTime.toString(TimeUnit.SECONDS, 3) + gc;
   }
 
   private static final TimeDuration SLEEP_TIME = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 58a61cc29..98fb3694e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -32,7 +32,7 @@ import java.util.function.LongUnaryOperator;
 
 /**
  * Time duration is represented by a long together with a {@link TimeUnit}.
- *
+ * <p>
  * This is a value-based class.
  */
 public final class TimeDuration implements Comparable<TimeDuration> {
@@ -375,4 +375,20 @@ public final class TimeDuration implements Comparable<TimeDuration> {
   public String toString() {
     return duration + Abbreviation.valueOf(unit).getDefault();
   }
+
+  /** @return a representation of this object in the given target unit and decimal places. */
+  public String toString(TimeUnit targetUnit, int decimalPlaces) {
+    Objects.requireNonNull(targetUnit, "targetUnit == null");
+    if (targetUnit.compareTo(unit) <= 0) {
+      return to(targetUnit).toString();
+    }
+    final double divisor = unit.convert(1, targetUnit);
+    if (duration % divisor == 0) {
+      return to(targetUnit).toString();
+    }
+    final String decimal = StringUtils.format("%." + decimalPlaces + "f", duration/divisor);
+    final String s = decimal + Abbreviation.valueOf(targetUnit).getDefault();
+    LOG.debug("{}.to({}) = {}", this, targetUnit, s);
+    return s;
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 54cf15c65..f6f7223da 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -226,8 +226,12 @@ class RaftServerProxy implements RaftServer {
   private void handleJvmPause(TimeDuration extraSleep, TimeDuration closeThreshold, TimeDuration stepDownThreshold)
       throws IOException {
     if (extraSleep.compareTo(closeThreshold) > 0) {
+      LOG.error("{}: JVM pause detected {} longer than the close-threshold {}, shutting down ...",
+          getId(), extraSleep.toString(TimeUnit.SECONDS, 3), closeThreshold.toString(TimeUnit.SECONDS, 3));
       close();
     } else if (extraSleep.compareTo(stepDownThreshold) > 0) {
+      LOG.warn("{}: JVM pause detected {} longer than the step-down-threshold {}",
+          getId(), extraSleep.toString(TimeUnit.SECONDS, 3), stepDownThreshold.toString(TimeUnit.SECONDS, 3));
       getImpls().forEach(RaftServerImpl::stepDownOnJvmPause);
     }
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
index edb0ad8a2..584c9e811 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.ratis.util.TimeDuration.Abbreviation;
+import static org.apache.ratis.util.TimeDuration.LOG;
 import static org.apache.ratis.util.TimeDuration.parse;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -110,22 +111,23 @@ public class TestTimeDuration {
   @Test(timeout = 1000)
   public void testTo() {
     final TimeDuration oneSecond = TimeDuration.valueOf(1, TimeUnit.SECONDS);
-    assertTo(1000, oneSecond, TimeUnit.MILLISECONDS);
-    final TimeDuration nanos = assertTo(1_000_000_000, oneSecond, TimeUnit.NANOSECONDS);
-    assertTo(1000, nanos, TimeUnit.MILLISECONDS);
+    assertTo(1000, "1000ms", oneSecond, TimeUnit.MILLISECONDS);
+    final TimeDuration nanos = assertTo(1_000_000_000, "1000000000ns", oneSecond, TimeUnit.NANOSECONDS);
+    assertTo(1000, "1000ms", nanos, TimeUnit.MILLISECONDS);
 
-    assertTo(0, oneSecond, TimeUnit.MINUTES);
-    assertTo(0, nanos, TimeUnit.MINUTES);
+    assertTo(0, "0.0167min", oneSecond, TimeUnit.MINUTES);
+    assertTo(0, "0.0167min", nanos, TimeUnit.MINUTES);
 
     final TimeDuration millis = TimeDuration.valueOf(1_999, TimeUnit.MILLISECONDS);
-    assertTo(1, millis, TimeUnit.SECONDS);
-    assertTo(0, millis, TimeUnit.MINUTES);
+    assertTo(1, "1.9990s", millis, TimeUnit.SECONDS);
+    assertTo(0, "0.0333min", millis, TimeUnit.MINUTES);
   }
 
-  static TimeDuration assertTo(long expected, TimeDuration timeDuration, TimeUnit toUnit) {
+  static TimeDuration assertTo(long expected, String expectedString, TimeDuration timeDuration, TimeUnit toUnit) {
     final TimeDuration computed = timeDuration.to(toUnit);
     assertEquals(expected, computed.getDuration());
     assertEquals(toUnit, computed.getUnit());
+    assertEquals(expectedString, timeDuration.toString(toUnit, 4));
     return computed;
   }
 


[ratis] 13/16: RATIS-1785. Use SingleThreadExecutor to manage the lifetime of single thread (#824)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit f348cac8b07ff111e57af702a0efd94999a7e2c5
Author: Potato <ta...@apache.org>
AuthorDate: Wed Feb 22 06:15:36 2023 +0800

    RATIS-1785. Use SingleThreadExecutor to manage the lifetime of single thread (#824)
    
    (cherry picked from commit d838c56772568211b9af617d78c5182af641faac)
---
 .../java/org/apache/ratis/util/ConcurrentUtils.java   | 15 +++++++++++++++
 .../org/apache/ratis/server/impl/RaftServerProxy.java |  3 ++-
 .../raftlog/segmented/SegmentedRaftLogWorker.java     | 19 +++++++------------
 3 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index df214b388..372fa62bc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -80,6 +80,21 @@ public interface ConcurrentUtils {
     };
   }
 
+  /**
+    * This method is similar to {@link java.util.concurrent.Executors#newSingleThreadExecutor(ThreadFactory)}
+    * except that this method takes a specific thread name as there is only one thread.
+    *
+    * @param name the thread name for only one thread.
+    * @return a new {@link ExecutorService}.
+    */
+  static ExecutorService newSingleThreadExecutor(String name) {
+      return Executors.newSingleThreadExecutor(runnable -> {
+          final Thread t = new Thread(runnable);
+          t.setName(name);
+          return t;
+        });
+  }
+
   /**
    * The same as {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)}
    * except that this method takes a maximumPoolSize parameter.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index bef72ee0d..54cf15c65 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -189,7 +189,7 @@ class RaftServerProxy implements RaftServer {
   private final DataStreamServerRpc dataStreamServerRpc;
 
   private final ImplMap impls = new ImplMap();
-  private final ExecutorService implExecutor = Executors.newSingleThreadExecutor();
+  private final ExecutorService implExecutor;
   private final ExecutorService executor;
 
   private final JvmPauseMonitor pauseMonitor;
@@ -210,6 +210,7 @@ class RaftServerProxy implements RaftServer {
 
     this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc();
 
+    this.implExecutor = ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement");
     this.executor = ConcurrentUtils.newThreadPoolWithMax(
         RaftServerConfigKeys.ThreadPool.proxyCached(properties),
         RaftServerConfigKeys.ThreadPool.proxySize(properties),
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 7e49522fb..a7bbcab23 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -142,8 +142,7 @@ class SegmentedRaftLogWorker {
   private final DataBlockingQueue<Task> queue;
   private final WriteLogTasks writeTasks = new WriteLogTasks();
   private volatile boolean running = true;
-  private final Thread workerThread;
-
+  private final ExecutorService workerThreadExecutor;
   private final RaftStorage storage;
   private volatile SegmentedRaftLogOutputStream out;
   private final Runnable submitUpdateCommitEvent;
@@ -205,7 +204,7 @@ class SegmentedRaftLogWorker {
 
     this.stateMachineDataPolicy = new StateMachineDataPolicy(properties, metricRegistry);
 
-    this.workerThread = new Thread(this::run, name);
+    this.workerThreadExecutor = ConcurrentUtils.newSingleThreadExecutor(name);
 
     // Server Id can be null in unit tests
     metricRegistry.addDataQueueSizeGauge(queue);
@@ -228,7 +227,7 @@ class SegmentedRaftLogWorker {
           " and " + RaftServerConfigKeys.Log.ASYNC_FLUSH_ENABLED_KEY);
     }
     this.flushExecutor = (!asyncFlush && !unsafeFlush)? null
-        : Executors.newSingleThreadExecutor(ConcurrentUtils.newThreadFactory(name + "-flush"));
+        : ConcurrentUtils.newSingleThreadExecutor(name + "-flush");
   }
 
   void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOException {
@@ -240,19 +239,15 @@ class SegmentedRaftLogWorker {
       Preconditions.assertTrue(openSegmentFile.exists());
       allocateSegmentedRaftLogOutputStream(openSegmentFile, true);
     }
-    workerThread.start();
+    workerThreadExecutor.submit(this::run);
   }
 
   void close() {
     this.running = false;
     sharedBuffer.set(null);
-    workerThread.interrupt();
     Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown);
-    try {
-      workerThread.join(3000);
-    } catch (InterruptedException ignored) {
-      Thread.currentThread().interrupt();
-    }
+    ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND.multiply(3),
+        workerThreadExecutor, timeout -> LOG.warn("{}: shutdown timeout in " + timeout, name));
     IOUtils.cleanup(LOG, out);
     LOG.info("{} close()", name);
   }
@@ -300,7 +295,7 @@ class SegmentedRaftLogWorker {
   }
 
   boolean isAlive() {
-    return running && workerThread.isAlive();
+    return running && !workerThreadExecutor.isTerminated();
   }
 
   private void run() {


[ratis] 12/16: RATIS-1786. Reset the digester of the follower at the beginning of each file transfer during a InstallSnapshot to avoid snapshot transfer failure (#825)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit e47fcb96217e6e2b44fa1c36e8b4106de9f0d490
Author: Potato <ta...@apache.org>
AuthorDate: Wed Feb 22 04:02:47 2023 +0800

    RATIS-1786. Reset the digester of the follower at the beginning of each file transfer during a InstallSnapshot to avoid snapshot transfer failure (#825)
    
    (cherry picked from commit 4ba1e71a3a0c0110421c459d7c2d116c0bed10ea)
---
 .../src/main/java/org/apache/ratis/server/storage/SnapshotManager.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 294f0a205..1ea7363fb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -115,6 +115,7 @@ public class SnapshotManager {
           }
           // create the temp snapshot file and put padding inside
           out = new FileOutputStream(tmpSnapshotFile);
+          digester.get().reset();
         } else {
           Preconditions.assertTrue(tmpSnapshotFile.exists());
           out = new FileOutputStream(tmpSnapshotFile, true);


[ratis] 05/16: RATIS-1767. Initialize MatchIndex to RaftLog.INVALID_LOG_INDEX. (#805)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 3e1419c72ad10d6c14385a3daf2f1f25ea8bcdc7
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Tue Jan 10 14:45:05 2023 +0800

    RATIS-1767. Initialize MatchIndex to RaftLog.INVALID_LOG_INDEX. (#805)
    
    (cherry picked from commit c62c9e22557bb55173f0668fd196ee51fb1f73be)
---
 .../src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 81dbe29c6..0d7fe2075 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -37,7 +37,7 @@ class FollowerInfoImpl implements FollowerInfo {
   private final AtomicReference<Timestamp> lastRpcSendTime;
   private final AtomicReference<Timestamp> lastHeartbeatSendTime;
   private final RaftLogIndex nextIndex;
-  private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", 0L);
+  private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX);
   private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
   private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
   private volatile boolean attendVote;


[ratis] 09/16: RATIS-1784. Ignore .vscode in source repo. (#822)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit ed481ff95f9b6688108616df1380a83e4e9ca74b
Author: Arpit Agarwal <ar...@users.noreply.github.com>
AuthorDate: Thu Feb 16 14:46:59 2023 -0800

    RATIS-1784. Ignore .vscode in source repo. (#822)
    
    (cherry picked from commit dd488f8b29fd1647da15c9609778f6ba97ddc4e0)
---
 .gitignore | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index f56494487..6af045672 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,4 +16,5 @@
 target
 build
 patchprocess
-dependency-reduced-pom.xml
\ No newline at end of file
+dependency-reduced-pom.xml
+.vscode/


[ratis] 02/16: RATIS-1751. Race condition between LeaderStateImpl & ServerState. (#789)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 3923d64bf02c0e2fbc0915dae5fcc0ea9b12204e
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Dec 6 05:00:47 2022 -0800

    RATIS-1751. Race condition between LeaderStateImpl & ServerState. (#789)
    
    (cherry picked from commit 1c00461b93a2d259bf810713b00a9791a6bd292d)
---
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 119 +++++++++++++--------
 .../ratis/server/impl/PeerConfiguration.java       |   7 +-
 .../ratis/server/impl/RaftConfigurationImpl.java   |  11 +-
 3 files changed, 90 insertions(+), 47 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 4a9c07bee..b55389343 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -225,6 +225,72 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
+  /** For caching {@link FollowerInfo}s.  This class is immutable. */
+  static class CurrentOldFollowerInfos {
+    private final RaftConfigurationImpl conf;
+    private final List<FollowerInfo> current;
+    private final List<FollowerInfo> old;
+
+    CurrentOldFollowerInfos(RaftConfigurationImpl conf, List<FollowerInfo> current, List<FollowerInfo> old) {
+      // set null when the sizes are not the same so that it will update next time.
+      this.conf = isSameSize(current, conf.getConf()) && isSameSize(old, conf.getOldConf())? conf: null;
+      this.current = Collections.unmodifiableList(current);
+      this.old = old == null? null: Collections.unmodifiableList(old);
+    }
+
+    RaftConfigurationImpl getConf() {
+      return conf;
+    }
+
+    List<FollowerInfo> getCurrent() {
+      return current;
+    }
+
+    List<FollowerInfo> getOld() {
+      return old;
+    }
+  }
+
+  static boolean isSameSize(List<FollowerInfo> infos, PeerConfiguration conf) {
+    return conf == null? infos == null: conf.size() == infos.size();
+  }
+
+  /** Use == to compare if the confs are the same object. */
+  static boolean isSameConf(CurrentOldFollowerInfos cached, RaftConfigurationImpl conf) {
+    return cached != null && cached.getConf() == conf;
+  }
+
+  static class FollowerInfoMap {
+    private final Map<RaftPeerId, FollowerInfo> map = new ConcurrentHashMap<>();
+
+    private volatile CurrentOldFollowerInfos followerInfos;
+
+    void put(RaftPeerId id, FollowerInfo info) {
+      map.put(id, info);
+    }
+
+    CurrentOldFollowerInfos getFollowerInfos(RaftConfigurationImpl conf) {
+      final CurrentOldFollowerInfos cached = followerInfos;
+      if (isSameConf(cached, conf)) {
+        return cached;
+      }
+
+      return update(conf);
+    }
+
+    synchronized CurrentOldFollowerInfos update(RaftConfigurationImpl conf) {
+      if (!isSameConf(followerInfos, conf)) { // compare again synchronized
+        followerInfos = new CurrentOldFollowerInfos(conf, getFollowerInfos(conf.getConf()),
+            Optional.ofNullable(conf.getOldConf()).map(this::getFollowerInfos).orElse(null));
+      }
+      return followerInfos;
+    }
+
+    private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {
+      return peers.streamPeerIds().map(map::get).filter(Objects::nonNull).collect(Collectors.toList());
+    }
+  }
+
   private final StateUpdateEvent updateCommitEvent =
       new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit);
   private final StateUpdateEvent checkStagingEvent =
@@ -235,8 +301,8 @@ class LeaderStateImpl implements LeaderState {
   private final RaftLog raftLog;
   private final long currentTerm;
   private volatile ConfigurationStagingState stagingState;
-  private List<List<RaftPeerId>> voterLists;
-  private final Map<RaftPeerId, FollowerInfo> peerIdFollowerInfoMap = new ConcurrentHashMap<>();
+
+  private final FollowerInfoMap followerInfoMap = new FollowerInfoMap();
 
   /**
    * The list of threads appending entries to followers.
@@ -299,7 +365,6 @@ class LeaderStateImpl implements LeaderState {
     if (!listeners.isEmpty()) {
       addSenders(listeners, placeHolderIndex, false, RaftPeerRole.LISTENER);
     }
-    voterLists = divideFollowers(conf);
   }
 
   LogEntryProto start() {
@@ -472,7 +537,6 @@ class LeaderStateImpl implements LeaderState {
 
   private void updateConfiguration(long logIndex, RaftConfigurationImpl newConf) {
     Preconditions.assertTrue(logIndex == newConf.getLogEntryIndex());
-    voterLists = divideFollowers(newConf);
     server.getState().setRaftConf(newConf);
   }
 
@@ -508,7 +572,7 @@ class LeaderStateImpl implements LeaderState {
     final List<LogAppender> newAppenders = newPeers.stream()
         .map(peer -> {
           final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, t, nextIndex, attendVote);
-          peerIdFollowerInfoMap.put(peer.getId(), f);
+          followerInfoMap.put(peer.getId(), f);
           if (role == RaftPeerRole.FOLLOWER) {
             raftServerMetrics.addFollower(peer.getId());
             logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
@@ -780,7 +844,8 @@ class LeaderStateImpl implements LeaderState {
     final RaftPeerId selfId = server.getId();
     final RaftConfigurationImpl conf = server.getRaftConf();
 
-    final List<RaftPeerId> followers = voterLists.get(0);
+    final CurrentOldFollowerInfos infos = followerInfoMap.getFollowerInfos(conf);
+    final List<FollowerInfo> followers = infos.getCurrent();
     final boolean includeSelf = conf.containsInConf(selfId);
     if (followers.isEmpty() && !includeSelf) {
       return Optional.empty();
@@ -792,7 +857,7 @@ class LeaderStateImpl implements LeaderState {
     if (!conf.isTransitional()) {
       return Optional.of(newConf);
     } else { // configuration is in transitional state
-      final List<RaftPeerId> oldFollowers = voterLists.get(1);
+      final List<FollowerInfo> oldFollowers = infos.getOld();
       final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
       if (oldFollowers.isEmpty() && !includeSelfInOldConf) {
         return Optional.empty();
@@ -882,31 +947,14 @@ class LeaderStateImpl implements LeaderState {
     notifySenders();
   }
 
-  private List<FollowerInfo> getFollowerInfos(List<RaftPeerId> followerIDs) {
-    List<FollowerInfo> followerInfos = new ArrayList<>();
-    for (int i = 0; i < followerIDs.size(); i++) {
-      RaftPeerId id = followerIDs.get(i);
-      if (!peerIdFollowerInfoMap.containsKey(id)) {
-        throw new IllegalArgumentException("RaftPeerId:" + id +
-                " not in peerIdFollowerInfoMap of leader:" + server.getMemberId());
-      }
-
-      followerInfos.add(peerIdFollowerInfoMap.get(id));
-    }
-
-    return followerInfos;
-  }
-
-  private long[] getSorted(List<RaftPeerId> followerIDs, boolean includeSelf,
+  private long[] getSorted(List<FollowerInfo> followerInfos, boolean includeSelf,
       ToLongFunction<FollowerInfo> getFollowerIndex, LongSupplier getLogIndex) {
-    final int length = includeSelf ? followerIDs.size() + 1 : followerIDs.size();
+    final int length = includeSelf ? followerInfos.size() + 1 : followerInfos.size();
     if (length == 0) {
-      throw new IllegalArgumentException("followers.size() == "
-          + followerIDs.size() + " and includeSelf == " + includeSelf);
+      throw new IllegalArgumentException("followerInfos is empty and includeSelf == " + includeSelf);
     }
 
     final long[] indices = new long[length];
-    List<FollowerInfo> followerInfos = getFollowerInfos(followerIDs);
     for (int i = 0; i < followerInfos.size(); i++) {
       indices[i] = getFollowerIndex.applyAsLong(followerInfos.get(i));
     }
@@ -920,23 +968,6 @@ class LeaderStateImpl implements LeaderState {
     return indices;
   }
 
-  private List<List<RaftPeerId>> divideFollowers(RaftConfigurationImpl conf) {
-    List<List<RaftPeerId>> lists = new ArrayList<>(2);
-    List<RaftPeerId> listForNew = senders.stream()
-        .map(LogAppender::getFollowerId)
-        .filter(conf::containsInConf)
-        .collect(Collectors.toList());
-    lists.add(listForNew);
-    if (conf.isTransitional()) {
-      List<RaftPeerId> listForOld = senders.stream()
-          .map(LogAppender::getFollowerId)
-          .filter(conf::containsInOldConf)
-          .collect(Collectors.toList());
-      lists.add(listForOld);
-    }
-    return lists;
-  }
-
   private void yieldLeaderToHigherPriorityPeer() {
     if (!server.getInfo().isLeader()) {
       return;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
index 6730b6181..38e3602e8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -31,10 +31,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 /**
  * The peer configuration of a raft cluster.
- *
+ * <p>
  * The objects of this class are immutable.
  */
 class PeerConfiguration {
@@ -95,6 +96,10 @@ class PeerConfiguration {
     return peers.size();
   }
 
+  Stream<RaftPeerId> streamPeerIds() {
+    return peers.keySet().stream();
+  }
+
   @Override
   public String toString() {
     return "peers:" + peers.values() + "|listeners:" + listeners.values();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index 43818395a..3e53451f0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -34,10 +34,10 @@ import java.util.stream.Collectors;
 
 /**
  * The configuration of the raft cluster.
- *
+ * <p>
  * The configuration is stable if there is no on-going peer change. Otherwise,
  * the configuration is transitional, i.e. in the middle of a peer change.
- *
+ * <p>
  * The objects of this class are immutable.
  */
 final class RaftConfigurationImpl implements RaftConfiguration {
@@ -157,6 +157,13 @@ final class RaftConfigurationImpl implements RaftConfiguration {
     }
   }
 
+  PeerConfiguration getConf() {
+    return conf;
+  }
+
+  PeerConfiguration getOldConf() {
+    return oldConf;
+  }
 
   boolean isHighestPriority(RaftPeerId peerId) {
     RaftPeer target = getPeer(peerId);


[ratis] 06/16: RATIS-1768. Fix stepDown command don't work issue (#806)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 070a3aa1a254e3403c14389796bfe1a4b177967c
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Fri Jan 13 17:36:03 2023 +0800

    RATIS-1768. Fix stepDown command don't work issue (#806)
    
    (cherry picked from commit 71a9fdffa9ab49d6f4f39402ff3873423021c107)
---
 .../java/org/apache/ratis/client/api/AdminApi.java     |  6 +++++-
 .../java/org/apache/ratis/client/impl/AdminImpl.java   |  6 ++++--
 .../org/apache/ratis/server/impl/RaftServerImpl.java   |  3 ++-
 .../ratis/shell/cli/sh/election/StepDownCommand.java   |  4 +++-
 .../shell/cli/sh/ElectionCommandIntegrationTest.java   | 18 ++++++++++++++++++
 5 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java
index 86e25ef9a..e27c8ae86 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java
@@ -66,5 +66,9 @@ public interface AdminApi {
   }
 
   /** Transfer leadership to the given server.*/
-  RaftClientReply transferLeadership(RaftPeerId newLeader, long timeoutMs) throws IOException;
+  default RaftClientReply transferLeadership(RaftPeerId newLeader, long timeoutMs) throws IOException {
+    return transferLeadership(newLeader, null, timeoutMs);
+  }
+
+  RaftClientReply transferLeadership(RaftPeerId newLeader, RaftPeerId leaderId, long timeoutMs) throws IOException;
 }
\ No newline at end of file
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java
index 445515430..b9e9968a6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java
@@ -48,9 +48,11 @@ class AdminImpl implements AdminApi {
   }
 
   @Override
-  public RaftClientReply transferLeadership(RaftPeerId newLeader, long timeoutMs) throws IOException {
+  public RaftClientReply transferLeadership(
+      RaftPeerId newLeader, RaftPeerId leaderId, long timeoutMs) throws IOException {
     final long callId = CallId.getAndIncrement();
     return client.io().sendRequestWithRetry(() -> new TransferLeadershipRequest(
-        client.getId(), client.getLeaderId(), client.getGroupId(), callId, newLeader, timeoutMs));
+        client.getId(), leaderId == null ? client.getLeaderId() : leaderId,
+        client.getGroupId(), callId, newLeader, timeoutMs));
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8e56ec935..fe909acb3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1099,7 +1099,8 @@ class RaftServerImpl implements RaftServer.Division,
     assertGroup(request.getRequestorId(), request.getRaftGroupId());
 
     return role.getLeaderState().map(leader -> leader.submitStepDownRequestAsync(request))
-        .orElseGet(() -> CompletableFuture.completedFuture(newSuccessReply(request)));
+        .orElseGet(() -> CompletableFuture.completedFuture(
+            newExceptionReply(request, generateNotLeaderException())));
   }
 
   public RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException {
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/StepDownCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/StepDownCommand.java
index 457663456..50abfe37c 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/StepDownCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/StepDownCommand.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.shell.cli.RaftUtils;
 import org.apache.ratis.shell.cli.sh.command.AbstractRatisCommand;
 import org.apache.ratis.shell.cli.sh.command.Context;
@@ -49,7 +50,8 @@ public class StepDownCommand extends AbstractRatisCommand {
     super.run(cl);
 
     try (RaftClient client = RaftUtils.createClient(getRaftGroup())) {
-      final RaftClientReply transferLeadershipReply = client.admin().transferLeadership(null, 60_000);
+      RaftPeerId leaderId = RaftPeerId.valueOf(getLeader(getGroupInfoReply().getRoleInfoProto()).getId());
+      final RaftClientReply transferLeadershipReply = client.admin().transferLeadership(null, leaderId, 60_000);
       processReply(transferLeadershipReply, () -> "Failed to step down leader");
     } catch (Throwable t) {
       printf("caught an error when executing step down leader: %s%n", t.getMessage());
diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java
index d68300b26..bdd438c38 100644
--- a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java
@@ -105,4 +105,22 @@ public abstract class ElectionCommandIntegrationTest <CLUSTER extends MiniRaftCl
       Assert.assertEquals(cluster.getLeader().getId(), newLeader.getId());
     }, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testElectionPauseResumeCommand", LOG);
   }
+
+  @Test
+  public void testElectionStepDownCommand() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestElectionStepDownCommand);
+  }
+
+  void runTestElectionStepDownCommand(MiniRaftCluster cluster) throws Exception {
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    String sb = getClusterAddress(cluster);
+    RaftServer.Division newLeader = cluster.getFollowers().get(0);
+    final StringPrintStream out = new StringPrintStream();
+    RatisShell shell = new RatisShell(out.getPrintStream());
+    Assert.assertNotEquals(cluster.getLeader().getId(), newLeader.getId());
+    Assert.assertEquals(2, cluster.getFollowers().size());
+    int ret = shell.run("election", "stepDown", "-peers", sb);
+    Assert.assertEquals(0, ret);
+    Assert.assertEquals(3, cluster.getFollowers().size());
+  }
 }


[ratis] 16/16: RATIS-1793. Enforce raft.server.log.appender.wait-time.min. (#832)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 8247d3c6b637d6155c76a03dbb8bd9504cacbdb3
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Feb 28 05:00:02 2023 -0800

    RATIS-1793. Enforce raft.server.log.appender.wait-time.min. (#832)
    
    (cherry picked from commit fdd7c5c98beaeed6aa33261752554575282e1a43)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 30 +++++++++++-----------
 .../apache/ratis/server/leader/FollowerInfo.java   |  3 +++
 .../apache/ratis/server/leader/LogAppender.java    |  4 +--
 .../apache/ratis/server/raftlog/RaftLogIndex.java  | 30 ++++++++++++++++------
 .../apache/ratis/server/impl/FollowerInfoImpl.java | 14 +++++++---
 .../ratis/server/leader/LogAppenderBase.java       | 11 ++++++--
 6 files changed, 62 insertions(+), 30 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 7c742d91d..b76da8ecb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 import com.codahale.metrics.Timer;
 
@@ -74,9 +73,6 @@ public class GrpcLogAppender extends LogAppenderBase {
   private final TimeDuration requestTimeoutDuration;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
-  private final long waitTimeMinMs;
-  private final AtomicReference<Timestamp> lastAppendEntries;
-
   private volatile StreamObservers appendLogRequestObserver;
   private final boolean useSeparateHBChannel;
 
@@ -96,10 +92,6 @@ public class GrpcLogAppender extends LogAppenderBase {
     this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
     this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
 
-    final TimeDuration waitTimeMin = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties);
-    this.waitTimeMinMs = waitTimeMin.toLong(TimeUnit.MILLISECONDS);
-    this.lastAppendEntries = new AtomicReference<>(Timestamp.currentTime().addTime(waitTimeMin.negate()));
-
     grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
     grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize);
 
@@ -182,10 +174,9 @@ public class GrpcLogAppender extends LogAppenderBase {
       // For normal nodes, new entries should be sent ASAP
       // however for slow followers (especially when the follower is down),
       // keep sending without any wait time only ends up in high CPU load
-      final long min = waitTimeMinMs - lastAppendEntries.get().elapsedTimeMs();
-      return Math.max(0L, min);
+      return Math.max(getMinWaitTimeMs(), 0L);
     }
-    return Math.min(waitTimeMinMs, getHeartbeatWaitTimeMs());
+    return Math.min(getMinWaitTimeMs(), getHeartbeatWaitTimeMs());
   }
 
   private boolean isSlowFollower() {
@@ -263,13 +254,13 @@ public class GrpcLogAppender extends LogAppenderBase {
     return CALL_ID_COMPARATOR;
   }
 
-  private void appendLog(boolean excludeLogEntries) throws IOException {
+  private void appendLog(boolean heartbeat) throws IOException {
     final AppendEntriesRequestProto pending;
     final AppendEntriesRequest request;
     try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
       // Prepare and send the append request.
       // Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock
-      pending = newAppendEntriesRequest(callId.getAndIncrement(), excludeLogEntries);
+      pending = newAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
       if (pending == null) {
         return;
       }
@@ -282,6 +273,16 @@ public class GrpcLogAppender extends LogAppenderBase {
       }
     }
 
+    final long waitMs = getMinWaitTimeMs();
+    if (waitMs > 0) {
+      try {
+        Thread.sleep(waitMs);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw IOUtils.toInterruptedIOException(
+            "Interrupted appendLog, heartbeat? " + heartbeat, e);
+      }
+    }
     if (isRunning()) {
       sendRequest(request, pending);
     }
@@ -295,15 +296,14 @@ public class GrpcLogAppender extends LogAppenderBase {
     final boolean sent = Optional.ofNullable(appendLogRequestObserver)
         .map(observer -> {
           observer.onNext(proto);
-          lastAppendEntries.set(Timestamp.currentTime());
           return true;
         }).isPresent();
 
     if (sent) {
+      getFollower().updateLastRpcSendTime(request.isHeartbeat());
       scheduler.onTimeout(requestTimeoutDuration,
           () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()),
           LOG, () -> "Timeout check failed for append entry request: " + request);
-      getFollower().updateLastRpcSendTime(request.isHeartbeat());
     } else {
       request.stopRequestTimer();
     }
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index b4ae8458c..fb63068a5 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -78,6 +78,9 @@ public interface FollowerInfo {
   /** @return the lastRpcResponseTime . */
   Timestamp getLastRpcResponseTime();
 
+  /** @return the lastRpcSendTime . */
+  Timestamp getLastRpcSendTime();
+
   /** Update lastRpcResponseTime to the current time. */
   void updateLastRpcResponseTime();
 
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index f0ff28690..49a1a12fa 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -166,8 +166,8 @@ public interface LogAppender {
     return getFollower().getNextIndex() < getRaftLog().getNextIndex();
   }
 
-  /** send a heartbeat AppendEntries immediately */
-  void triggerHeartbeat() throws IOException;
+  /** Trigger to send a heartbeat AppendEntries. */
+  void triggerHeartbeat();
 
   /** @return the wait time in milliseconds to send the next heartbeat. */
   default long getHeartbeatWaitTimeMs() {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
index ed545e291..290a58835 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIndex.java
@@ -44,30 +44,44 @@ public class RaftLogIndex {
 
   public boolean setUnconditionally(long newIndex, Consumer<Object> log) {
     final long old = index.getAndSet(newIndex);
-    log.accept(StringUtils.stringSupplierAsObject(() -> name + ": setUnconditionally " + old + " -> " + newIndex));
-    return old != newIndex;
+    final boolean updated = old != newIndex;
+    if (updated) {
+      log.accept(StringUtils.stringSupplierAsObject(
+          () -> name + ": setUnconditionally " + old + " -> " + newIndex));
+    }
+    return updated;
   }
 
   public boolean updateUnconditionally(LongUnaryOperator update, Consumer<Object> log) {
     final long old = index.getAndUpdate(update);
     final long newIndex = update.applyAsLong(old);
-    log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateUnconditionally " + old + " -> " + newIndex));
-    return old != newIndex;
+    final boolean updated = old != newIndex;
+    if (updated) {
+      log.accept(StringUtils.stringSupplierAsObject(
+          () -> name + ": updateUnconditionally " + old + " -> " + newIndex));
+    }
+    return updated;
   }
 
   public boolean updateIncreasingly(long newIndex, Consumer<Object> log) {
     final long old = index.getAndSet(newIndex);
     Preconditions.assertTrue(old <= newIndex,
         () -> "Failed to updateIncreasingly for " + name + ": " + old + " -> " + newIndex);
-    log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateIncreasingly " + old + " -> " + newIndex));
-    return old != newIndex;
+    final boolean updated = old != newIndex;
+    if (updated) {
+      log.accept(StringUtils.stringSupplierAsObject(
+          () -> name + ": updateIncreasingly " + old + " -> " + newIndex));
+    }
+    return updated;
   }
 
   public boolean updateToMax(long newIndex, Consumer<Object> log) {
     final long old = index.getAndUpdate(oldIndex -> Math.max(oldIndex, newIndex));
     final boolean updated = old < newIndex;
-    log.accept(StringUtils.stringSupplierAsObject(
-        () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ", updated? " + updated));
+    if (updated) {
+      log.accept(StringUtils.stringSupplierAsObject(
+          () -> name + ": updateToMax old=" + old + ", new=" + newIndex + ", updated? " + updated));
+    }
     return updated;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 0d7fe2075..67af642fd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -93,17 +93,20 @@ class FollowerInfoImpl implements FollowerInfo {
 
   @Override
   public void decreaseNextIndex(long newNextIndex) {
-    nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, newNextIndex), infoIndexChange);
+    nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, newNextIndex),
+        message -> infoIndexChange.accept("decreaseNextIndex " + message));
   }
 
   @Override
   public void setNextIndex(long newNextIndex) {
-    nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : old, infoIndexChange);
+    nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : old,
+        message -> infoIndexChange.accept("setNextIndex " + message));
   }
 
   @Override
   public void updateNextIndex(long newNextIndex) {
-    nextIndex.updateToMax(newNextIndex, infoIndexChange);
+    nextIndex.updateToMax(newNextIndex,
+        message -> infoIndexChange.accept("updateNextIndex " + message));
   }
 
   @Override
@@ -160,6 +163,11 @@ class FollowerInfoImpl implements FollowerInfo {
     return lastRpcResponseTime.get();
   }
 
+  @Override
+  public Timestamp getLastRpcSendTime() {
+    return lastRpcSendTime.get();
+  }
+
   @Override
   public void updateLastRpcSendTime(boolean isHeartbeat) {
     final Timestamp currentTime = Timestamp.currentTime();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index bc8a31181..1c0f61836 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -34,10 +34,10 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -56,6 +56,7 @@ public abstract class LogAppenderBase implements LogAppender {
   private final AwaitForSignal eventAwaitForSignal;
 
   private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
+  private final long waitTimeMinMs;
 
   protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
     this.follower = f;
@@ -71,10 +72,12 @@ public abstract class LogAppenderBase implements LogAppender {
     this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize);
     this.daemon = new LogAppenderDaemon(this);
     this.eventAwaitForSignal = new AwaitForSignal(name);
+
+    this.waitTimeMinMs = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties).toLong(TimeUnit.MILLISECONDS);
   }
 
   @Override
-  public void triggerHeartbeat() throws IOException {
+  public void triggerHeartbeat() {
     if (heartbeatTrigger.compareAndSet(false, true)) {
       notifyLogAppender();
     }
@@ -133,6 +136,10 @@ public abstract class LogAppenderBase implements LogAppender {
     getLeaderState().restart(this);
   }
 
+  public long getMinWaitTimeMs() {
+    return waitTimeMinMs - getFollower().getLastRpcSendTime().elapsedTimeMs();
+  }
+
   @Override
   public final FollowerInfo getFollower() {
     return follower;


[ratis] 08/16: RATIS-1772. Refactor the startLeaderElection code in LeaderStateImpl. (#811)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit f18bce302cfc847ba9f0b83a56a7e2674b50830f
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Jan 19 22:35:24 2023 +0800

    RATIS-1772. Refactor the startLeaderElection code in LeaderStateImpl. (#811)
    
    (cherry picked from commit 8c16c28351576b893564c5cc621c7d069b1de14a)
---
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 68 ++++++++++------------
 1 file changed, 32 insertions(+), 36 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 43349354c..42ceb8510 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -657,7 +657,7 @@ class LeaderStateImpl implements LeaderState {
     return pendingStepDown.submitAsync(request);
   }
 
-  private synchronized void sendStartLeaderElectionToHigherPriorityPeer(RaftPeerId follower, TermIndex lastEntry) {
+  private synchronized void sendStartLeaderElection(RaftPeerId follower, TermIndex lastEntry) {
     ServerState state = server.getState();
     TermIndex currLastEntry = state.getLastEntry();
     if (ServerState.compareLog(currLastEntry, lastEntry) != 0) {
@@ -665,6 +665,8 @@ class LeaderStateImpl implements LeaderState {
               "did not match lastEntry:{}", this, follower, currLastEntry, lastEntry);
       return;
     }
+    LOG.info("{}: send StartLeaderElectionRequest to follower {} on term {}, lastEntry={}",
+        this, follower, currentTerm, lastEntry);
 
     final StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto(
         server.getMemberId(), follower, lastEntry);
@@ -681,6 +683,22 @@ class LeaderStateImpl implements LeaderState {
     });
   }
 
+  boolean sendStartLeaderElection(FollowerInfo followerInfo) {
+    final RaftPeerId followerId = followerInfo.getPeer().getId();
+    final TermIndex leaderLastEntry = server.getState().getLastEntry();
+    if (leaderLastEntry == null) {
+      sendStartLeaderElection(followerId, null);
+      return true;
+    }
+
+    final long followerMatchIndex = followerInfo.getMatchIndex();
+    if (followerMatchIndex >= leaderLastEntry.getIndex()) {
+      sendStartLeaderElection(followerId, leaderLastEntry);
+      return true;
+    }
+    return false;
+  }
+
   private void prepare() {
     synchronized (server) {
       if (isRunning()) {
@@ -716,9 +734,8 @@ class LeaderStateImpl implements LeaderState {
               event.execute();
             } else if (inStagingState()) {
               checkStaging();
-            } else {
-              yieldLeaderToHigherPriorityPeer();
-              checkLeadership();
+            } else if (checkLeadership()) {
+              checkPeersForYieldingLeader();
             }
           }
         }
@@ -986,52 +1003,31 @@ class LeaderStateImpl implements LeaderState {
     return indices;
   }
 
-  private void yieldLeaderToHigherPriorityPeer() {
-    if (!server.getInfo().isLeader()) {
-      return;
-    }
-
+  private void checkPeersForYieldingLeader() {
     final RaftConfigurationImpl conf = server.getRaftConf();
     final RaftPeer leader = conf.getPeer(server.getId());
     if (leader == null) {
       LOG.error("{} the leader {} is not in the conf {}", this, server.getId(), conf);
       return;
     }
-    int leaderPriority = leader.getPriority();
+    final int leaderPriority = leader.getPriority();
 
+    FollowerInfo highestPriorityInfo = null;
+    int highestPriority = Integer.MIN_VALUE;
     for (LogAppender logAppender : senders.getSenders()) {
-      final FollowerInfo followerInfo = logAppender.getFollower();
-      final RaftPeerId followerID = followerInfo.getPeer().getId();
-      final RaftPeer follower = conf.getPeer(followerID);
+      final RaftPeer follower = conf.getPeer(logAppender.getFollowerId());
       if (follower == null) {
-        if (conf.getPeer(followerID, RaftPeerRole.LISTENER) == null) {
-          LOG.error("{} the follower {} is not in the conf {}", this, followerID, conf);
-        }
         continue;
       }
       final int followerPriority = follower.getPriority();
-      if (followerPriority <= leaderPriority) {
-        continue;
-      }
-      final TermIndex leaderLastEntry = server.getState().getLastEntry();
-      if (leaderLastEntry == null) {
-        LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " +
-                "is higher than leader's:{} and leader's lastEntry is null",
-            this, followerID, currentTerm, followerPriority, leaderPriority);
-
-        sendStartLeaderElectionToHigherPriorityPeer(followerID, null);
-        return;
-      }
-
-      if (followerInfo.getMatchIndex() >= leaderLastEntry.getIndex()) {
-        LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " +
-                "is higher than leader's:{} and follower's lastEntry index:{} catch up with leader's:{}",
-            this, followerID, currentTerm, followerPriority, leaderPriority, followerInfo.getMatchIndex(),
-            leaderLastEntry.getIndex());
-        sendStartLeaderElectionToHigherPriorityPeer(followerID, leaderLastEntry);
-        return;
+      if (followerPriority > leaderPriority && followerPriority >= highestPriority) {
+        highestPriority = followerPriority;
+        highestPriorityInfo = logAppender.getFollower();
       }
     }
+    if (highestPriorityInfo != null) {
+      sendStartLeaderElection(highestPriorityInfo);
+    }
   }
 
   /**


[ratis] 01/16: RATIS-1750. Add snapshot section in dev guide (#788)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit fb176cfea734fc5cc489c08adb0aa8be61894f3f
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Tue Nov 29 08:06:26 2022 +0800

    RATIS-1750. Add snapshot section in dev guide (#788)
    
    (cherry picked from commit 96b2f267e81959d345725a5dc840725b32fd1b2b)
---
 ratis-docs/src/site/markdown/snapshot.md | 240 +++++++++++++++++++++++++++++++
 1 file changed, 240 insertions(+)

diff --git a/ratis-docs/src/site/markdown/snapshot.md b/ratis-docs/src/site/markdown/snapshot.md
new file mode 100644
index 000000000..f20dc19d7
--- /dev/null
+++ b/ratis-docs/src/site/markdown/snapshot.md
@@ -0,0 +1,240 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# Snapshot Guide
+
+## Overview
+
+Raft log grows during normal operation. As it grows larger, it occupies more space and takes more time to replay. 
+Therefore, some form of log compaction is necessary for practical systems.
+In Ratis, we introduce the snapshot mechanism as the way to do log compaction.
+
+The basic idea of snapshot is to create and save a snapshot reflecting the latest state of the state machine, 
+and then delete previous logs up to the checkpoint.
+
+## Implement snapshot
+
+To enable snapshot, we have to first implement the following two methods in `StateMachine`:
+
+```java
+/**
+ * Dump the in-memory state into a snapshot file in the RaftStorage. The
+ * StateMachine implementation can decide 1) its own snapshot format, 2) when
+ * a snapshot is taken, and 3) how the snapshot is taken (e.g., whether the
+ * snapshot blocks the state machine, and whether to purge log entries after
+ * a snapshot is done).
+ *
+ * The snapshot should include the latest raft configuration.
+ *
+ * @return the largest index of the log entry that has been applied to the
+ *         state machine and also included in the snapshot. Note the log purge
+ *         should be handled separately.
+ */
+long takeSnapshot() throws IOException;
+```
+
+```java
+/**
+ * Returns the information for the latest durable snapshot.
+ */
+SnapshotInfo getLatestSnapshot();
+```
+
+Snapshotting for memory-based state machines is conceptually simple. In
+snapshotting, the entire current system state is written to a snapshot on stable storage.
+
+With disk-based state machines, a recent copy of the system state is maintained on disk as
+part of normal operation. Thus, the Raft log can be discarded as soon as the state machine
+reflects writes to disk, and snapshotting is used only when sending consistent disk images to
+other servers.
+
+Examples of snapshot implementation can be found at 
+https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java.
+
+## Trigger a snapshot
+
+To trigger a snapshot, we can either
+
+* Trigger snapshot manually using `SnapshotManagementApi`. 
+Note that Ratis imposes a minimal creation gap between two subsequent snapshot creations:
+
+  ```java
+  // SnapshotManagementApi
+  RaftClientReply create(long timeoutMs) throws IOException;
+  ```
+  
+  ```java
+  // customize snapshot creation gap
+  RaftServerConfigKeys.Snapshot.setCreationGap(properties, 1024L);
+  ```
+  
+* Enable triggering snapshot automatically when log size exceeds limit (in number of applied log entries). 
+To do so, we may turn on the auto-trigger-snapshot option in RaftProperties 
+and set an appropriate triggering threshold.
+
+  ```java
+  RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
+  RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, 400000);
+  ```
+
+## Purge obsolete logs after snapshot
+
+When a snapshot is taken, Ratis will automatically discard obsolete logs.
+By default, Ratis will purge logs up to min(snapshot index, safe index), 
+where the safe index equals the minimal commit index of all group members.
+In other words, if a log is committed by all group members and is included in the latest snapshot,
+then this log can be safely deleted.
+
+Sometimes we can choose to aggressively purge the logs up to the snapshot index 
+even if some peers do not have commit index up to snapshot index. 
+To do this, we can turn on the purge-up-to-snapshot-index option in RaftProperties:
+
+```java
+RaftServerConfigKeys.Log.setPurgeUptoSnapshotIndex(properties, true);
+```
+
+Purging the logs up to snapshot index sometimes leads to an unnecessary burst of network traffic.
+That is, even if a follower lags behind only a few logs, the leader still needs to transfer the full snapshot. 
+To avoid this situation, we can preserve some recent logs when purging logs up to snapshot index. 
+To do this, we can set the number of logs to be preserved when purging in RaftProperties:
+
+```java
+RaftServerConfigKeys.Log.setPurgePreservationLogNum(properties, n);
+```
+Intuitively, n should be chosen so that the cost of transferring n logs roughly equals the cost of transferring the full snapshot.
+
+## Retain multiple versions of snapshot
+
+Ratis allows `StateMachine` to retain multiple versions of snapshot. 
+To enable this feature, we have to:
+
+1. Set the number of snapshot versions to retain in RaftProperties when building the RaftServer:
+   ```java
+   RaftServerConfigKeys.Snapshot.setRetentionFileNum(properties, 2);
+   ```
+2. Implement `StateMachineStorage.cleanupOldSnapshots` to clean up old versions of snapshot:
+   ```java
+   // StateMachineStorage.java
+   void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) throws IOException;
+   ```
+
+## Load the latest snapshot
+
+1. When a RaftServer restarts and tries to recover the data, it first has to read and load the latest snapshot,
+   and then apply the logs not included in the snapshot. 
+   We have to implement snapshot loading in `StateMachine.initialize` lifecycle hook.
+
+    ```java
+    /*
+    * Initializes the State Machine with the given parameter.
+    * The state machine must, if there is any, read the latest snapshot.
+    */
+    void initialize(RaftServer raftServer, RaftGroupId raftGroupId, RaftStorage storage) throws IOException;
+    ```
+
+2. When a RaftServer newly joins an existing cluster, 
+   it first has to obtain the latest snapshot and install it locally. Before installing a snapshot,
+   `StateMachine.pause` hook is called to ensure that a new snapshot can be installed.
+   ```java
+   /* 
+   * Pauses the state machine. On return, the state machine should have closed all open files so
+   * that a new snapshot can be installed.
+   */
+   void pause();
+    ```
+   After installing the snapshot, `StateMachine.reinitialize` is called. 
+   We shall initialize the state machine with the latest installed snapshot in this lifecycle hook.
+   ```java
+   /**
+   * Re-initializes the State Machine in PAUSED state. The
+   * state machine is responsible for reading the latest snapshot from the file system (if any) and
+   * initialize itself with the latest term and index there including all the edits.
+   */
+   void reinitialize() throws IOException;
+   ```
+   
+
+## Customize snapshot storage path
+
+By default, Ratis assumes `StateMachine` snapshot files are placed under
+`RaftStorageDirectory.getStateMachineDir()`. When leader installs a snapshot to the follower,
+Ratis will keep the snapshot layout in follower side unchanged relative to this directory. 
+That is, the installed snapshot under follower's state machine directory
+will have the same hierarchy as in the leader side.
+
+`StateMachine` can also customize the snapshot storage and install directory. To do this, 
+we may provide the snapshot storage root directory in `StateMachineStorage`, 
+together with a temporary directory holding in-transmitting snapshot files.
+
+```java
+// StateMachineStorage
+/** @return the state machine directory. */
+default File getSnapshotDir() {
+  return null;
+}
+
+/** @return the temporary directory. */
+default File getTmpDir() {
+  return null;
+}
+```
+Examples of customizing snapshot storage path can be found at
+https://github.com/apache/ratis/blob/master/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+
+## Customize snapshot installation
+
+When a new follower joins the cluster, leader will install the latest snapshot to the follower.
+`StateMachine` only needs to provide the `SnapshotInfo` of the latest snapshot, and it's Ratis'
+responsibility to handle all the nuances of dividing snapshot into chunks / transferring chunks / 
+validating checksum. This default implementation works well in most scenarios. 
+
+However, there are scenarios that the default implementation cannot satisfy. To name a few:
+1. `StateMachine` has a flexible snapshot layout with files scattering around different directories.
+2. Follower wants to fully utilize underlying topology. For example, follower may download the snapshot
+from the nearest follower instead of the leader.
+3. Follower wants to download snapshot from different sources in parallel. 
+
+Ratis provides `InstallSnapshotNotification` to let `StateMachine` take over the full control
+of snapshot installation. To enable this feature, we may first turn off the leader-install-snapshot option
+to disable leader explicitly installing snapshot to follower:
+```java
+RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties, false);
+```
+After this option is disabled, whenever the leader detects that a follower needs snapshot,
+instead of installing snapshot to that follower, 
+leader will only send a snapshot installation notification. 
+It's now the follower's responsibility to install the latest snapshot asynchronously.
+To do this, we have to implement `FollowerEventApi.notifyInstallSnapshotFromLeader`:
+
+```java
+interface FollowerEventApi {
+  /**
+   * Notify the {@link StateMachine} that the leader has purged entries from its log.
+   * In order to catch up, the {@link StateMachine} has to install the latest snapshot asynchronously.
+   *
+   * @param roleInfoProto information about the current node role and rpc delay information.
+   * @param firstTermIndexInLog The term-index of the first append entry available in the leader's log.
+   * @return return the last term-index in the snapshot after the snapshot installation.
+   */
+  default CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
+      RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
+    return CompletableFuture.completedFuture(null);
+  }
+}
+```
+Examples of `notifyInstallSnapshotFromLeader` implementation can be found at 
+https://github.com/apache/ratis/blob/master/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
\ No newline at end of file


[ratis] 07/16: RATIS-1766. Add descriptions to metrics entries (#804)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 272eeae2a253fc86d72fccc536aff0fcbbfc82b9
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Fri Jan 13 19:43:35 2023 +0800

    RATIS-1766. Add descriptions to metrics entries (#804)
    
    (cherry picked from commit c2181e5fab51254452db094693d5bddbda78d5ba)
---
 ratis-docs/src/site/markdown/metrics.md            | 145 +++++++++++++++++++++
 .../server/metrics/SegmentedRaftLogMetrics.java    |   4 +-
 2 files changed, 147 insertions(+), 2 deletions(-)

diff --git a/ratis-docs/src/site/markdown/metrics.md b/ratis-docs/src/site/markdown/metrics.md
new file mode 100644
index 000000000..10c78ccbb
--- /dev/null
+++ b/ratis-docs/src/site/markdown/metrics.md
@@ -0,0 +1,145 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+
+# Metrics
+
+## Ratis Server
+
+### StateMachine Metrics
+
+| Application | Component     | Name                | Type  | Description                                                  |
+|-------------|---------------|---------------------|-------|--------------------------------------------------------------|
+| ratis       | state_machine | appliedIndex        | Gauge | Applied index of state machine                               |
+| ratis       | state_machine | applyCompletedIndex | Gauge | Last log index completely applied to the state machine       |
+| ratis       | state_machine | takeSnapshot        | Timer | Time taken for state machine to take a snapshot              |
+
+
+### Leader Election Metrics
+
+| Application | Component       | Name                          | Type    | Description                                           |
+|-------------|-----------------|-------------------------------|---------|-------------------------------------------------------|
+| ratis       | leader_election | electionCount                 | Counter | Number of leader elections of this group              |
+| ratis       | leader_election | timeoutCount                  | Counter | Number of election timeouts of this peer              |
+| ratis       | leader_election | electionTime                  | Timer   | Time spent on leader election                         |
+| ratis       | leader_election | lastLeaderElapsedTime         | Gauge   | Time elapsed since last hearing from an active leader |
+| ratis       | leader_election | transferLeadershipCount       | Counter | Number of transferLeader requests                     |
+| ratis       | leader_election | lastLeaderElectionElapsedTime | Gauge   | Time elapsed since last leader election               |
+
+### Log Appender Metrics
+
+| Application | Component    | Name                              | Type  | Description                                 |
+|-------------|--------------|-----------------------------------|-------|---------------------------------------------|
+| ratis       | log_appender | follower_{peer}_next_index        | Gauge | Next index of peer                          |
+| ratis       | log_appender | follower_{peer}_match_index       | Gauge | Match index of peer                         |
+| ratis       | log_appender | follower_{peer}_rpc_response_time | Gauge | Time elapsed since peer's last rpc response |
+
+### Raft Log Metrics
+
+| Application | Component  | Name                            | Type    | Description                                                                                                   |
+|-------------|------------|---------------------------------|---------|---------------------------------------------------------------------------------------------------------------|
+| ratis       | log_worker | metadataLogEntryCount           | Counter | Number of metadata(term-index) log entries                                                                    |
+| ratis       | log_worker | configLogEntryCount             | Counter | Number of configuration log entries                                                                           |
+| ratis       | log_worker | stateMachineLogEntryCount       | Counter | Number of statemachine log entries                                                                            |
+| ratis       | log_worker | flushTime                       | Timer   | Time taken to flush log                                                                                       |
+| ratis       | log_worker | flushCount                      | Counter | Number of times log-flush is invoked                                                                          |
+| ratis       | log_worker | syncTime                        | Timer   | Time taken to log sync (fsync)                                                                                |
+| ratis       | log_worker | dataQueueSize                   | Gauge   | Raft log data queue size which at any time gives the number of log related operations in the queue            |
+| ratis       | log_worker | workerQueueSize                 | Gauge   | Raft log worker queue size which at any time gives number of committed entries that are to be synced          |
+| ratis       | log_worker | syncBatchSize                   | Gauge   | Number of raft log entries synced in each flush call                                                          |
+| ratis       | log_worker | cacheMissCount                  | Counter | Count of RaftLogCache Misses                                                                                  |
+| ratis       | log_worker | cacheHitCount                   | Counter | Count of RaftLogCache Hits                                                                                    |
+| ratis       | log_worker | closedSegmentsNum               | Gauge   | Number of closed raft log segments                                                                            |
+| ratis       | log_worker | closedSegmentsSizeInBytes       | Gauge   | Size of closed raft log segments in bytes                                                                     |
+| ratis       | log_worker | openSegmentSizeInBytes          | Gauge   | Size of open raft log segment in bytes                                                                        |
+| ratis       | log_worker | appendEntryLatency              | Timer   | Total time taken to append a raft log entry                                                                   |
+| ratis       | log_worker | enqueuedTime                    | Timer   | Time spent by a Raft log operation in the queue                                                               |
+| ratis       | log_worker | queueingDelay                   | Timer   | Time taken for a Raft log operation to get into the queue after being requested, waiting queue to be non-full |
+| ratis       | log_worker | {operation}ExecutionTime        | Timer   | Time taken for a Raft log operation(open/close/flush/write/purge) to complete execution                       |
+| ratis       | log_worker | appendEntryCount                | Counter | Number of entries appended to the raft log                                                                    |
+| ratis       | log_worker | purgeLog                        | Timer   | Time taken for Raft log purge operation to complete execution                                                 |
+| ratis       | log_worker | numStateMachineDataWriteTimeout | Counter | Number of statemachine dataApi write timeouts                                                                 |
+| ratis       | log_worker | numStateMachineDataReadTimeout  | Counter | Number of statemachine dataApi read timeouts                                                                  |
+| ratis       | log_worker | readEntryLatency                | Timer   | Time required to read a raft log entry from actual raft log file and create a raft log entry                  |
+| ratis       | log_worker | segmentLoadLatency              | Timer   | Time required to load and process raft log segments during restart                                            |
+
+
+### Raft Server Metrics
+
+| Application | Component | Name                             | Type    | Description                                                         |
+|-------------|-----------|----------------------------------|---------|---------------------------------------------------------------------|
+| ratis       | server    | {peer}_lastHeartbeatElapsedTime  | Gauge   | Time elapsed since last heartbeat rpc response                      |
+| ratis       | server    | follower_append_entry_latency    | Timer   | Time taken for followers to append log entries                      |
+| ratis       | server    | {peer}_peerCommitIndex           | Gauge   | Commit index of peer                                                |
+| ratis       | server    | clientReadRequest                | Timer   | Time taken to process read requests from client                     |
+| ratis       | server    | clientStaleReadRequest           | Timer   | Time taken to process stale-read requests from client               |
+| ratis       | server    | clientWriteRequest               | Timer   | Time taken to process write requests from client                    |
+| ratis       | server    | clientWatch{level}Request        | Timer   | Time taken to process watch(replication_level) requests from client |
+| ratis       | server    | numRequestQueueLimitHits         | Counter | Number of (total client requests in queue) limit hits               |
+| ratis       | server    | numRequestsByteSizeLimitHits     | Counter | Number of (total size of client requests in queue) limit hits       |
+| ratis       | server    | numResourceLimitHits             | Counter | Sum of numRequestQueueLimitHits and numRequestsByteSizeLimitHits    |
+| ratis       | server    | numPendingRequestInQueue         | Gauge   | Number of pending client requests in queue                          |
+| ratis       | server    | numPendingRequestMegaByteSize    | Gauge   | Total size of pending client requests in queue                      |
+| ratis       | server    | retryCacheEntryCount             | Gauge   | Number of entries in retry cache                                    |
+| ratis       | server    | retryCacheHitCount               | Gauge   | Number of retry cache hits                                          |
+| ratis       | server    | retryCacheHitRate                | Gauge   | Retry cache hit rate                                                |
+| ratis       | server    | retryCacheMissCount              | Gauge   | Number of retry cache misses                                        |
+| ratis       | server    | retryCacheMissRate               | Gauge   | Retry cache miss rate                                               |
+| ratis       | server    | numFailedClientStaleReadOnServer | Counter | Number of failed stale-read requests                                |
+| ratis       | server    | numFailedClientReadOnServer      | Counter | Number of failed read requests                                      |
+| ratis       | server    | numFailedClientWriteOnServer     | Counter | Number of failed write requests                                     |
+| ratis       | server    | numFailedClientWatchOnServer     | Counter | Number of failed watch requests                                     |
+| ratis       | server    | numFailedClientStreamOnServer    | Counter | Number of failed stream requests                                    |
+| ratis       | server    | numInstallSnapshot               | Counter | Number of install-snapshot requests                                 |
+
+
+## Ratis Netty Metrics
+
+| Application | Component     | Name                          | Type    | Description                               |
+|-------------|---------------|-------------------------------|---------|-------------------------------------------|
+| ratis_netty | stream_server | {request}_latency             | Timer   | Time taken to process data stream request |
+| ratis_netty | stream_server | {request}_success_reply_count | Counter | Number of success replies of request      |
+| ratis_netty | stream_server | {request}_fail_reply_count    | Counter | Number of fail replies of request         |
+| ratis_netty | stream_server | num_requests_{request}        | Counter | Number of total data stream requests      |
+
+## Ratis gRPC Metrics
+
+### Message Metrics
+
+| Application | Component              | Name                       | Type    | Description                                      |
+|-------------|------------------------|----------------------------|---------|--------------------------------------------------|
+| ratis       | client_message_metrics | {method}_started_total     | Counter | total messages started of {method}               |
+| ratis       | client_message_metrics | {method}_completed_total   | Counter | total messages completed of {method}             |
+| ratis       | client_message_metrics | {method}_received_executed | Counter | total messages received and executed of {method} |
+| ratis       | server_message_metrics | {method}_started_total     | Counter | total messages started of {method}               |
+| ratis       | server_message_metrics | {method}_completed_total   | Counter | total messages completed of {method}             |
+| ratis       | server_message_metrics | {method}_received_executed | Counter | total messages received and executed of {method} |
+
+### gRPC Log Appender Metrics
+
+
+| Application | Component    | Name                                  | Type    | Description                                 |
+|-------------|--------------|---------------------------------------|---------|---------------------------------------------|
+| ratis_grpc  | log_appender | {appendEntries}_latency               | Timer   | Latency of method (appendEntries/heartbeat) |
+| ratis_grpc  | log_appender | {follower}_success_reply_count        | Counter | Number of success replies                   |
+| ratis_grpc  | log_appender | {follower}_not_leader_reply_count     | Counter | Number of NotLeader replies                 |
+| ratis_grpc  | log_appender | {follower}_inconsistency_reply_count  | Counter | Number of Inconsistency replies             |
+| ratis_grpc  | log_appender | {follower}_append_entry_timeout_count | Counter | Number of appendEntries timeouts            |
+| ratis_grpc  | log_appender | {follower}_pending_log_requests_count | Counter | Number of pending requests                  |
+| ratis_grpc  | log_appender | num_retries                           | Counter | Number of request retries                   |
+| ratis_grpc  | log_appender | num_requests                          | Counter | Number of requests in total                 |
+| ratis_grpc  | log_appender | num_install_snapshot                  | Counter | Number of install snapshot requests         |
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
index f359e1a0b..810fcb003 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
@@ -67,9 +67,9 @@ public class SegmentedRaftLogMetrics extends RaftLogMetricsBase {
   /** Number of entries appended to the raft log */
   public static final String RAFT_LOG_APPEND_ENTRY_COUNT = "appendEntryCount";
   public static final String RAFT_LOG_PURGE_METRIC = "purgeLog";
-  /** Time taken for a Raft log operation to complete write state machine data. */
+  /** Number of statemachine dataApi write timeouts */
   public static final String RAFT_LOG_STATEMACHINE_DATA_WRITE_TIMEOUT_COUNT = "numStateMachineDataWriteTimeout";
-  /** Time taken for a Raft log operation to complete read state machine data. */
+  /** Number of statemachine dataApi read timeouts */
   public static final String RAFT_LOG_STATEMACHINE_DATA_READ_TIMEOUT_COUNT = "numStateMachineDataReadTimeout";
 
   //////////////////////////////


[ratis] 10/16: RATIS-1783. MAX_OP_SIZE is not configurable on raft log read. (#823)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 4a0496b188cd96f05fbab10a4327714bb8408d17
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Feb 16 16:34:18 2023 -0800

    RATIS-1783. MAX_OP_SIZE is not configurable on raft log read. (#823)
    
    (cherry picked from commit c4f54dfe88d1b574a03688f16a592cf0d59d13a9)
---
 .../ratis/server/raftlog/segmented/LogSegment.java | 47 +++++++++++-----------
 .../raftlog/segmented/SegmentedRaftLogCache.java   |  7 +++-
 .../segmented/SegmentedRaftLogInputStream.java     | 15 ++++---
 .../raftlog/segmented/SegmentedRaftLogReader.java  | 27 +++++++------
 .../segmented/SegmentedRaftLogTestUtils.java       | 10 +++++
 .../statemachine/SimpleStateMachine4Testing.java   |  3 +-
 .../raftlog/segmented/TestCacheEviction.java       |  4 +-
 .../server/raftlog/segmented/TestLogSegment.java   | 20 ++++-----
 .../raftlog/segmented/TestRaftLogReadWrite.java    |  7 ++--
 .../segmented/TestSegmentedRaftLogCache.java       |  3 +-
 .../java/org/apache/ratis/tools/ParseRatisLog.java | 14 +++++--
 11 files changed, 94 insertions(+), 63 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index c3c4d6e53..b8e0e72ff 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -31,6 +31,7 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,35 +103,31 @@ public final class LogSegment implements Comparable<Long> {
     }
   }
 
-  static LogSegment newOpenSegment(RaftStorage storage, long start, SegmentedRaftLogMetrics raftLogMetrics) {
+  static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize,
+      SegmentedRaftLogMetrics raftLogMetrics) {
     Preconditions.assertTrue(start >= 0);
-    return new LogSegment(storage, true, start, start - 1, raftLogMetrics);
+    return new LogSegment(storage, true, start, start - 1, maxOpSize, raftLogMetrics);
   }
 
   @VisibleForTesting
   static LogSegment newCloseSegment(RaftStorage storage,
-      long start, long end, SegmentedRaftLogMetrics raftLogMetrics) {
+      long start, long end, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) {
     Preconditions.assertTrue(start >= 0 && end >= start);
-    return new LogSegment(storage, false, start, end, raftLogMetrics);
-  }
-
-  static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, SegmentedRaftLogMetrics metrics) {
-    return startEnd.isOpen()? newOpenSegment(storage, startEnd.getStartIndex(), metrics)
-        : newCloseSegment(storage, startEnd.getStartIndex(), startEnd.getEndIndex(), metrics);
+    return new LogSegment(storage, false, start, end, maxOpSize, raftLogMetrics);
   }
 
-  public static int readSegmentFile(File file, LogSegmentStartEnd startEnd,
-      CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer)
-      throws IOException {
-    return readSegmentFile(file, startEnd.getStartIndex(), startEnd.getEndIndex(), startEnd.isOpen(),
-        corruptionPolicy, raftLogMetrics, entryConsumer);
+  static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
+      SegmentedRaftLogMetrics metrics) {
+    return startEnd.isOpen()? newOpenSegment(storage, startEnd.getStartIndex(), maxOpSize, metrics)
+        : newCloseSegment(storage, startEnd.getStartIndex(), startEnd.getEndIndex(), maxOpSize, metrics);
   }
 
-  private static int readSegmentFile(File file, long start, long end, boolean isOpen,
+  public static int readSegmentFile(File file, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
       CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer)
       throws IOException {
     int count = 0;
-    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, start, end, isOpen, raftLogMetrics)) {
+    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(
+        file, startEnd.getStartIndex(), startEnd.getEndIndex(), startEnd.isOpen(), maxOpSize, raftLogMetrics)) {
       for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; prev = next) {
         if (prev != null) {
           Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1,
@@ -146,8 +143,8 @@ public final class LogSegment implements Comparable<Long> {
       switch (corruptionPolicy) {
         case EXCEPTION: throw ioe;
         case WARN_AND_RETURN:
-          LOG.warn("Failed to read segment file {} (start={}, end={}, isOpen? {}): only {} entries read successfully",
-              file, start, end, isOpen, count, ioe);
+          LOG.warn("Failed to read segment file {} ({}): only {} entries read successfully",
+              file, startEnd, count, ioe);
           break;
         default:
           throw new IllegalStateException("Unexpected enum value: " + corruptionPolicy
@@ -158,13 +155,13 @@ public final class LogSegment implements Comparable<Long> {
     return count;
   }
 
-  static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd startEnd,
+  static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
       boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer, SegmentedRaftLogMetrics raftLogMetrics)
       throws IOException {
-    final LogSegment segment = newLogSegment(storage, startEnd, raftLogMetrics);
+    final LogSegment segment = newLogSegment(storage, startEnd, maxOpSize, raftLogMetrics);
     final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
     final boolean isOpen = startEnd.isOpen();
-    final int entryCount = readSegmentFile(file, startEnd, corruptionPolicy, raftLogMetrics, entry -> {
+    final int entryCount = readSegmentFile(file, startEnd, maxOpSize, corruptionPolicy, raftLogMetrics, entry -> {
       segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
       if (logConsumer != null) {
         logConsumer.accept(entry);
@@ -235,7 +232,9 @@ public final class LogSegment implements Comparable<Long> {
       // note the loading should not exceed the endIndex: it is possible that
       // the on-disk log file should be truncated but has not been done yet.
       final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>();
-      readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(), raftLogMetrics, entry -> {
+      final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
+      readSegmentFile(file, startEnd, maxOpSize,
+          getLogCorruptionPolicy(), raftLogMetrics, entry -> {
         final TermIndex ti = TermIndex.valueOf(entry);
         putEntryCache(ti, entry, Op.LOAD_SEGMENT_FILE);
         if (ti.equals(key.getTermIndex())) {
@@ -259,6 +258,7 @@ public final class LogSegment implements Comparable<Long> {
   /** Segment end index, inclusive. */
   private volatile long endIndex;
   private RaftStorage storage;
+  private final SizeInBytes maxOpSize;
   private final LogEntryLoader cacheLoader;
   /** later replace it with a metric */
   private final AtomicInteger loadingTimes = new AtomicInteger();
@@ -272,12 +272,13 @@ public final class LogSegment implements Comparable<Long> {
    */
   private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>();
 
-  private LogSegment(RaftStorage storage, boolean isOpen, long start, long end,
+  private LogSegment(RaftStorage storage, boolean isOpen, long start, long end, SizeInBytes maxOpSize,
       SegmentedRaftLogMetrics raftLogMetrics) {
     this.storage = storage;
     this.isOpen = isOpen;
     this.startIndex = start;
     this.endIndex = end;
+    this.maxOpSize = maxOpSize;
     this.cacheLoader = new LogEntryLoader(raftLogMetrics);
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index b6f932d6a..e42f451d0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -32,6 +32,7 @@ import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.AutoCloseableReadWriteLock;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -350,6 +351,7 @@ public class SegmentedRaftLogCache {
   private volatile LogSegment openSegment;
   private final LogSegmentList closedSegments;
   private final RaftStorage storage;
+  private final SizeInBytes maxOpSize;
   private final SegmentedRaftLogMetrics raftLogMetrics;
 
   private final int maxCachedSegments;
@@ -367,6 +369,7 @@ public class SegmentedRaftLogCache {
     this.raftLogMetrics.addOpenSegmentSizeInBytes(this);
     this.maxCachedSegments = RaftServerConfigKeys.Log.segmentCacheNumMax(properties);
     this.maxSegmentCacheSize = RaftServerConfigKeys.Log.segmentCacheSizeMax(properties).getSize();
+    this.maxOpSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
   }
 
   int getMaxCachedSegments() {
@@ -376,7 +379,7 @@ public class SegmentedRaftLogCache {
   void loadSegment(LogSegmentPath pi, boolean keepEntryInCache,
       Consumer<LogEntryProto> logConsumer) throws IOException {
     final LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(), pi.getStartEnd(),
-        keepEntryInCache, logConsumer, raftLogMetrics);
+        maxOpSize, keepEntryInCache, logConsumer, raftLogMetrics);
     if (logSegment != null) {
       addSegment(logSegment);
     }
@@ -434,7 +437,7 @@ public class SegmentedRaftLogCache {
   }
 
   void addOpenSegment(long startIndex) {
-    setOpenSegment(LogSegment.newOpenSegment(storage, startIndex,raftLogMetrics));
+    setOpenSegment(LogSegment.newOpenSegment(storage, startIndex, maxOpSize, raftLogMetrics));
   }
 
   private void setOpenSegment(LogSegment openSegment) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
index e445b1abb..481f837f5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.OpenCloseState;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,14 +67,12 @@ public class SegmentedRaftLogInputStream implements Closeable {
   private final boolean isOpen;
   private final OpenCloseState state;
   private SegmentedRaftLogReader reader;
+  private final SizeInBytes maxOpSize;
   private final SegmentedRaftLogMetrics raftLogMetrics;
 
-  public SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen) {
-    this(log, startIndex, endIndex, isOpen, null);
-  }
-
   SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen,
-      SegmentedRaftLogMetrics raftLogMetrics) {
+      SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) {
+    this.maxOpSize = maxOpSize;
     if (isOpen) {
       Preconditions.assertTrue(endIndex == INVALID_LOG_INDEX);
     } else {
@@ -92,7 +91,7 @@ public class SegmentedRaftLogInputStream implements Closeable {
     state.open();
     boolean initSuccess = false;
     try {
-      reader = new SegmentedRaftLogReader(logFile, raftLogMetrics);
+      reader = new SegmentedRaftLogReader(logFile, maxOpSize, raftLogMetrics);
       initSuccess = reader.verifyHeader();
     } finally {
       if (!initSuccess) {
@@ -191,11 +190,11 @@ public class SegmentedRaftLogInputStream implements Closeable {
    * @return Result of the validation
    * @throws IOException
    */
-  static LogValidation scanEditLog(File file, long maxTxIdToScan)
+  static LogValidation scanEditLog(File file, long maxTxIdToScan, SizeInBytes maxOpSize)
       throws IOException {
     SegmentedRaftLogInputStream in;
     try {
-      in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, null);
+      in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, maxOpSize, null);
       // read the header, initialize the inputstream
       in.init();
     } catch (EOFException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
index dc67d31c4..f1179de84 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
@@ -27,6 +27,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.PureJavaCrc32C;
+import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,21 +134,22 @@ class SegmentedRaftLogReader implements Closeable {
     }
   }
 
-  private static final int MAX_OP_SIZE = 32 * 1024 * 1024;
-
   private final File file;
   private final LimitedInputStream limiter;
   private final DataInputStream in;
   private byte[] temp = new byte[4096];
   private final Checksum checksum;
   private final SegmentedRaftLogMetrics raftLogMetrics;
+  private final SizeInBytes maxOpSize;
 
-  SegmentedRaftLogReader(File file, SegmentedRaftLogMetrics raftLogMetrics) throws FileNotFoundException {
+  SegmentedRaftLogReader(File file, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics)
+      throws FileNotFoundException {
     this.file = file;
     this.limiter = new LimitedInputStream(
         new BufferedInputStream(new FileInputStream(file)));
     in = new DataInputStream(limiter);
     checksum = new PureJavaCrc32C();
+    this.maxOpSize = maxOpSize;
     this.raftLogMetrics = raftLogMetrics;
   }
 
@@ -273,8 +275,9 @@ class SegmentedRaftLogReader implements Closeable {
    * @return The log entry, or null if we hit EOF.
    */
   private LogEntryProto decodeEntry() throws IOException {
-    limiter.setLimit(MAX_OP_SIZE);
-    in.mark(MAX_OP_SIZE);
+    final int max = maxOpSize.getSizeInt();
+    limiter.setLimit(max);
+    in.mark(max);
 
     byte nextByte;
     try {
@@ -294,17 +297,17 @@ class SegmentedRaftLogReader implements Closeable {
     // Here, we verify that the Op size makes sense and that the
     // data matches its checksum before attempting to construct an Op.
     int entryLength = CodedInputStream.readRawVarint32(nextByte, in);
-    if (entryLength > MAX_OP_SIZE) {
+    if (entryLength > max) {
       throw new IOException("Entry has size " + entryLength
-          + ", but MAX_OP_SIZE = " + MAX_OP_SIZE);
+          + ", but MAX_OP_SIZE = " + maxOpSize);
     }
 
     final int varintLength = CodedOutputStream.computeUInt32SizeNoTag(
         entryLength);
     final int totalLength = varintLength + entryLength;
-    checkBufferSize(totalLength);
+    checkBufferSize(totalLength, max);
     in.reset();
-    in.mark(MAX_OP_SIZE);
+    in.mark(max);
     IOUtils.readFully(in, temp, 0, totalLength);
 
     // verify checksum
@@ -323,12 +326,12 @@ class SegmentedRaftLogReader implements Closeable {
         CodedInputStream.newInstance(temp, varintLength, entryLength));
   }
 
-  private void checkBufferSize(int entryLength) {
-    Preconditions.assertTrue(entryLength <= MAX_OP_SIZE);
+  private void checkBufferSize(int entryLength, int max) {
+    Preconditions.assertTrue(entryLength <= max);
     int length = temp.length;
     if (length < entryLength) {
       while (length < entryLength) {
-        length = Math.min(length * 2, MAX_OP_SIZE);
+        length = Math.min(length * 2, max);
       }
       temp = new byte[length];
     }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
index 04527e728..5dfa4de10 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
@@ -19,8 +19,18 @@ package org.apache.ratis.server.raftlog.segmented;
 
 import org.apache.log4j.Level;
 import org.apache.ratis.util.Log4jUtils;
+import org.apache.ratis.util.SizeInBytes;
+
+import java.io.File;
 
 public interface SegmentedRaftLogTestUtils {
+  SizeInBytes MAX_OP_SIZE = SizeInBytes.valueOf("32MB");
+
+  static SegmentedRaftLogInputStream newSegmentedRaftLogInputStream(File log,
+      long startIndex, long endIndex, boolean isOpen) {
+    return new SegmentedRaftLogInputStream(log, startIndex, endIndex, isOpen, MAX_OP_SIZE, null);
+  }
+
   static void setRaftLogWorkerLogLevel(Level level) {
     Log4jUtils.setLogLevel(SegmentedRaftLogWorker.LOG, level);
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index cf715585e..122b66e58 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -40,6 +40,7 @@ import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.JavaUtils;
@@ -309,7 +310,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
     } else {
       LOG.info("Loading snapshot {}", snapshot);
       final long endIndex = snapshot.getIndex();
-      try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(
+      try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
           snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
         LogEntryProto entry;
         while ((entry = in.nextEntry()) != null) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index 87dd2ef37..d2ff12e75 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -49,6 +49,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
+
 public class TestCacheEviction extends BaseTest {
   private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault();
 
@@ -56,7 +58,7 @@ public class TestCacheEviction extends BaseTest {
     Assert.assertEquals(numSegments, cached.length);
     final LogSegmentList segments = new LogSegmentList(JavaUtils.getClassSimpleName(TestCacheEviction.class));
     for (int i = 0; i < numSegments; i++) {
-      LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1, null);
+      LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1, MAX_OP_SIZE, null);
       if (cached[i]) {
         s = Mockito.spy(s);
         Mockito.when(s.hasCache()).thenReturn(true);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 6e0af2dab..ee284ae2b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
 import static org.apache.ratis.server.raftlog.segmented.LogSegment.getEntrySize;
+import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
 
 import com.codahale.metrics.Timer;
 
@@ -170,7 +171,7 @@ public class TestLogSegment extends BaseTest {
     final File openSegmentFile = prepareLog(true, 0, 100, 0, isLastEntryPartiallyWritten);
     RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
     final LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile,
-        LogSegmentStartEnd.valueOf(0), loadInitial, null, null);
+        LogSegmentStartEnd.valueOf(0), MAX_OP_SIZE, loadInitial, null, null);
     final int delta = isLastEntryPartiallyWritten? 1: 0;
     checkLogSegment(openSegment, 0, 99 - delta, true, openSegmentFile.length(), 0);
     storage.close();
@@ -180,7 +181,7 @@ public class TestLogSegment extends BaseTest {
     // load a closed segment (1000-1099)
     final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false);
     LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile,
-        LogSegmentStartEnd.valueOf(1000, 1099L), loadInitial, null, null);
+        LogSegmentStartEnd.valueOf(1000, 1099L), MAX_OP_SIZE, loadInitial, null, null);
     checkLogSegment(closedSegment, 1000, 1099, false,
         closedSegment.getTotalFileSize(), 1);
     Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes());
@@ -189,7 +190,7 @@ public class TestLogSegment extends BaseTest {
   @Test
   public void testAppendEntries() throws Exception {
     final long start = 1000;
-    LogSegment segment = LogSegment.newOpenSegment(null, start, null);
+    LogSegment segment = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null);
     long size = SegmentedRaftLogFormat.getHeaderLength();
     final long max = 8 * 1024 * 1024;
     checkLogSegment(segment, start, start - 1, true, size, 0);
@@ -215,7 +216,7 @@ public class TestLogSegment extends BaseTest {
     final File openSegmentFile = prepareLog(true, 0, 100, 0, true);
     RaftStorage storage = RaftStorageTestUtils.newRaftStorage(storageDir);
     final LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile,
-        LogSegmentStartEnd.valueOf(0), true, null, raftLogMetrics);
+        LogSegmentStartEnd.valueOf(0), MAX_OP_SIZE, true, null, raftLogMetrics);
     checkLogSegment(openSegment, 0, 98, true, openSegmentFile.length(), 0);
     storage.close();
 
@@ -228,7 +229,7 @@ public class TestLogSegment extends BaseTest {
 
   @Test
   public void testAppendWithGap() throws Exception {
-    LogSegment segment = LogSegment.newOpenSegment(null, 1000, null);
+    LogSegment segment = LogSegment.newOpenSegment(null, 1000, MAX_OP_SIZE, null);
     SimpleOperation op = new SimpleOperation("m");
     final StateMachineLogEntryProto m = op.getLogEntryContent();
     try {
@@ -255,7 +256,7 @@ public class TestLogSegment extends BaseTest {
   public void testTruncate() throws Exception {
     final long term = 1;
     final long start = 1000;
-    LogSegment segment = LogSegment.newOpenSegment(null, start, null);
+    LogSegment segment = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null);
     for (int i = 0; i < 100; i++) {
       LogEntryProto entry = LogProtoUtils.toLogEntryProto(
           new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
@@ -298,7 +299,8 @@ public class TestLogSegment extends BaseTest {
                 new SegmentedRaftLogOutputStream(file, false, max, a, ByteBuffer.allocateDirect(bufferSize))) {
           Assert.assertEquals("max=" + max + ", a=" + a, file.length(), Math.min(max, a));
         }
-        try(SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
+        try(SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+            file, 0, INVALID_LOG_INDEX, true)) {
           LogEntryProto entry = in.nextEntry();
           Assert.assertNull(entry);
         }
@@ -318,8 +320,8 @@ public class TestLogSegment extends BaseTest {
     }
     Assert.assertEquals(file.length(),
         size + SegmentedRaftLogFormat.getHeaderLength());
-    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, 0,
-        INVALID_LOG_INDEX, true)) {
+    try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+        file, 0, INVALID_LOG_INDEX, true)) {
       LogEntryProto entry = in.nextEntry();
       Assert.assertArrayEquals(content,
           entry.getStateMachineLogEntry().getLogData().toByteArray());
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
index 88b5e2f48..e79f9f7f9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestRaftLogReadWrite.java
@@ -75,7 +75,8 @@ public class TestRaftLogReadWrite extends BaseTest {
   private LogEntryProto[] readLog(File file, long startIndex, long endIndex,
       boolean isOpen) throws IOException {
     List<LogEntryProto> list = new ArrayList<>();
-    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, startIndex, endIndex, isOpen)) {
+    try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+        file, startIndex, endIndex, isOpen)) {
       LogEntryProto entry;
       while ((entry = in.nextEntry()) != null) {
         list.add(entry);
@@ -207,8 +208,8 @@ public class TestRaftLogReadWrite extends BaseTest {
     }
 
     List<LogEntryProto> list = new ArrayList<>();
-    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(openSegment, 0,
-        RaftLog.INVALID_LOG_INDEX, true)) {
+    try (SegmentedRaftLogInputStream in = SegmentedRaftLogTestUtils.newSegmentedRaftLogInputStream(
+        openSegment, 0, RaftLog.INVALID_LOG_INDEX, true)) {
       LogEntryProto entry;
       while ((entry = in.nextEntry()) != null) {
         list.add(entry);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 976e9d6e4..7c5229e5d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.raftlog.segmented;
 
 import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
+import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -59,7 +60,7 @@ public class TestSegmentedRaftLogCache {
   }
 
   private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
-    LogSegment s = LogSegment.newOpenSegment(null, start, null);
+    LogSegment s = LogSegment.newOpenSegment(null, start, MAX_OP_SIZE, null);
     for (long i = start; i <= end; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       LogEntryProto entry = LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
diff --git a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
index 250a4790a..564ce0bf0 100644
--- a/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
+++ b/ratis-tools/src/main/java/org/apache/ratis/tools/ParseRatisLog.java
@@ -24,6 +24,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
 import org.apache.ratis.server.raftlog.segmented.LogSegment;
+import org.apache.ratis.util.SizeInBytes;
 
 import java.io.File;
 import java.io.IOException;
@@ -33,15 +34,17 @@ public final class ParseRatisLog {
 
   private final File file;
   private final Function<StateMachineLogEntryProto, String> smLogToString;
+  private final SizeInBytes maxOpSize;
 
   private long numConfEntries;
   private long numMetadataEntries;
   private long numStateMachineEntries;
   private long numInvalidEntries;
 
-  private ParseRatisLog(File f , Function<StateMachineLogEntryProto, String> smLogToString) {
+  private ParseRatisLog(File f , Function<StateMachineLogEntryProto, String> smLogToString, SizeInBytes maxOpSize) {
     this.file = f;
     this.smLogToString = smLogToString;
+    this.maxOpSize = maxOpSize;
     this.numConfEntries = 0;
     this.numMetadataEntries = 0;
     this.numStateMachineEntries = 0;
@@ -56,7 +59,7 @@ public final class ParseRatisLog {
     }
 
     System.out.println("Processing Raft Log file: " + file.getAbsolutePath() + " size:" + file.length());
-    final int entryCount = LogSegment.readSegmentFile(file, pi.getStartEnd(),
+    final int entryCount = LogSegment.readSegmentFile(file, pi.getStartEnd(), maxOpSize,
         RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, this::processLogEntry);
     System.out.println("Num Total Entries: " + entryCount);
     System.out.println("Num Conf Entries: " + numConfEntries);
@@ -85,7 +88,12 @@ public final class ParseRatisLog {
   public static class Builder {
     private File file = null;
     private Function<StateMachineLogEntryProto, String> smLogToString = null;
+    private SizeInBytes maxOpSize = SizeInBytes.valueOf("32MB");
 
+    public Builder setMaxOpSize(SizeInBytes maxOpSize) {
+      this.maxOpSize = maxOpSize;
+      return this;
+    }
 
     public Builder setSegmentFile(File segmentFile) {
       this.file = segmentFile;
@@ -98,7 +106,7 @@ public final class ParseRatisLog {
     }
 
     public ParseRatisLog build() {
-      return new ParseRatisLog(file, smLogToString);
+      return new ParseRatisLog(file, smLogToString, maxOpSize);
     }
   }
 }


[ratis] 03/16: RATIS-1761. If LeaderStateImpl is not running, it should not restart a LogAppender. (#799)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit e01c8e579e0cac8b918de7348afc409bf1e218c8
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sat Dec 24 16:41:50 2022 +0800

    RATIS-1761. If LeaderStateImpl is not running, it should not restart a LogAppender. (#799)
    
    (cherry picked from commit 77a9949f98d6c80a8c1466887763d44fb64c9ccc)
---
 .../java/org/apache/ratis/util/OpenCloseState.java |  4 +--
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 34 +++++++++++++++++-----
 .../ratis/server/leader/LogAppenderBase.java       |  8 +++++
 .../ratis/server/leader/LogAppenderDaemon.java     |  6 ++--
 4 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
index 7847c21cd..b79b49b27 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -60,7 +60,7 @@ public class OpenCloseState {
     final Throwable t = state.get();
     if (!(t instanceof OpenTrace)) {
       final String s = name + " is expected to be opened but it is " + toString(t);
-      throw new IllegalArgumentException(s, t);
+      throw new IllegalStateException(s, t);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index b55389343..43349354c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -74,6 +74,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
@@ -160,7 +161,7 @@ class LeaderStateImpl implements LeaderState {
       } catch (InterruptedException ie) {
         Thread.currentThread().interrupt();
         String s = this + ": poll() is interrupted";
-        if (!running) {
+        if (isStopped.get()) {
           LOG.info(s + " gracefully");
           return null;
         } else {
@@ -314,7 +315,7 @@ class LeaderStateImpl implements LeaderState {
   private final PendingRequests pendingRequests;
   private final WatchRequests watchRequests;
   private final MessageStreamRequests messageStreamRequests;
-  private volatile boolean running = true;
+  private final AtomicBoolean isStopped = new AtomicBoolean();
 
   private final int stagingCatchupGap;
   private final long placeHolderIndex;
@@ -386,7 +387,10 @@ class LeaderStateImpl implements LeaderState {
   }
 
   void stop() {
-    this.running = false;
+    if (!isStopped.compareAndSet(false, true)) {
+      LOG.info("{} is already stopped", this);
+      return;
+    }
     // do not interrupt event processor since it may be in the middle of logSync
     senders.forEach(LogAppender::stop);
     final NotLeaderException nle = server.generateNotLeaderException();
@@ -431,7 +435,8 @@ class LeaderStateImpl implements LeaderState {
    */
   PendingRequest startSetConfiguration(SetConfigurationRequest request, List<RaftPeer> peersInNewConf) {
     LOG.info("{}: startSetConfiguration {}", this, request);
-    Preconditions.assertTrue(running && !inStagingState());
+    Preconditions.assertTrue(isRunning(), () -> this + " is not running.");
+    Preconditions.assertTrue(!inStagingState(), () -> this + " is already in staging state " + stagingState);
 
     final List<RaftPeer> listenersInNewConf = request.getArguments().getPeersInNewConf(RaftPeerRole.LISTENER);
     final Collection<RaftPeer> peersToBootStrap = server.getRaftConf().filterNotContainedInConf(peersInNewConf);
@@ -589,8 +594,21 @@ class LeaderStateImpl implements LeaderState {
     senders.removeAll(toStop);
   }
 
+  boolean isRunning() {
+    if (isStopped.get()) {
+      return false;
+    }
+    final LeaderStateImpl current = server.getRole().getLeaderState().orElse(null);
+    return this == current;
+  }
+
   @Override
   public void restart(LogAppender sender) {
+    if (!isRunning()) {
+      LOG.warn("Failed to restart {}: {} is not running", sender, this);
+      return;
+    }
+
     final FollowerInfo info = sender.getFollower();
     LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), info.getName());
     sender.stop();
@@ -629,7 +647,7 @@ class LeaderStateImpl implements LeaderState {
       LOG.warn(s, e);
       // the failure should happen while changing the state to follower
       // thus the in-memory state should have been updated
-      if (running) {
+      if (!isStopped.get()) {
         throw new IllegalStateException(s + " and running == true", e);
       }
     }
@@ -665,7 +683,7 @@ class LeaderStateImpl implements LeaderState {
 
   private void prepare() {
     synchronized (server) {
-      if (running) {
+      if (isRunning()) {
         final ServerState state = server.getState();
         if (state.getRaftConf().isTransitional() && state.isConfCommitted()) {
           // the configuration is in transitional state, and has been committed
@@ -690,10 +708,10 @@ class LeaderStateImpl implements LeaderState {
       // apply an empty message; check if necessary to replicate (new) conf
       prepare();
 
-      while (running) {
+      while (isRunning()) {
         final StateUpdateEvent event = eventQueue.poll();
         synchronized(server) {
-          if (running) {
+          if (isRunning()) {
             if (event != null) {
               event.execute();
             } else if (inStagingState()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index fda78fbcf..bc8a31181 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -125,6 +125,14 @@ public abstract class LogAppenderBase implements LogAppender {
     daemon.tryToClose();
   }
 
+  void restart() {
+    if (!server.getInfo().isAlive()) {
+      LOG.warn("Failed to restart {}: server {} is not alive", this, server.getMemberId());
+      return;
+    }
+    getLeaderState().restart(this);
+  }
+
   @Override
   public final FollowerInfo getFollower() {
     return follower;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index d985a6ae8..6ca237ecf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -42,9 +42,9 @@ class LogAppenderDaemon {
   private final LifeCycle lifeCycle;
   private final Daemon daemon;
 
-  private final LogAppender logAppender;
+  private final LogAppenderBase logAppender;
 
-  LogAppenderDaemon(LogAppender logAppender) {
+  LogAppenderDaemon(LogAppenderBase logAppender) {
     this.logAppender = logAppender;
     this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass());
     this.lifeCycle = new LifeCycle(name);
@@ -88,7 +88,7 @@ class LogAppenderDaemon {
       lifeCycle.transitionIfValid(EXCEPTION);
     } finally {
       if (lifeCycle.transitionAndGet(TRANSITION_FINALLY) == EXCEPTION) {
-        logAppender.getLeaderState().restart(logAppender);
+        logAppender.restart();
       }
     }
   }


[ratis] 04/16: RATIS-1763. Purging logs in an ordered manner. (#801)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit e40a5f8aa35a0438c9d21c2139d85a29556c9b40
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Tue Jan 3 13:54:18 2023 +0800

    RATIS-1763. Purging logs in an ordered manner. (#801)
---
 .../apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index c9ca6e440..b6f932d6a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -318,8 +318,8 @@ public class SegmentedRaftLogCache {
           // to purge that.
           int startIndex = (overlappedSegment.getEndIndex() == index) ?
               segmentIndex : segmentIndex - 1;
-          for (int i = startIndex; i >= 0; i--) {
-            LogSegment segment = segments.remove(i);
+          for (int i = 0; i <= startIndex; i++) {
+            LogSegment segment = segments.remove(0); // must remove the first segment to avoid gaps.
             sizeInBytes -= segment.getTotalFileSize();
             list.add(SegmentFileInfo.newClosedSegmentFileInfo(segment));
           }