You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/08/16 13:18:29 UTC

[ratis] branch branch-2 updated (f5d15922a -> 3bfaa496e)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a change to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git


    from f5d15922a RATIS-1637. Improve the log message with fallback value in conf. (#695)
     new ce5357a2e RATIS-1640. Add unit-test of listener related to setConfiguration and takeSnapshot (#697)
     new 8d44e8893 RATIS-1639. Added getting started document (#696)
     new d5e3d413b RATIS-1655. Fix OrderedStreamAsync#scheduleWithTimeout to report a correct timeout value (#701)
     new 6cd8ef3d9 RATIS-1657. Intermittent failure in TestLogAppenderWithGrpc#testPendingLimits due to ConcurrentModificationException at SlidingWindow$Client.trySendDelayed. (#703)
     new 0c2ecd615 RATIS-1603. TimeoutScheduler can have a huge amount of threads and cause OOM. (#666)
     new 2afee9379 RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in RaftServerImpl (#702)
     new 396d69654 RATIS-1661. Support configurable hostname in GrpcService (#707)
     new 07f04cbd2 RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc. (#698)
     new d80d95520 RATIS-1665. RaftLog avoid converting list (#708)
     new 8a9963c8f RATIS-1660. Fix wrong ratis package (#705)
     new bb0f0021f RATIS-1663. Record call id for board casting a heartbeat. (#706)
     new 3bfaa496e RATIS-1669. Combine shell lib folder and root jars folder (#711)

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitignore                                         |   1 +
 pom.xml                                            |   7 +
 ratis-assembly/pom.xml                             |  41 +-
 .../src/main/assembly/bin-pkg.xml                  |  31 +-
 ratis-assembly/src/main/assembly/bin.xml           |  33 +-
 ratis-assembly/src/main/assembly/examples-bin.xml  |  16 +-
 ratis-assembly/src/main/assembly/shell-bin.xml     |  15 +-
 .../ratis/client/impl/OrderedStreamAsync.java      |   6 +-
 .../apache/ratis/client/impl/RaftClientImpl.java   |   7 +-
 .../src/main/java/org/apache/ratis/rpc/CallId.java |  12 +
 .../org/apache/ratis/util/CollectionUtils.java     |  14 +-
 .../java/org/apache/ratis/util/SlidingWindow.java  |  58 ++-
 .../org/apache/ratis/util/TimeoutExecutor.java     |  50 ++
 .../org/apache/ratis/util/TimeoutScheduler.java    |  22 +-
 .../java/org/apache/ratis/util/TimeoutTimer.java   | 109 +++++
 ratis-docs/src/site/markdown/index.md              |  42 +-
 ratis-docs/src/site/markdown/start/index.md        | 509 ++++++++++++++++++++-
 ratis-examples/pom.xml                             |   5 +-
 .../apache/ratis/examples/common/Constants.java    |   5 +-
 .../ratis/examples/counter/CounterCommand.java     |  26 +-
 .../examples/counter/client/CounterClient.java     | 118 +++--
 .../examples/counter/server/CounterServer.java     |  59 ++-
 .../counter/server/CounterStateMachine.java        | 205 +++++----
 .../apache/ratis/examples/counter/TestCounter.java |  10 +-
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java |  34 ++
 .../grpc/client/GrpcClientProtocolClient.java      |   5 +-
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  30 +-
 .../org/apache/ratis/grpc/server/GrpcService.java  |  30 +-
 .../ratis/netty/client/NettyClientStreamRpc.java   |   4 +-
 .../apache/ratis/server/leader/LogAppender.java    |   7 +
 .../ratis/server/raftlog/RaftLogSequentialOps.java |  13 +-
 .../apache/ratis/server/impl/LeaderStateImpl.java  |   2 +-
 .../apache/ratis/server/impl/PendingStepDown.java  |   4 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  51 ++-
 .../apache/ratis/server/impl/RaftServerProxy.java  |   4 +-
 .../org/apache/ratis/server/impl/ReadRequests.java |  66 +++
 .../org/apache/ratis/server/impl/ServerState.java  |   8 +-
 .../impl/SnapshotManagementRequestHandler.java     |   4 +-
 .../ratis/server/impl/TransferLeadership.java      |   4 +-
 .../apache/ratis/server/impl/WatchRequests.java    |   2 +-
 .../ratis/server/leader/LogAppenderDefault.java    |  16 +-
 .../apache/ratis/server/raftlog/LogProtoUtils.java |   7 +-
 .../apache/ratis/server/raftlog/RaftLogBase.java   |   4 +-
 .../ratis/server/raftlog/memory/MemoryRaftLog.java |  21 +-
 .../server/raftlog/segmented/SegmentedRaftLog.java |  12 +-
 .../raftlog/segmented/SegmentedRaftLogCache.java   |  15 +-
 .../ratis/statemachine/impl/BaseStateMachine.java  |   9 +-
 .../ratis/InstallSnapshotNotificationTests.java    |   7 +-
 .../ratis/server/impl/LeaderElectionTests.java     |  88 ++++
 .../apache/ratis/server/impl/MiniRaftCluster.java  |   6 +
 .../ratis/statemachine/SnapshotManagementTest.java |  30 ++
 ratis-shell/pom.xml                                |  41 +-
 ratis-shell/src/main/libexec/ratis-shell-config.sh |  11 +
 .../server/raftlog/memory/MemoryRaftLogTest.java   |  30 +-
 .../raftlog/segmented/TestCacheEviction.java       |   6 +-
 .../raftlog/segmented/TestSegmentedRaftLog.java    |   2 +-
 .../apache/ratis/util/TestTimeoutScheduler.java    |   4 +-
 57 files changed, 1516 insertions(+), 462 deletions(-)
 copy ratis-docs/pom.xml => ratis-assembly/src/main/assembly/bin-pkg.xml (50%)
 create mode 100644 ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java
 create mode 100644 ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java
 copy ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogIOException.java => ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java (61%)
 create mode 100644 ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java


[ratis] 03/12: RATIS-1655. Fix OrderedStreamAsync#scheduleWithTimeout to report a correct timeout value (#701)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit d5e3d413b3f8dc868549a246e9617f1aedc2c0cf
Author: Kiyoshi Mizumaru <ki...@gmail.com>
AuthorDate: Wed Aug 3 05:57:56 2022 +0900

    RATIS-1655. Fix OrderedStreamAsync#scheduleWithTimeout to report a correct timeout value (#701)
    
    
    (cherry picked from commit 3db8f9eac82efedd51322a0d213300c9a0940bb6)
---
 .../src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 5bc3242e7..ed4a20c03 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -170,7 +170,7 @@ public class OrderedStreamAsync {
     scheduler.onTimeout(timeout, () -> {
       if (!request.getReplyFuture().isDone()) {
         request.getReplyFuture().completeExceptionally(
-            new TimeoutIOException("Timeout " + requestTimeout + ": Failed to send " + request));
+            new TimeoutIOException("Timeout " + timeout + ": Failed to send " + request));
       }
     }, LOG, () -> "Failed to completeExceptionally for " + request);
   }


[ratis] 11/12: RATIS-1663. Record call id for board casting a heartbeat. (#706)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit bb0f0021f68c8b2c011be0de23b258ae24151848
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Aug 10 01:43:08 2022 -0700

    RATIS-1663. Record call id for board casting a heartbeat. (#706)
    
    
    (cherry picked from commit 5dd3c1db093bb06e462afbd0df4b8b215bbd8bf3)
---
 .../src/main/java/org/apache/ratis/rpc/CallId.java | 12 ++++
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 28 +++++++--
 .../apache/ratis/server/leader/LogAppender.java    |  7 +++
 .../org/apache/ratis/server/impl/ReadRequests.java | 66 ++++++++++++++++++++++
 .../ratis/server/leader/LogAppenderDefault.java    | 16 +++++-
 5 files changed, 120 insertions(+), 9 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java b/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java
index a6914e27b..85e6ef06b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.rpc;
 
+import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -27,6 +28,17 @@ import java.util.concurrent.atomic.AtomicLong;
 public final class CallId {
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
 
+  private static final Comparator<Long> COMPARATOR = (left, right) -> {
+    final long diff = left - right;
+    // check diff < Long.MAX_VALUE/2 for the possibility of numerical overflow
+    return diff == 0? 0: diff > 0 && diff < Long.MAX_VALUE/2? 1: -1;
+  };
+
+  /** @return a long comparator, which takes care of the possibility of numerical overflow, for comparing call ids. */
+  public static Comparator<Long> getComparator() {
+    return COMPARATOR;
+  }
+
   /** @return the default value. */
   public static long getDefault() {
     return 0;
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 3f01ea299..3e33a1787 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -47,6 +47,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.ratis.thirdparty.com.codahale.metrics.Timer;
 
@@ -56,9 +57,16 @@ import org.apache.ratis.thirdparty.com.codahale.metrics.Timer;
 public class GrpcLogAppender extends LogAppenderBase {
   public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class);
 
+  private static final Comparator<Long> CALL_ID_COMPARATOR = (left, right) -> {
+    // calculate diff in order to take care of the possibility of numerical overflow
+    final long diff = left - right;
+    return diff == 0? 0: diff > 0? 1: -1;
+  };
+
+  private final AtomicLong callId = new AtomicLong();
+
   private final RequestMap pendingRequests = new RequestMap();
   private final int maxPendingRequestsNum;
-  private long callId = 0;
   private volatile boolean firstResponseReceived = false;
   private final boolean installSnapshotEnabled;
 
@@ -235,15 +243,23 @@ public class GrpcLogAppender extends LogAppenderBase {
     }
   }
 
+  @Override
+  public long getCallId() {
+    return callId.get();
+  }
+
+  @Override
+  public Comparator<Long> getCallIdComparator() {
+    return CALL_ID_COMPARATOR;
+  }
+
   private void appendLog(boolean excludeLogEntries) throws IOException {
     final AppendEntriesRequestProto pending;
     final AppendEntriesRequest request;
-    final StreamObserver<AppendEntriesRequestProto> s;
     try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
-      // prepare and enqueue the append request. note changes on follower's
-      // nextIndex and ops on pendingRequests should always be associated
-      // together and protected by the lock
-      pending = newAppendEntriesRequest(callId++, excludeLogEntries);
+      // Prepare and send the append request.
+      // Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock
+      pending = newAppendEntriesRequest(callId.getAndIncrement(), excludeLogEntries);
       if (pending == null) {
         return;
       }
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 135b4318d..ef5e1a7ed 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Comparator;
 
 /**
  * A {@link LogAppender} is for the leader to send appendEntries to a particular follower.
@@ -81,6 +82,12 @@ public interface LogAppender {
     return getFollower().getPeer().getId();
   }
 
+  /** @return the call id for the next {@link AppendEntriesRequestProto}. */
+  long getCallId();
+
+  /** @return a {@link Comparator} for comparing call ids. */
+  Comparator<Long> getCallIdComparator();
+
   /**
    * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
    * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
new file mode 100644
index 000000000..b8d8998c7
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.server.leader.LogAppender;
+
+/** For supporting linearizable read. */
+class ReadRequests {
+  /** The acknowledgement from a {@link LogAppender} of a heartbeat for a particular call id. */
+  static class HeartbeatAck {
+    private final LogAppender appender;
+    private final long minCallId;
+    private volatile boolean acknowledged = false;
+
+    HeartbeatAck(LogAppender appender) {
+      this.appender = appender;
+      this.minCallId = appender.getCallId();
+    }
+
+    /** Is the heartbeat (for a particular call id) acknowledged? */
+    boolean isAcknowledged() {
+      return acknowledged;
+    }
+
+    /**
+     * @return true if the acknowledged state is changed from false to true;
+     *         otherwise, the acknowledged state remains unchanged, return false.
+     */
+    boolean receive(AppendEntriesReplyProto reply) {
+      if (acknowledged) {
+        return false;
+      }
+      synchronized (this) {
+        if (!acknowledged && isValid(reply)) {
+          acknowledged = true;
+          return true;
+        }
+        return false;
+      }
+    }
+
+    private boolean isValid(AppendEntriesReplyProto reply) {
+      if (reply == null || !reply.getServerReply().getSuccess()) {
+        return false;
+      }
+      // valid only if the reply's call id is the same as or later than the min.
+      return appender.getCallIdComparator().compare(reply.getServerReply().getCallId(), minCallId) >= 0;
+    }
+  }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index c9d341409..0c91427e5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.server.leader;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
@@ -30,6 +29,7 @@ import org.apache.ratis.statemachine.SnapshotInfo;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.Comparator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -42,12 +42,22 @@ class LogAppenderDefault extends LogAppenderBase {
     super(server, leaderState, f);
   }
 
+  @Override
+  public long getCallId() {
+    return CallId.get();
+  }
+
+  @Override
+  public Comparator<Long> getCallIdComparator() {
+    return CallId.getComparator();
+  }
+
   /** Send an appendEntries RPC; retry indefinitely. */
-  @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
   private AppendEntriesReplyProto sendAppendEntriesWithRetries()
       throws InterruptedException, InterruptedIOException, RaftLogIOException {
     int retry = 0;
-    AppendEntriesRequestProto request = null;
+
+    AppendEntriesRequestProto request = newAppendEntriesRequest(CallId.getAndIncrement(), false);
     while (isRunning()) { // keep retrying for IOException
       try {
         if (request == null || request.getEntriesCount() == 0) {


[ratis] 06/12: RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in RaftServerImpl (#702)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 2afee937948ac5eb2ec5f6193e8cfe137168b92b
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Sun Aug 7 08:28:09 2022 +0200

    RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in RaftServerImpl (#702)
    
    
    (cherry picked from commit 9bbb4401b832a69035bf0b186bb9525bf6aadeb9)
---
 .../src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java | 3 ++-
 .../main/java/org/apache/ratis/server/impl/RaftServerProxy.java    | 4 +++-
 .../java/org/apache/ratis/InstallSnapshotNotificationTests.java    | 7 ++++++-
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fe9f87ce9..6ab0a6a5c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1414,7 +1414,8 @@ class RaftServerImpl implements RaftServer.Division,
       }
     }
     return JavaUtils.allOf(futures).whenCompleteAsync(
-        (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
+        (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)),
+        serverExecutor
     ).thenApply(v -> {
       final AppendEntriesReplyProto reply;
       synchronized(this) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 96f7efbe1..ad4d988ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -115,7 +115,9 @@ class RaftServerProxy implements RaftServer {
         return;
       }
       isClosed = true;
-      map.entrySet().parallelStream().forEach(entry -> close(entry.getKey(), entry.getValue()));
+      ConcurrentUtils.parallelForEachAsync(map.entrySet(),
+          entry -> close(entry.getKey(), entry.getValue()),
+          executor);
     }
 
     private void close(RaftGroupId groupId, CompletableFuture<RaftServerImpl> future) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 4476f3ecf..215e8408f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -51,6 +51,8 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -85,6 +87,9 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
   private static final AtomicInteger numNotifyInstallSnapshotFinished = new AtomicInteger();
 
   private static class StateMachine4InstallSnapshotNotificationTests extends SimpleStateMachine4Testing {
+
+    private final Executor stateMachineExecutor = Executors.newSingleThreadExecutor();
+
     @Override
     public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
         RaftProtos.RoleInfoProto roleInfoProto,
@@ -120,7 +125,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
         return leaderSnapshotInfo.getTermIndex();
       };
 
-      return CompletableFuture.supplyAsync(supplier);
+      return CompletableFuture.supplyAsync(supplier, stateMachineExecutor);
     }
 
     @Override


[ratis] 05/12: RATIS-1603. TimeoutScheduler can have a huge amount of threads and cause OOM. (#666)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 0c2ecd615773cbb0bb6f3a475c93a79ea5982d7d
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Aug 5 18:51:29 2022 -0700

    RATIS-1603. TimeoutScheduler can have a huge amount of threads and cause OOM. (#666)
    
    
    (cherry picked from commit 34867f25afc1329d722253e94e008f7616b9eb39)
---
 .../ratis/client/impl/OrderedStreamAsync.java      |   4 +-
 .../apache/ratis/client/impl/RaftClientImpl.java   |   7 +-
 .../org/apache/ratis/util/TimeoutExecutor.java     |  50 ++++++++++
 .../org/apache/ratis/util/TimeoutScheduler.java    |  22 +----
 .../java/org/apache/ratis/util/TimeoutTimer.java   | 109 +++++++++++++++++++++
 .../grpc/client/GrpcClientProtocolClient.java      |   5 +-
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |   2 +-
 .../ratis/netty/client/NettyClientStreamRpc.java   |   4 +-
 .../apache/ratis/server/impl/PendingStepDown.java  |   4 +-
 .../impl/SnapshotManagementRequestHandler.java     |   4 +-
 .../ratis/server/impl/TransferLeadership.java      |   4 +-
 .../apache/ratis/server/impl/WatchRequests.java    |   2 +-
 .../apache/ratis/util/TestTimeoutScheduler.java    |   4 +-
 13 files changed, 183 insertions(+), 38 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index ed4a20c03..8683a7f20 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -33,7 +33,7 @@ import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.SlidingWindow;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,7 +107,7 @@ public class OrderedStreamAsync {
   private final Semaphore requestSemaphore;
   private final TimeDuration requestTimeout;
   private final TimeDuration closeTimeout;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){
     this.dataStreamClientRpc = dataStreamClientRpc;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 7313c9149..a333fe9a7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -48,7 +48,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -135,7 +135,7 @@ public final class RaftClientImpl implements RaftClient {
 
   private volatile RaftPeerId leaderId;
 
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   private final Supplier<OrderedAsync> orderedAsync;
   private final Supplier<AsyncImpl> asyncApi;
@@ -227,7 +227,7 @@ public final class RaftClientImpl implements RaftClient {
         TimeDuration.ZERO : sleepDefault;
   }
 
-  TimeoutScheduler getScheduler() {
+  TimeoutExecutor getScheduler() {
     return scheduler;
   }
 
@@ -404,7 +404,6 @@ public final class RaftClientImpl implements RaftClient {
 
   @Override
   public void close() throws IOException {
-    scheduler.close();
     clientRpc.close();
     if (dataStreamApi.isInitialized()) {
       dataStreamApi.get().close();
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java
new file mode 100644
index 000000000..aace13b88
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedRunnable;
+import org.slf4j.Logger;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/** Execute timeout tasks. */
+public interface TimeoutExecutor {
+  int MAXIMUM_POOL_SIZE =  8;
+  static TimeoutExecutor getInstance() {
+    return TimeoutTimer.getInstance();
+  }
+
+  /** @return the number of scheduled but not completed timeout tasks. */
+  int getTaskCount();
+
+  /**
+   * Schedule a timeout task.
+   *
+   * @param timeout the timeout value.
+   * @param task the task to run when timeout.
+   * @param errorHandler to handle the error, if there is any.
+   */
+  <THROWABLE extends Throwable> void onTimeout(
+      TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler);
+
+  /** When timeout, run the task.  Log the error, if there is any. */
+  default void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger log, Supplier<String> errorMessage) {
+    onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
+  }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
index 9c428f2f4..cba2851f4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java
@@ -21,7 +21,6 @@ import org.apache.ratis.util.function.CheckedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.ScheduledFuture;
@@ -32,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-public final class TimeoutScheduler implements Closeable {
+public final class TimeoutScheduler implements TimeoutExecutor {
   public static final Logger LOG = LoggerFactory.getLogger(TimeoutScheduler.class);
 
   static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES);
@@ -110,7 +109,8 @@ public final class TimeoutScheduler implements Closeable {
   private TimeoutScheduler() {
   }
 
-  int getQueueSize() {
+  @Override
+  public int getTaskCount() {
     return scheduler.getQueueSize();
   }
 
@@ -126,13 +126,7 @@ public final class TimeoutScheduler implements Closeable {
     return scheduler.hasExecutor();
   }
 
-  /**
-   * Schedule a timeout task.
-   *
-   * @param timeout the timeout value.
-   * @param task the task to run when timeout.
-   * @param errorHandler to handle the error, if there is any.
-   */
+  @Override
   public <THROWABLE extends Throwable> void onTimeout(
       TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) {
     onTimeout(timeout, sid -> {
@@ -186,13 +180,7 @@ public final class TimeoutScheduler implements Closeable {
     }
   }
 
-  /** When timeout, run the task.  Log the error, if there is any. */
-  public void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger log, Supplier<String> errorMessage) {
-    onTimeout(timeout, task, t -> log.error(errorMessage.get(), t));
-  }
-
-  @Override
-  public synchronized void close() {
+  public synchronized void tryShutdownScheduler() {
     tryShutdownScheduler(scheduleID);
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java
new file mode 100644
index 000000000..36cbb2b29
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public final class TimeoutTimer implements TimeoutExecutor {
+  public static final Logger LOG = LoggerFactory.getLogger(TimeoutTimer.class);
+
+  private static final Supplier<TimeoutTimer> INSTANCE = JavaUtils.memoize(() -> new TimeoutTimer(MAXIMUM_POOL_SIZE));
+
+  public static TimeoutTimer getInstance() {
+    return INSTANCE.get();
+  }
+
+  static class Task extends TimerTask {
+    private final int id;
+    private final Runnable runnable;
+
+    Task(int id, Runnable runnable) {
+      this.id = id;
+      this.runnable = LogUtils.newRunnable(LOG, runnable, this::toString);
+    }
+
+    @Override
+    public void run() {
+      LOG.debug("run {}", this);
+      runnable.run();
+    }
+
+    @Override
+    public String toString() {
+      return "task #" + id;
+    }
+  }
+
+  /** The number of scheduled tasks. */
+  private final AtomicInteger numTasks = new AtomicInteger();
+  /** A unique ID for each task. */
+  private final AtomicInteger taskId = new AtomicInteger();
+
+  private final List<MemoizedSupplier<Timer>> timers;
+
+  private TimeoutTimer(int numTimers) {
+    final List<MemoizedSupplier<Timer>> list = new ArrayList<>(numTimers);
+    for(int i = 0; i < numTimers; i++) {
+      final String name = "timer" + i;
+      list.add(JavaUtils.memoize(() -> new Timer(name, true)));
+    }
+    this.timers = Collections.unmodifiableList(list);
+  }
+
+  @Override
+  public int getTaskCount() {
+    return numTasks.get();
+  }
+
+  private Timer getTimer(int tid) {
+    return timers.get(Math.toIntExact(Integer.toUnsignedLong(tid) % timers.size())).get();
+  }
+
+  private void schedule(TimeDuration timeout, Runnable toSchedule) {
+    final int tid = taskId.incrementAndGet();
+    final int n = numTasks.incrementAndGet();
+    LOG.debug("schedule a task #{} with timeout {}, numTasks={}", tid, timeout, n);
+    getTimer(n).schedule(new Task(tid, toSchedule), timeout.toLong(TimeUnit.MILLISECONDS));
+  }
+
+  @Override
+  public <THROWABLE extends Throwable> void onTimeout(
+      TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) {
+    schedule(timeout, () -> {
+      try {
+        task.run();
+      } catch(Throwable t) {
+        errorHandler.accept(JavaUtils.cast(t));
+      } finally {
+        numTasks.decrementAndGet();
+      }
+    });
+  }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index c2ee1f247..d8b128a43 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -58,7 +58,7 @@ import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,7 +86,7 @@ public class GrpcClientProtocolClient implements Closeable {
 
   private final TimeDuration requestTimeoutDuration;
   private final TimeDuration watchRequestTimeoutDuration;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   private final RaftClientProtocolServiceStub asyncStub;
   private final AdminProtocolServiceBlockingStub adminBlockingStub;
@@ -173,7 +173,6 @@ public class GrpcClientProtocolClient implements Closeable {
     if (clientChannel != adminChannel) {
       GrpcUtil.shutdownManagedChannel(adminChannel);
     }
-    scheduler.close();
     metricClientInterceptor.close();
   }
 
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 69a3795ca..3f01ea299 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -63,7 +63,7 @@ public class GrpcLogAppender extends LogAppenderBase {
   private final boolean installSnapshotEnabled;
 
   private final TimeDuration requestTimeoutDuration;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   private volatile StreamObservers appendLogRequestObserver;
   private final boolean useSeparateHBChannel;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 37f09b8da..a03013748 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -52,7 +52,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.NetUtils;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -243,7 +243,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
 
   private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new ConcurrentHashMap<>();
   private final TimeDuration replyQueueGracePeriod;
-  private final TimeoutScheduler timeoutScheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor timeoutScheduler = TimeoutExecutor.getInstance();
 
   public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties properties) {
     this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java
index 5b928a69e..b7bfde3f6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java
@@ -25,7 +25,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,7 +81,7 @@ public class PendingStepDown {
   }
 
   private final LeaderStateImpl leader;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
   private final PendingRequestReference pending = new PendingRequestReference();
 
   PendingStepDown(LeaderStateImpl leaderState) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
index 416e501ad..b899752e0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java
@@ -23,7 +23,7 @@ import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,7 +89,7 @@ class SnapshotManagementRequestHandler {
   }
 
   private final RaftServerImpl server;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
   private final PendingRequestReference pending = new PendingRequestReference();
 
   SnapshotManagementRequestHandler(RaftServerImpl server) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index 03b1dfc80..3aed1a10d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -24,7 +24,7 @@ import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +74,7 @@ public class TransferLeadership {
   }
 
   private final RaftServerImpl server;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
   private final AtomicReference<PendingRequest> pending = new AtomicReference<>();
 
   TransferLeadership(RaftServerImpl server) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
index 3b95d4bf1..f4c6200b9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java
@@ -169,7 +169,7 @@ class WatchRequests {
 
   private final TimeDuration watchTimeoutNanos;
   private final TimeDuration watchTimeoutDenominationNanos;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   WatchRequests(Object name, RaftProperties properties) {
     this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
index dbe0e943f..cca1cfdea 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java
@@ -223,7 +223,7 @@ public class TestTimeoutScheduler extends BaseTest {
     }
     HUNDRED_MILLIS.sleep();
     HUNDRED_MILLIS.sleep();
-    JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()),
+    JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getTaskCount()),
         10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG);
 
     final TimeDuration oneMillis = TimeDuration.valueOf(1, TimeUnit.MILLISECONDS);
@@ -234,7 +234,7 @@ public class TestTimeoutScheduler extends BaseTest {
       oneMillis.sleep();
     }
     HUNDRED_MILLIS.sleep();
-    JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()),
+    JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getTaskCount()),
         10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG);
 
     errorHandler.assertNoError();


[ratis] 08/12: RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc. (#698)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 07f04cbd299e653cbad4bd1589b2b46aa1427bf7
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Aug 8 18:57:47 2022 -0700

    RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc. (#698)
    
    * RATIS-1641. Cleanup the Counter example and revise the "Getting Started" doc.
    
    * Revert ArithmeticStateMachine.java
    
    * Add an option to use AsyncApi
    
    * Fix a long line
    
    * Minor changes in CounterClient.
    
    * Remove the use of Charset.
    
    * Revert some CounterServer change for a findbugs warning.
    
    * Fix a bug in CounterServer.main.
    
    (cherry picked from commit 1009a6c385373b3280cc9066e02ce050911627f8)
---
 ratis-docs/src/site/markdown/start/index.md        | 653 +++++++++++++--------
 .../apache/ratis/examples/common/Constants.java    |   5 +-
 .../ratis/examples/counter/CounterCommand.java     |  41 ++
 .../examples/counter/client/CounterClient.java     | 118 ++--
 .../examples/counter/server/CounterServer.java     |  59 +-
 .../counter/server/CounterStateMachine.java        | 205 ++++---
 .../apache/ratis/examples/counter/TestCounter.java |  10 +-
 .../ratis/statemachine/impl/BaseStateMachine.java  |   9 +-
 8 files changed, 679 insertions(+), 421 deletions(-)

diff --git a/ratis-docs/src/site/markdown/start/index.md b/ratis-docs/src/site/markdown/start/index.md
index 7aca21f9b..beaa519ea 100644
--- a/ratis-docs/src/site/markdown/start/index.md
+++ b/ratis-docs/src/site/markdown/start/index.md
@@ -18,23 +18,51 @@
 # Getting Started
 Let's get started to use Raft in your application.
 To demonstrate how to use Ratis,
-we'll implement a simple counter service,
-which maintains a counter value across a cluster.
-The client could send the following types of commands to the cluster:
+we implement a simple *Counter* service,
+which maintains a counter value across a raft group.
+Clients could send the following types of requests to the raft group:
 
-* `INCREMENT`: increase the counter value
-* `GET`: query the current value of the counter,
-we call such kind of commands as read-only commands
+* `INCREMENT`: increase the counter value by 1.
+This command will trigger a transaction to change the state.
+* `GET`: query the current value of the counter.
+This is a read-only command since it does not change the state.
 
-Note: The full source could be found at [Ratis examples](https://github.com/apache/ratis/tree/master/ratis-examples).
-This article is mainly intended to show the steps of integration of Ratis,
-if you wish to run this example or find more examples,
+We have the following `enum` for representing the supported commands.
+
+```java
+/**
+ * The supported commands of the Counter example.
+ */
+public enum CounterCommand {
+  /** Increment the counter by 1. */
+  INCREMENT,
+  /** Get the counter value. */
+  GET;
+
+  private final Message message = Message.valueOf(name());
+
+  public Message getMessage() {
+    return message;
+  }
+
+  /** Does the given command string match this command? */
+  public boolean matches(String command) {
+    return name().equalsIgnoreCase(command);
+  }
+}
+```
+
+Note:
+The source code of the Counter example and the other examples is at
+[Ratis examples](https://github.com/apache/ratis/tree/master/ratis-examples).
+This article intends to show the steps of integration of Ratis.
+If you wish to run the Counter example,
 please refer to [the README](https://github.com/apache/ratis/tree/master/ratis-examples#example-3-counter).
 
-## Add the dependency
+## Adding the Dependency
 
-First, we need to add Ratis dependencies into the project,
-it's available in maven central:
+The first step is to add Ratis dependencies into the project.
+The dependencies are available in maven central:
 
 ```xml
 <dependency>
@@ -43,13 +71,14 @@ it's available in maven central:
 </dependency>
 ```
 
-Also, one of the following transports need to be added:
+Then, add one of the following transports:
 
-* grpc
-* netty
-* hadoop
+* ratis-grpc
+* ratis-netty
+* ratis-hadoop
 
-For example, let's use grpc transport:
+In this example,
+we choose to use ratis-grpc:
 
 ```xml
 <dependency>
@@ -61,294 +90,436 @@ For example, let's use grpc transport:
 Please note that Apache Hadoop dependencies are shaded,
 so it’s safe to use hadoop transport with different versions of Hadoop.
 
-## Create the StateMachine
-A state machine is used to maintain the current state of the raft node,
-the state machine is responsible for:
-
-* Execute raft logs to get the state. In this example, when a `INCREMENT` log is executed, 
-the counter value will be increased by 1.
-And a `GET` log does not affect the state but only returns the current counter value to the client.
-* Managing snapshots loading/saving.
-Snapshots are used to speed the log execution,
-the state machine could start from a snapshot point and only execute newer logs.
-
-### Define the StateMachine
-To define our state machine,
-we can extend a class from the base class `BaseStateMachine`.
-
-Also, a storage is needed to store snapshots,
-and we'll use the build-in `SimpleStateMachineStorage`,
-which is a file-based storage implementation.
-
-Since we're going to implement a count server,
-the `counter` instance is defined in the state machine,
-represents the current value.
-Below is the declaration of the state machine: 
+## Implementing the `CounterStateMachine`
+A state machine manages the application logic.
+The state machine is responsible for:
+
+* Apply raft log transactions in order to maintain the current state.
+  * When there is an `INCREMENT` request,
+    it will first be written to the raft log as a log entry.
+    Once the log entry is committed,
+    the state machine will be invoked for applying the log entry as a transaction
+    so that the counter value will be increased by 1.
+  * When there is a `GET` request,
+    it will not be written to the raft log
+    since it is a readonly request which does not change the state.
+    The state machine should return the current value of the counter.
+* Manage snapshots loading/saving.
+  * Snapshots are used for log compaction
+    so that the state machine can be restored from a snapshot
+    and then applies only the newer log entries,
+    instead of applying a long history of log starting from the beginning.
+
+We discuss how to implement `CounterStateMachine` in the following subsections.
+The complete source code of it is in
+[CounterStateMachine.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java).
+
+### Defining the State
+In this example,
+the `CounterStateMachine` extends the `BaseStateMachine`,
+which provides a base implementation of a `StateMachine`.
+
+Inside the `CounterStateMachine`,
+there is a `counter` object
+which stores the current value.
+The `counter` is an `AtomicInteger`
+in order to support concurrent access.
+We use the built-in `SimpleStateMachineStorage`,
+which is a file-based storage implementation,
+as a storage for storing snapshots.
+The fields are shown below:
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    private final SimpleStateMachineStorage storage =
-            new SimpleStateMachineStorage();
-    private AtomicInteger counter = new AtomicInteger(0);
-    // ...
+  // ...
+
+  private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+  private final AtomicInteger counter = new AtomicInteger(0);
+
+  // ...
 }
 ```
 
-### Apply Raft Log Item
+### Applying Raft Log Entries
 
-Once the raft log is committed,
-Ratis will notify state machine by invoking the `public CompletableFuture<Message> applyTransaction(TransactionContext trx)` method,
-and we need to override this method to decode the message and apply it. 
-
-First, get the log content and decode it:
+Once a raft log entry has been committed
+(i.e. a majority of the servers have acknowledged),
+Ratis notifies the state machine by invoking the `applyTransaction` method.
+The `applyTransaction` method first validates the log entry.
+Then, it applies the log entry by increasing the counter value and updates the term-index.
+The code fragments are shown below.
+Note that the `incrementCounter` method is synchronized
+in order to update both counter and last applied term-index atomically.
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    // ...
-    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-        final RaftProtos.LogEntryProto entry = trx.getLogEntry();
-        String logData = entry.getStateMachineLogEntry().getLogData()
-                .toString(Charset.defaultCharset());
-        if (!logData.equals("INCREMENT")) {
-            return CompletableFuture.completedFuture(
-                    Message.valueOf("Invalid Command"));
-        }
-        // ...
+  // ...
+
+  private synchronized int incrementCounter(TermIndex termIndex) {
+    updateLastAppliedTermIndex(termIndex);
+    return counter.incrementAndGet();
+  }
+
+  // ...
+
+  /**
+   * Apply the {@link CounterCommand#INCREMENT} by incrementing the counter object.
+   *
+   * @param trx the transaction context
+   * @return the message containing the updated counter value
+   */
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    final LogEntryProto entry = trx.getLogEntry();
+
+    //check if the command is valid
+    final String command = entry.getStateMachineLogEntry().getLogData().toString(Charset.defaultCharset());
+    if (!CounterCommand.INCREMENT.matches(command)) {
+      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
     }
+    //increment the counter and update term-index
+    final TermIndex termIndex = TermIndex.valueOf(entry);
+    final long incremented = incrementCounter(termIndex);
+
+    //if leader, log the incremented value and the term-index
+    if (trx.getServerRole() == RaftPeerRole.LEADER) {
+      LOG.info("{}: Increment to {}", termIndex, incremented);
+    }
+
+    //return the new value of the counter to the client
+    return CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented)));
+  }
+
+  // ...
 }
 ```
 
-After that, if the log is valid,
-we could apply it by increasing the counter value.
-Remember that we also need to update the committed indexes:
+### Processing Readonly Commands
+The `INCREMENT` command is implemented in the previous section.
+What about the `GET` command?
+Since the `GET` command is a readonly command,
+it is implemented by the `query` method instead of the `applyTransaction` method.
+The code fragment is shown below.
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    // ...
-    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-        // ...
-        final long index = entry.getIndex();
-        updateLastAppliedTermIndex(entry.getTerm(), index);
-
-        //actual execution of the command: increment the counter
-        counter.incrementAndGet();
-
-        //return the new value of the counter to the client
-        final CompletableFuture<Message> f =
-                CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
-        return f;
+  // ...
+
+  /**
+   * Process {@link CounterCommand#GET}, which gets the counter value.
+   *
+   * @param request the GET request
+   * @return a {@link Message} containing the current counter value as a {@link String}.
+   */
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    final String command = request.getContent().toString(Charset.defaultCharset());
+    if (!CounterCommand.GET.matches(command)) {
+      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
     }
+    return CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
+  }
+
+  // ...
 }
 ```
 
-### Handle Readonly Command
-Note that we only handled `INCREMENT` command,
-what about the `GET` command?
-The `GET` command is implemented as a readonly command,
-so we'll need to implement `public CompletableFuture<Message> query(Message request)` instead of `applyTransaction`.
+### Taking Snapshots
+When taking a snapshot,
+the state is persisted in the storage of the state machine.
+The snapshot can be loaded for restoring the state in the future.
+In this example,
+we use `ObjectOutputStream` to write the counter value to a snapshot file.
+The term-index is stored in the file name of the snapshot file.
+The code fragments are shown below.
+Note that the `getState` method is synchronized
+in order to get the applied term-index and the counter value atomically.
+Note also that getting the counter value alone does not have to be synchronized
+since the `counter` field is already an `AtomicInteger`.
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    // ...
-    @Override
-    public CompletableFuture<Message> query(Message request) {
-        String msg = request.getContent().toString(Charset.defaultCharset());
-        if (!msg.equals("GET")) {
-            return CompletableFuture.completedFuture(
-                    Message.valueOf("Invalid Command"));
-        }
-        return CompletableFuture.completedFuture(
-                Message.valueOf(counter.toString()));
+  // ...
+
+  /** The state of the {@link CounterStateMachine}. */
+  static class CounterState {
+    private final TermIndex applied;
+    private final int counter;
+
+    CounterState(TermIndex applied, int counter) {
+      this.applied = applied;
+      this.counter = counter;
+    }
+
+    TermIndex getApplied() {
+      return applied;
+    }
+
+    int getCounter() {
+      return counter;
+    }
+  }
+
+  // ...
+
+  /** @return the current state. */
+  private synchronized CounterState getState() {
+    return new CounterState(getLastAppliedTermIndex(), counter.get());
+  }
+
+  // ...
+
+  /**
+   * Store the current state as a snapshot file in the {@link #storage}.
+   *
+   * @return the index of the snapshot
+   */
+  @Override
+  public long takeSnapshot() {
+    //get the current state
+    final CounterState state = getState();
+    final long index = state.getApplied().getIndex();
+
+    //create a file with a proper name to store the snapshot
+    final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index);
+
+    //write the counter value into the snapshot file
+    try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(
+        Files.newOutputStream(snapshotFile.toPath())))) {
+      out.writeInt(state.getCounter());
+    } catch (IOException ioe) {
+      LOG.warn("Failed to write snapshot file \"" + snapshotFile
+              + "\", last applied index=" + state.getApplied());
     }
+
+    //return the index of the stored snapshot (which is the last applied one)
+    return index;
+  }
+
+  // ...
 }
 ```
 
-### Save and Load Snapshots
-When taking a snapshot,
-we persist every state in the state machine,
-and the value could be loaded directly to the state in the future.
-In this example,
-the only state is the counter value,
-we're going to use `ObjectOutputStream` to write it to a snapshot file:
+### Loading Snapshots
+When loading a snapshot,
+we use an `ObjectInputStream` to read the snapshot file.
+The term-index is read from the file name of the snapshot file.
+The code fragments are shown below.
+Note that the `updateState` method is synchronized
+in order to update the applied term-index and the counter value atomically.
 
 ```java
 public class CounterStateMachine extends BaseStateMachine {
-    // ...
-    @Override
-    public long takeSnapshot() {
-        //get the last applied index
-        final TermIndex last = getLastAppliedTermIndex();
-
-        //create a file with a proper name to store the snapshot
-        final File snapshotFile =
-                storage.getSnapshotFile(last.getTerm(), last.getIndex());
-
-        //serialize the counter object and write it into the snapshot file
-        try (ObjectOutputStream out = new ObjectOutputStream(
-                new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
-            out.writeObject(counter);
-        } catch (IOException ioe) {
-            LOG.warn("Failed to write snapshot file \"" + snapshotFile
-                    + "\", last applied index=" + last);
-        }
-
-        //return the index of the stored snapshot (which is the last applied one)
-        return last.getIndex();
+  // ...
+
+  private synchronized void updateState(TermIndex applied, int counterValue) {
+    updateLastAppliedTermIndex(applied);
+    counter.set(counterValue);
+  }
+
+  // ...
+
+  /**
+   * Load the state of the state machine from the {@link #storage}.
+   *
+   * @param snapshot the information of the snapshot being loaded
+   * @return the index of the snapshot or -1 if snapshot is invalid
+   * @throws IOException if it failed to read from storage
+   */
+  private long load(SingleFileSnapshotInfo snapshot) throws IOException {
+    //check null
+    if (snapshot == null) {
+      LOG.warn("The snapshot info is null.");
+      return RaftLog.INVALID_LOG_INDEX;
+    }
+    //check if the snapshot file exists.
+    final Path snapshotPath = snapshot.getFile().getPath();
+    if (!Files.exists(snapshotPath)) {
+      LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotPath, snapshot);
+      return RaftLog.INVALID_LOG_INDEX;
     }
+
+    //read the TermIndex from the snapshot file name
+    final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotPath.toFile());
+
+    //read the counter value from the snapshot file
+    final int counterValue;
+    try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath)))) {
+      counterValue = in.readInt();
+    }
+
+    //update state
+    updateState(last, counterValue);
+
+    return last.getIndex();
+  }
+
+  // ...
 }
 ```
 
-When loading it,
-we could use `ObjectInputStream` to deserialize it.
-Remember that we also need to implement `initialize` and `reinitialize` method,
-so that the state machine will be correctly initialized.
-
-## Build and Start a RaftServer
-In order to build a raft cluster,
-each node must start a `RaftServer` instance,
-which is responsible for communicating to each other through Raft protocol.
+### Implementing the `initialize` and `reinitialize` Methods
+The `initialize` method is called at most once
+when the server is starting up.
+In contrast,
+the `reinitialize` method is called when
+1. the server is resumed from the `PAUSE` state, or
+2. a new snapshot is installed from the leader or from an external source.
+
+In `CounterStateMachine`,
+the `reinitialize` method simply loads the latest snapshot
+and the `initialize` method additionally initializes the `BaseStateMachine` super class and the storage.
+```java
+public class CounterStateMachine extends BaseStateMachine {
+  // ...
+
+  /**
+   * Initialize the state machine storage and then load the state.
+   *
+   * @param server  the server running this state machine
+   * @param groupId the id of the {@link org.apache.ratis.protocol.RaftGroup}
+   * @param raftStorage the storage of the server
+   * @throws IOException if it fails to load the state.
+   */
+  @Override
+  public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException {
+    super.initialize(server, groupId, raftStorage);
+    storage.init(raftStorage);
+    reinitialize();
+  }
+
+  /**
+   * Simply load the state.
+   *
+   * @throws IOException if it fails to load the state.
+   */
+  @Override
+  public void reinitialize() throws IOException {
+    load(storage.getLatestSnapshot());
+  }
+
+  // ...
+}
+```
+## Preparing a `RaftGroup`
+In order to run a raft group,
+each server must start a `RaftServer` instance,
+which is responsible for communicating with the other servers through the Raft protocol.
 
 It's important to keep in mind that,
-each raft server knows exactly how many raft peers are in the cluster,
-and what are the addresses of them.
-In this example, we'll set a 3 node cluster.
+each raft server knows the initial raft group when starting up.
+It knows the number of raft peers in the group
+and the addresses of the peers.
+
+In this example, we have a raft group with 3 peers.
 For simplicity,
-each peer listens to specific port on the same machine,
-and we can define the addresses of the cluster in a configuration file:
+each peer listens to a specific port on the same machine.
+Their addresses are defined in a
+[property file](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/resources/conf.properties)
+as below.
 
 ```properties
 raft.server.address.list=127.0.0.1:10024,127.0.0.1:10124,127.0.0.1:11124
 ```
 
-We name those peers as 'n-0', 'n-1' and 'n-2',
-and then we will create a `RaftGroup` instance representing them.
-Since they are immutable,
-we'll put them in the `Constant` class:
+The peers are named 'n0', 'n1' and 'n2'
+and they form a `RaftGroup`.
+For more details, see
+[Constants.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java).
 
-```java
-public final class Constants {
-    public static final List<RaftPeer> PEERS;
-    private static final UUID CLUSTER_GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
-
-    static {
-        // load addresses from configuration file
-        // final String[] addresses = ...
-        List<RaftPeer> peers = new ArrayList<>(addresses.length);
-        for (int i = 0; i < addresses.length; i++) {
-            peers.add(RaftPeer.newBuilder().setId("n" + i).setAddress(addresses[i]).build());
-        }
-        PEERS = Collections.unmodifiableList(peers);
-    }
-    
-    public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(
-            RaftGroupId.valueOf(Constants.CLUSTER_GROUP_ID), PEERS);
-    // ...
-}
-```
+## Building & Starting the `CounterServer`
 
-Except for the cluster info,
-another important thing is that we need to know the information of the current peer.
-To achieve this,
-we could pass the current peer's id as a program argument,
-and then the raft server could be created:
+We use a `RaftServer.Builder` to build a `RaftServer`.
+We first set up a `RaftProperties` object
+with a local directory as the storage of the server
+and a port number as the gRPC server port.
+Then,
+we create our `CounterStateMachine`
+and pass everything to the builder as below.
 
 ```java
 public final class CounterServer implements Closeable {
-    private final RaftServer server;
-
-    // the current peer will be passed as argument
-    public CounterServer(RaftPeer peer, File storageDir) throws IOException {
-        // ...
-        CounterStateMachine counterStateMachine = new CounterStateMachine();
-
-        //create and start the Raft server
-        this.server = RaftServer.newBuilder()
-                .setGroup(Constants.RAFT_GROUP)
-                .setProperties(properties)
-                .setServerId(peer.getId())
-                .setStateMachine(counterStateMachine)
-                .build();
-    }
+  private final RaftServer server;
 
-    public void start() throws IOException {
-        server.start();
-    }
-}
-```
+  public CounterServer(RaftPeer peer, File storageDir) throws IOException {
+    //create a property object
+    final RaftProperties properties = new RaftProperties();
 
-Each `RaftServer` will own a `CounterStateMachine` instance,
-as previously defined by us.
-After that, all we need to do is to start it along with our application:
+    //set the storage directory (different for each peer) in the RaftProperty object
+    RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
 
-```java
-public final class CounterServer implements Closeable {
-    // ...
-    public static void main(String[] args) throws IOException {
-        // ...
-        //find current peer object based on application parameter
-        final RaftPeer currentPeer = Constants.PEERS.get(Integer.parseInt(args[0]) - 1);
-
-        //start a counter server
-        final File storageDir = new File("./" + currentPeer.getId());
-        final CounterServer counterServer = new CounterServer(currentPeer, storageDir);
-        counterServer.start();
-        // ...
-    }
+    //set the port (different for each peer) in RaftProperty object
+    final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort();
+    GrpcConfigKeys.Server.setPort(properties, port);
+
+    //create the counter state machine which holds the counter value
+    final CounterStateMachine counterStateMachine = new CounterStateMachine();
+
+    //build the Raft server
+    this.server = RaftServer.newBuilder()
+            .setGroup(Constants.RAFT_GROUP)
+            .setProperties(properties)
+            .setServerId(peer.getId())
+            .setStateMachine(counterStateMachine)
+            .build();
+  }
 
+  // ...
 }
 ```
 
-After the server is started,
-it will try to communicate with other peers in the cluster,
-and perform raft actions like leader election, append log entries, etc.
+Now we are ready to start our `CounterServer` peers and form a raft group.
+The command is:
+```shell
+java org.apache.ratis.examples.counter.server.CounterServer peer_index
+```
+The argument `peer_index` must be 0, 1 or 2.
+
+After a server is started,
+it communicates with other peers in the group,
+and performs raft actions such as leader election and append-log-entries.
+After all three servers are started,
+the counter service is up and running with the Raft protocol.
 
-## Build Raft Client
+For more details, see
+[CounterServer.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java).
 
-To send commands to the cluster,
-we need to use a `RaftClient` instance.
-All we need to know is the peers in the cluster, ie. the raft group. 
+## Building & Running the `CounterClient`
+
+We use a `RaftGroup` to build a `RaftClient`
+and then use the `RaftClient` to send commands to the raft service.
+Note that gRPC is the default RPC type,
+so we may skip setting it in the `RaftProperties`.
 
 ```java
-public final class CounterClient {
-    // ...
-    private static RaftClient buildClient() {
-        RaftProperties raftProperties = new RaftProperties();
-        RaftClient.Builder builder = RaftClient.newBuilder()
-                .setProperties(raftProperties)
-                .setRaftGroup(Constants.RAFT_GROUP)
-                .setClientRpc(
-                        new GrpcFactory(new Parameters())
-                                .newRaftClientRpc(ClientId.randomId(), raftProperties));
-        return builder.build();
-    }
+public final class CounterClient implements Closeable {
+  private final RaftClient client = RaftClient.newBuilder()
+          .setProperties(new RaftProperties())
+          .setRaftGroup(Constants.RAFT_GROUP)
+          .build();
+
+  // ...
 }
 ```
 
 With this raft client,
-we can then send commands by `raftClient.io().send` method,
-and use `raftClient.io().sendReadonly` method for read only commands.
-In this example,
-to send `INCREMENT` and `GET` command,
-we can do it like this:
-
+we can then send commands using the `BlockingApi` returned by `RaftClient.io()`,
+or the `AsyncApi` returned by `RaftClient.async()`.
+The `send` method in the `BlockingApi`/`AsyncApi` is used to send the `INCREMENT` command as below.
 ```java
-raftClient.io().send(Message.valueOf("INCREMENT")));
+client.io().send(CounterCommand.INCREMENT.getMessage());
 ```
-
-and
-
+or
 ```java
-RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
-String response = count.getMessage().getContent().toString(Charset.defaultCharset());
-System.out.println(response);
+client.async().send(CounterCommand.INCREMENT.getMessage());
 ```
-
-## Summary
-It might seem a little complicated for beginners,
-but since Raft itself is a hard topic,
-this is already the simplest example we've found as a 'Hello World' for Ratis.
-After you have a basic understanding of Ratis,
-you'll find it really easy to be integrated into any projects. 
-
-Next, you can take a look at other [examples](https://github.com/apache/ratis/tree/master/ratis-examples),
-to know more about the features of Ratis.
\ No newline at end of file
+The `sendReadOnly` method in the `BlockingApi`/`AsyncApi` is used to send the `GET` command as below.
+```java
+client.io().sendReadOnly(CounterCommand.GET.getMessage());
+```
+or
+```java
+client.async().sendReadOnly(CounterCommand.GET.getMessage());
+```
+For more details, see
+[CounterClient.java](https://github.com/apache/ratis/blob/master/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java).
\ No newline at end of file
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
index 9da64092d..9fa873fc2 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/Constants.java
@@ -71,10 +71,9 @@ public final class Constants {
     PEERS = Collections.unmodifiableList(peers);
   }
 
-  private static final UUID CLUSTER_GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
+  private static final UUID GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
 
-  public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(
-      RaftGroupId.valueOf(Constants.CLUSTER_GROUP_ID), PEERS);
+  public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(RaftGroupId.valueOf(Constants.GROUP_ID), PEERS);
 
   private Constants() {
   }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
new file mode 100644
index 000000000..843a158fb
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/CounterCommand.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.counter;
+
+import org.apache.ratis.protocol.Message;
+
+/**
+ * The supported commands of the Counter example.
+ */
+public enum CounterCommand {
+  /** Increment the counter by 1. */
+  INCREMENT,
+  /** Get the counter value. */
+  GET;
+
+  private final Message message = Message.valueOf(name());
+
+  public Message getMessage() {
+    return message;
+  }
+
+  /** Does the given command string match this command? */
+  public boolean matches(String command) {
+    return name().equalsIgnoreCase(command);
+  }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
index a3f28cb6a..c0350ec72 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,24 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ratis.examples.counter.client;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.examples.common.Constants;
-import org.apache.ratis.grpc.GrpcFactory;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
+import org.apache.ratis.examples.counter.CounterCommand;
 import org.apache.ratis.protocol.RaftClientReply;
 
+import java.io.Closeable;
 import java.io.IOException;
-import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 
 /**
  * Counter client application, this application sends specific number of
@@ -42,55 +40,75 @@ import java.util.concurrent.TimeUnit;
  * The parameter to this application indicates the number of INCREMENT commands; if no
  * parameter is given, the application uses the default value, which is 10
  */
-public final class CounterClient {
+public final class CounterClient implements Closeable {
+  //build the client
+  private final RaftClient client = RaftClient.newBuilder()
+      .setProperties(new RaftProperties())
+      .setRaftGroup(Constants.RAFT_GROUP)
+      .build();
 
-  private CounterClient(){
+  @Override
+  public void close() throws IOException {
+    client.close();
   }
 
-  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
-  public static void main(String[] args)
-      throws IOException, InterruptedException {
-    //indicate the number of INCREMENT command, set 10 if no parameter passed
-    int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
-
-    //build the counter cluster client
-    RaftClient raftClient = buildClient();
+  private void run(int increment, boolean blocking) throws Exception {
+    System.out.printf("Sending %d %s command(s) using the %s ...%n",
+        increment, CounterCommand.INCREMENT, blocking? "BlockingApi": "AsyncApi");
+    final List<Future<RaftClientReply>> futures = new ArrayList<>(increment);
 
-    //use a executor service with 10 thread to send INCREMENT commands
-    // concurrently
-    ExecutorService executorService = Executors.newFixedThreadPool(10);
-
-    //send INCREMENT commands concurrently
-    System.out.printf("Sending %d increment command...%n", increment);
-    for (int i = 0; i < increment; i++) {
-      executorService.submit(() ->
-          raftClient.io().send(Message.valueOf("INCREMENT")));
+    //send INCREMENT command(s)
+    if (blocking) {
+      // use BlockingApi
+      final ExecutorService executor = Executors.newFixedThreadPool(10);
+      for (int i = 0; i < increment; i++) {
+        final Future<RaftClientReply> f = executor.submit(
+            () -> client.io().send(CounterCommand.INCREMENT.getMessage()));
+        futures.add(f);
+      }
+      executor.shutdown();
+    } else {
+      // use AsyncApi
+      for (int i = 0; i < increment; i++) {
+        final Future<RaftClientReply> f = client.async().send(CounterCommand.INCREMENT.getMessage());
+        futures.add(f);
+      }
     }
 
-    //shutdown the executor service and wait until they finish their work
-    executorService.shutdown();
-    executorService.awaitTermination(increment * 500L, TimeUnit.MILLISECONDS);
+    //wait for the futures
+    for (Future<RaftClientReply> f : futures) {
+      final RaftClientReply reply = f.get();
+      if (reply.isSuccess()) {
+        final String count = reply.getMessage().getContent().toStringUtf8();
+        System.out.println("Counter is incremented to " + count);
+      } else {
+        System.err.println("Failed " + reply);
+      }
+    }
 
-    //send GET command and print the response
-    RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
-    String response = count.getMessage().getContent().toString(Charset.defaultCharset());
-    System.out.println(response);
+    //send a GET command and print the reply
+    final RaftClientReply reply = client.io().sendReadOnly(CounterCommand.GET.getMessage());
+    final String count = reply.getMessage().getContent().toStringUtf8();
+    System.out.println("Current counter value: " + count);
   }
 
-  /**
-   * build the RaftClient instance which is used to communicate to
-   * Counter cluster
-   *
-   * @return the created client of Counter cluster
-   */
-  private static RaftClient buildClient() {
-    RaftProperties raftProperties = new RaftProperties();
-    RaftClient.Builder builder = RaftClient.newBuilder()
-        .setProperties(raftProperties)
-        .setRaftGroup(Constants.RAFT_GROUP)
-        .setClientRpc(
-            new GrpcFactory(new Parameters())
-                .newRaftClientRpc(ClientId.randomId(), raftProperties));
-    return builder.build();
+  public static void main(String[] args) {
+    try(CounterClient client = new CounterClient()) {
+      //the number of INCREMENT commands, default is 10
+      final int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
+      final boolean io = args.length > 1 && "io".equalsIgnoreCase(args[1]);
+      client.run(increment, io);
+    } catch (Throwable e) {
+      e.printStackTrace();
+      System.err.println();
+      System.err.println("args = " + Arrays.toString(args));
+      System.err.println();
+      System.err.println("Usage: java org.apache.ratis.examples.counter.client.CounterClient [increment] [async|io]");
+      System.err.println();
+      System.err.println("       increment: the number of INCREMENT commands to be sent (default is 10)");
+      System.err.println("       async    : use the AsyncApi (default)");
+      System.err.println("       io       : use the BlockingApi");
+      System.exit(1);
+    }
   }
 }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
index b6fc8c7d8..7a6367ece 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterServer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ratis.examples.counter.server;
 
 import org.apache.ratis.conf.RaftProperties;
@@ -29,6 +28,7 @@ import org.apache.ratis.util.NetUtils;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Scanner;
 
@@ -48,19 +48,19 @@ public final class CounterServer implements Closeable {
 
   public CounterServer(RaftPeer peer, File storageDir) throws IOException {
     //create a property object
-    RaftProperties properties = new RaftProperties();
+    final RaftProperties properties = new RaftProperties();
 
-    //set the storage directory (different for each peer) in RaftProperty object
+    //set the storage directory (different for each peer) in the RaftProperty object
     RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
 
-    //set the port which server listen to in RaftProperty object
+    //set the port (different for each peer) in RaftProperty object
     final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort();
     GrpcConfigKeys.Server.setPort(properties, port);
 
-    //create the counter state machine which hold the counter value
-    CounterStateMachine counterStateMachine = new CounterStateMachine();
+    //create the counter state machine which holds the counter value
+    final CounterStateMachine counterStateMachine = new CounterStateMachine();
 
-    //create and start the Raft server
+    //build the Raft server
     this.server = RaftServer.newBuilder()
         .setGroup(Constants.RAFT_GROUP)
         .setProperties(properties)
@@ -78,24 +78,41 @@ public final class CounterServer implements Closeable {
     server.close();
   }
 
-  public static void main(String[] args) throws IOException {
-    if (args.length < 1) {
-      System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}");
-      System.err.println("{serverIndex} could be 1, 2 or 3");
+  public static void main(String[] args) {
+    try {
+      //get peerIndex from the arguments
+      if (args.length != 1) {
+        throw new IllegalArgumentException("Invalid argument number: expected to be 1 but actual is " + args.length);
+      }
+      final int peerIndex = Integer.parseInt(args[0]);
+      if (peerIndex < 0 || peerIndex > 2) {
+        throw new IllegalArgumentException("The server index must be 0, 1 or 2: peerIndex=" + peerIndex);
+      }
+
+      startServer(peerIndex);
+    } catch(Throwable e) {
+      e.printStackTrace();
+      System.err.println();
+      System.err.println("args = " + Arrays.toString(args));
+      System.err.println();
+      System.err.println("Usage: java org.apache.ratis.examples.counter.server.CounterServer peer_index");
+      System.err.println();
+      System.err.println("       peer_index must be 0, 1 or 2");
       System.exit(1);
     }
+  }
 
-    //find current peer object based on application parameter
-    final RaftPeer currentPeer = Constants.PEERS.get(Integer.parseInt(args[0]) - 1);
+  private static void startServer(int peerIndex) throws IOException {
+    //get peer and define storage dir
+    final RaftPeer currentPeer = Constants.PEERS.get(peerIndex);
+    final File storageDir = new File("./" + currentPeer.getId());
 
     //start a counter server
-    final File storageDir = new File("./" + currentPeer.getId());
-    final CounterServer counterServer = new CounterServer(currentPeer, storageDir);
-    counterServer.start();
+    try(CounterServer counterServer = new CounterServer(currentPeer, storageDir)) {
+      counterServer.start();
 
-    //exit when any input entered
-    Scanner scanner = new Scanner(System.in, UTF_8.name());
-    scanner.nextLine();
-    counterServer.close();
+      //exit when any input entered
+      new Scanner(System.in, UTF_8.name()).nextLine();
+    }
   }
 }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index 7159ec1c5..d5a027910 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,10 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ratis.examples.counter.server;
 
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.examples.counter.CounterCommand;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
@@ -34,52 +35,81 @@ import org.apache.ratis.util.JavaUtils;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * State machine implementation for Counter server application. This class
- * maintain a {@link AtomicInteger} object as a state and accept two commands:
- * GET and INCREMENT, GET is a ReadOnly command which will be handled by
- * {@code query} method however INCREMENT is a transactional command which
- * will be handled by {@code applyTransaction}.
+ * A {@link org.apache.ratis.statemachine.StateMachine} implementation for the {@link CounterServer}.
+ * This class maintains an {@link AtomicInteger} object as a state and accepts two commands:
+ *
+ * - {@link CounterCommand#GET} is a readonly command
+ *   which is handled by the {@link #query(Message)} method.
+ *
+ * - {@link CounterCommand#INCREMENT} is a transactional command
+ *   which is handled by the {@link #applyTransaction(TransactionContext)} method.
  */
 public class CounterStateMachine extends BaseStateMachine {
-  private final SimpleStateMachineStorage storage =
-      new SimpleStateMachineStorage();
-  private AtomicInteger counter = new AtomicInteger(0);
+  /** The state of the {@link CounterStateMachine}. */
+  static class CounterState {
+    private final TermIndex applied;
+    private final int counter;
+
+    CounterState(TermIndex applied, int counter) {
+      this.applied = applied;
+      this.counter = counter;
+    }
+
+    TermIndex getApplied() {
+      return applied;
+    }
+
+    int getCounter() {
+      return counter;
+    }
+  }
+
+  private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+  private final AtomicInteger counter = new AtomicInteger(0);
+
+  /** @return the current state. */
+  private synchronized CounterState getState() {
+    return new CounterState(getLastAppliedTermIndex(), counter.get());
+  }
+
+  private synchronized void updateState(TermIndex applied, int counterValue) {
+    updateLastAppliedTermIndex(applied);
+    counter.set(counterValue);
+  }
+
+  private synchronized int incrementCounter(TermIndex termIndex) {
+    updateLastAppliedTermIndex(termIndex);
+    return counter.incrementAndGet();
+  }
 
   /**
-   * initialize the state machine by initilize the state machine storage and
-   * calling the load method which reads the last applied command and restore it
-   * in counter object)
+   * Initialize the state machine storage and then load the state.
    *
-   * @param server      the current server information
-   * @param groupId     the cluster groupId
-   * @param raftStorage the raft storage which is used to keep raft related
-   *                    stuff
-   * @throws IOException if any error happens during load state
+   * @param server  the server running this state machine
+   * @param groupId the id of the {@link org.apache.ratis.protocol.RaftGroup}
+   * @param raftStorage the storage of the server
+   * @throws IOException if it fails to load the state.
    */
   @Override
-  public void initialize(RaftServer server, RaftGroupId groupId,
-                         RaftStorage raftStorage) throws IOException {
+  public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException {
     super.initialize(server, groupId, raftStorage);
-    this.storage.init(raftStorage);
-    load(storage.getLatestSnapshot());
+    storage.init(raftStorage);
+    reinitialize();
   }
 
   /**
-   * very similar to initialize method, but doesn't initialize the storage
-   * system because the state machine reinitialized from the PAUSE state and
-   * storage system initialized before.
+   * Simply load the latest snapshot.
    *
-   * @throws IOException if any error happens during load state
+   * @throws IOException if it fails to load the state.
    */
   @Override
   public void reinitialize() throws IOException {
@@ -87,124 +117,107 @@ public class CounterStateMachine extends BaseStateMachine {
   }
 
   /**
-   * Store the current state as an snapshot file in the stateMachineStorage.
+   * Store the current state as a snapshot file in the {@link #storage}.
    *
    * @return the index of the snapshot
    */
   @Override
   public long takeSnapshot() {
-    //get the last applied index
-    final TermIndex last = getLastAppliedTermIndex();
+    //get the current state
+    final CounterState state = getState();
+    final long index = state.getApplied().getIndex();
 
     //create a file with a proper name to store the snapshot
-    final File snapshotFile =
-        storage.getSnapshotFile(last.getTerm(), last.getIndex());
+    final File snapshotFile = storage.getSnapshotFile(state.getApplied().getTerm(), index);
 
-    //serialize the counter object and write it into the snapshot file
-    try (ObjectOutputStream out = new ObjectOutputStream(
-        new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
-      out.writeObject(counter);
+    //write the counter value into the snapshot file
+    try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(
+        Files.newOutputStream(snapshotFile.toPath())))) {
+      out.writeInt(state.getCounter());
     } catch (IOException ioe) {
       LOG.warn("Failed to write snapshot file \"" + snapshotFile
-          + "\", last applied index=" + last);
+          + "\", last applied index=" + state.getApplied());
     }
 
     //return the index of the stored snapshot (which is the last applied one)
-    return last.getIndex();
+    return index;
   }
 
   /**
-   * Load the state of the state machine from the storage.
+   * Load the state of the state machine from the {@link #storage}.
    *
-   * @param snapshot to load
+   * @param snapshot the information of the snapshot being loaded
    * @return the index of the snapshot or -1 if snapshot is invalid
-   * @throws IOException if any error happens during read from storage
+   * @throws IOException if it failed to read from storage
    */
   private long load(SingleFileSnapshotInfo snapshot) throws IOException {
-    //check the snapshot nullity
+    //check null
     if (snapshot == null) {
       LOG.warn("The snapshot info is null.");
       return RaftLog.INVALID_LOG_INDEX;
     }
-
-    //check the existance of the snapshot file
-    final File snapshotFile = snapshot.getFile().getPath().toFile();
-    if (!snapshotFile.exists()) {
-      LOG.warn("The snapshot file {} does not exist for snapshot {}",
-          snapshotFile, snapshot);
+    //check if the snapshot file exists.
+    final Path snapshotPath = snapshot.getFile().getPath();
+    if (!Files.exists(snapshotPath)) {
+      LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotPath, snapshot);
       return RaftLog.INVALID_LOG_INDEX;
     }
 
-    //load the TermIndex object for the snapshot using the file name pattern of
-    // the snapshot
-    final TermIndex last =
-        SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
-
-    //read the file and cast it to the AtomicInteger and set the counter
-    try (ObjectInputStream in = new ObjectInputStream(
-        new BufferedInputStream(new FileInputStream(snapshotFile)))) {
-      //set the last applied termIndex to the termIndex of the snapshot
-      setLastAppliedTermIndex(last);
-
-      //read, cast and set the counter
-      counter = JavaUtils.cast(in.readObject());
-    } catch (ClassNotFoundException e) {
-      throw new IllegalStateException(e);
+    //read the TermIndex from the snapshot file name
+    final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotPath.toFile());
+
+    //read the counter value from the snapshot file
+    final int counterValue;
+    try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(Files.newInputStream(snapshotPath)))) {
+      counterValue = in.readInt();
     }
 
+    //update state
+    updateState(last, counterValue);
+
     return last.getIndex();
   }
 
   /**
-   * Handle GET command, which used by clients to get the counter value.
+   * Process {@link CounterCommand#GET}, which gets the counter value.
    *
-   * @param request the GET message
-   * @return the Message containing the current counter value
+   * @param request the GET request
+   * @return a {@link Message} containing the current counter value as a {@link String}.
    */
   @Override
   public CompletableFuture<Message> query(Message request) {
-    String msg = request.getContent().toString(Charset.defaultCharset());
-    if (!msg.equals("GET")) {
-      return CompletableFuture.completedFuture(
-          Message.valueOf("Invalid Command"));
+    final String command = request.getContent().toStringUtf8();
+    if (!CounterCommand.GET.matches(command)) {
+      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
     }
-    return CompletableFuture.completedFuture(
-        Message.valueOf(counter.toString()));
+    return CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
   }
 
   /**
-   * Apply the INCREMENT command by incrementing the counter object.
+   * Apply the {@link CounterCommand#INCREMENT} by incrementing the counter object.
    *
    * @param trx the transaction context
    * @return the message containing the updated counter value
    */
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    final RaftProtos.LogEntryProto entry = trx.getLogEntry();
+    final LogEntryProto entry = trx.getLogEntry();
 
     //check if the command is valid
-    String logData = entry.getStateMachineLogEntry().getLogData()
-        .toString(Charset.defaultCharset());
-    if (!logData.equals("INCREMENT")) {
-      return CompletableFuture.completedFuture(
-          Message.valueOf("Invalid Command"));
+    final String command = entry.getStateMachineLogEntry().getLogData().toStringUtf8();
+    if (!CounterCommand.INCREMENT.matches(command)) {
+      return JavaUtils.completeExceptionally(new IllegalArgumentException("Invalid Command: " + command));
     }
-    //update the last applied term and index
-    final long index = entry.getIndex();
-    updateLastAppliedTermIndex(entry.getTerm(), index);
-
-    //actual execution of the command: increment the counter
-    counter.incrementAndGet();
+    //increment the counter and update term-index
+    final TermIndex termIndex = TermIndex.valueOf(entry);
+    final long incremented = incrementCounter(termIndex);
 
-    //return the new value of the counter to the client
-    final CompletableFuture<Message> f =
-        CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
-
-    //if leader, log the incremented value and it's log index
-    if (trx.getServerRole() == RaftProtos.RaftPeerRole.LEADER) {
-      LOG.info("{}: Increment to {}", index, counter.toString());
+    //if leader, log the incremented value and the term-index
+    if (trx.getServerRole() == RaftPeerRole.LEADER) {
+      LOG.info("{}: Increment to {}", termIndex, incremented);
     }
 
-    return f;
+    //return the new value of the counter to the client
+    return CompletableFuture.completedFuture(Message.valueOf(String.valueOf(incremented)));
   }
 }
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
index e63789117..953fd5bfa 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
@@ -29,7 +29,6 @@ import org.junit.Test;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.Collection;
 
 public class TestCounter extends ParameterizedBaseTest {
@@ -50,20 +49,17 @@ public class TestCounter extends ParameterizedBaseTest {
         client.io().send(Message.valueOf("INCREMENT"));
       }
       RaftClientReply reply1 = client.io().sendReadOnly(Message.valueOf("GET"));
-      Assert.assertEquals("10",
-          reply1.getMessage().getContent().toString(Charset.defaultCharset()));
+      Assert.assertEquals("10", reply1.getMessage().getContent().toStringUtf8());
       for (int i = 0; i < 10; i++) {
         client.io().send(Message.valueOf("INCREMENT"));
       }
       RaftClientReply reply2 = client.io().sendReadOnly(Message.valueOf("GET"));
-      Assert.assertEquals("20",
-          reply2.getMessage().getContent().toString(Charset.defaultCharset()));
+      Assert.assertEquals("20", reply2.getMessage().getContent().toStringUtf8());
       for (int i = 0; i < 10; i++) {
         client.io().send(Message.valueOf("INCREMENT"));
       }
       RaftClientReply reply3 = client.io().sendReadOnly(Message.valueOf("GET"));
-      Assert.assertEquals("30",
-          reply3.getMessage().getContent().toString(Charset.defaultCharset()));
+      Assert.assertEquals("30", reply3.getMessage().getContent().toStringUtf8());
     }
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 9cc4c5b21..629a55a67 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -132,9 +132,12 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
     updateLastAppliedTermIndex(term, index);
   }
 
-  @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
   protected boolean updateLastAppliedTermIndex(long term, long index) {
-    final TermIndex newTI = TermIndex.valueOf(term, index);
+    return updateLastAppliedTermIndex(TermIndex.valueOf(term, index));
+  }
+
+  protected boolean updateLastAppliedTermIndex(TermIndex newTI) {
+    Objects.requireNonNull(newTI, "newTI == null");
     final TermIndex oldTI = lastAppliedTermIndex.getAndSet(newTI);
     if (!newTI.equals(oldTI)) {
       LOG.trace("{}: update lastAppliedTermIndex from {} to {}", getId(), oldTI, newTI);
@@ -147,7 +150,7 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
     }
 
     synchronized (transactionFutures) {
-      for(long i; !transactionFutures.isEmpty() && (i = transactionFutures.firstKey()) <= index; ) {
+      for(long i; !transactionFutures.isEmpty() && (i = transactionFutures.firstKey()) <= newTI.getIndex(); ) {
         transactionFutures.remove(i).complete(null);
       }
     }


[ratis] 02/12: RATIS-1639. Added getting started document (#696)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 8d44e8893a42272645375e54aa8ce814f5db90b0
Author: Riguz Lee <so...@gmail.com>
AuthorDate: Thu Jul 28 02:07:40 2022 +0800

    RATIS-1639. Added getting started document (#696)
    
    
    (cherry picked from commit 95243a7ac501567d5c76dfaadf59b69a4a28fa03)
---
 ratis-docs/src/site/markdown/index.md       |  42 +++-
 ratis-docs/src/site/markdown/start/index.md | 338 +++++++++++++++++++++++++++-
 2 files changed, 366 insertions(+), 14 deletions(-)

diff --git a/ratis-docs/src/site/markdown/index.md b/ratis-docs/src/site/markdown/index.md
index 5af7adaa6..d5b31e516 100644
--- a/ratis-docs/src/site/markdown/index.md
+++ b/ratis-docs/src/site/markdown/index.md
@@ -17,12 +17,36 @@
 
 # Apache Ratis
 Apache Ratis is a highly customizable Raft protocol implementation in Java.
-Raft is a easily understandable consensus algorithm to manage replicated state.
-Apache Ratis could be used in any Java application where state should be replicated between multiple instances.
-
-## Ratis Features
-TODO: complete this section
-#### Multi-group servers
-TODO: complete this section
-#### Separate RAFT log storage from actual data (client-data)
-TODO: complete this section
\ No newline at end of file
+[Raft](https://raft.github.io/) is an easily understandable consensus algorithm to manage replicated state. 
+
+The Ratis project was started in 2016,
+entered Apache incubation in 2017,
+and graduated as a top level Apache project on Feb 17, 2021.
+Originally, Ratis was built for using Raft in [Apache Ozone](https://ozone.apache.org)
+in order to replicate raw data and to provide high availability.
+The correctness and the performance of Ratis have been heavily tested with Ozone.
+
+## Pluggability
+
+Unlike many other raft implementations,
+Ratis is designed to be pluggable:
+it can be used in any Java application
+where state should be replicated between multiple instances.
+Ratis provides abstractions over the Raft protocol for users,
+which makes the Raft library fully decoupled from the applications.
+
+### Pluggable transport
+Ratis provides a pluggable transport layer. 
+Applications may use their own implementation.
+By default, gRPC, Netty+Protobuf and Apache Hadoop RPC based transports are provided.
+
+### Pluggable state machine
+Ratis supports a log and a state machine.
+The state machine typically contains the data that you want to make highly available.
+Applications usually define their own state machine for the application logic.
+Ratis makes it easy to use your own state machine.
+
+### Pluggable raft log
+The Raft log is also pluggable:
+users can provide their own log implementation.
+The default implementation stores the log in local files.
diff --git a/ratis-docs/src/site/markdown/start/index.md b/ratis-docs/src/site/markdown/start/index.md
index 88eddaf5c..7aca21f9b 100644
--- a/ratis-docs/src/site/markdown/start/index.md
+++ b/ratis-docs/src/site/markdown/start/index.md
@@ -16,11 +16,339 @@
 -->
 
 # Getting Started
-TODO: complete this section
-### Add the dependency
+Let's get started to use Raft in your application.
+To demonstrate how to use Ratis,
+we'll implement a simple counter service,
+which maintains a counter value across a cluster.
+The client could send the following types of commands to the cluster:
 
-### Create Your StateMachine
+* `INCREMENT`: increase the counter value
+* `GET`: query the current value of the counter;
+we call this kind of command a read-only command
 
-### Build and Start a RaftServer
+Note: The full source could be found at [Ratis examples](https://github.com/apache/ratis/tree/master/ratis-examples).
+This article is mainly intended to show the steps of integration of Ratis,
+if you wish to run this example or find more examples,
+please refer to [the README](https://github.com/apache/ratis/tree/master/ratis-examples#example-3-counter).
 
-### Configuration
+## Add the dependency
+
+First, we need to add the Ratis dependencies to the project;
+they are available in Maven Central:
+
+```xml
+<dependency>
+   <artifactId>ratis-server</artifactId>
+   <groupId>org.apache.ratis</groupId>
+</dependency>
+```
+
+Also, one of the following transports needs to be added:
+
+* grpc
+* netty
+* hadoop
+
+For example, let's use grpc transport:
+
+```xml
+<dependency>
+   <artifactId>ratis-grpc</artifactId>
+   <groupId>org.apache.ratis</groupId>
+</dependency>
+```
+
+Please note that Apache Hadoop dependencies are shaded,
+so it’s safe to use hadoop transport with different versions of Hadoop.
+
+## Create the StateMachine
+A state machine is used to maintain the current state of the raft node,
+the state machine is responsible for:
+
+* Executing raft logs to get the state. In this example, when an `INCREMENT` log is executed,
+the counter value will be increased by 1.
+A `GET` command does not affect the state; it only returns the current counter value to the client.
+* Managing snapshot loading/saving.
+Snapshots are used to speed up log execution:
+the state machine can start from a snapshot point and only execute newer logs.
+
+### Define the StateMachine
+To define our state machine,
+we can extend a class from the base class `BaseStateMachine`.
+
+Also, a storage is needed to store snapshots,
+and we'll use the built-in `SimpleStateMachineStorage`,
+which is a file-based storage implementation.
+
+Since we're going to implement a counter server,
+a `counter` instance is defined in the state machine
+to represent the current value.
+Below is the declaration of the state machine: 
+
+```java
+public class CounterStateMachine extends BaseStateMachine {
+    private final SimpleStateMachineStorage storage =
+            new SimpleStateMachineStorage();
+    private AtomicInteger counter = new AtomicInteger(0);
+    // ...
+}
+```
+
+### Apply Raft Log Item
+
+Once the raft log is committed,
+Ratis will notify state machine by invoking the `public CompletableFuture<Message> applyTransaction(TransactionContext trx)` method,
+and we need to override this method to decode the message and apply it. 
+
+First, get the log content and decode it:
+
+```java
+public class CounterStateMachine extends BaseStateMachine {
+    // ...
+    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+        final RaftProtos.LogEntryProto entry = trx.getLogEntry();
+        String logData = entry.getStateMachineLogEntry().getLogData()
+                .toString(Charset.defaultCharset());
+        if (!logData.equals("INCREMENT")) {
+            return CompletableFuture.completedFuture(
+                    Message.valueOf("Invalid Command"));
+        }
+        // ...
+    }
+}
+```
+
+After that, if the log is valid,
+we could apply it by increasing the counter value.
+Remember that we also need to update the last applied term and index:
+
+```java
+public class CounterStateMachine extends BaseStateMachine {
+    // ...
+    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+        // ...
+        final long index = entry.getIndex();
+        updateLastAppliedTermIndex(entry.getTerm(), index);
+
+        //actual execution of the command: increment the counter
+        counter.incrementAndGet();
+
+        //return the new value of the counter to the client
+        final CompletableFuture<Message> f =
+                CompletableFuture.completedFuture(Message.valueOf(counter.toString()));
+        return f;
+    }
+}
+```
+
+### Handle Readonly Command
+Note that we only handled `INCREMENT` command,
+what about the `GET` command?
+The `GET` command is implemented as a readonly command,
+so we'll need to implement `public CompletableFuture<Message> query(Message request)` instead of `applyTransaction`.
+
+```java
+public class CounterStateMachine extends BaseStateMachine {
+    // ...
+    @Override
+    public CompletableFuture<Message> query(Message request) {
+        String msg = request.getContent().toString(Charset.defaultCharset());
+        if (!msg.equals("GET")) {
+            return CompletableFuture.completedFuture(
+                    Message.valueOf("Invalid Command"));
+        }
+        return CompletableFuture.completedFuture(
+                Message.valueOf(counter.toString()));
+    }
+}
+```
+
+### Save and Load Snapshots
+When taking a snapshot,
+we persist every state in the state machine,
+and the value could be loaded directly to the state in the future.
+In this example,
+the only state is the counter value,
+we're going to use `ObjectOutputStream` to write it to a snapshot file:
+
+```java
+public class CounterStateMachine extends BaseStateMachine {
+    // ...
+    @Override
+    public long takeSnapshot() {
+        //get the last applied index
+        final TermIndex last = getLastAppliedTermIndex();
+
+        //create a file with a proper name to store the snapshot
+        final File snapshotFile =
+                storage.getSnapshotFile(last.getTerm(), last.getIndex());
+
+        //serialize the counter object and write it into the snapshot file
+        try (ObjectOutputStream out = new ObjectOutputStream(
+                new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
+            out.writeObject(counter);
+        } catch (IOException ioe) {
+            LOG.warn("Failed to write snapshot file \"" + snapshotFile
+                    + "\", last applied index=" + last);
+        }
+
+        //return the index of the stored snapshot (which is the last applied one)
+        return last.getIndex();
+    }
+}
+```
+
+When loading it,
+we could use `ObjectInputStream` to deserialize it.
+Remember that we also need to implement the `initialize` and `reinitialize` methods,
+so that the state machine will be correctly initialized.
+
+## Build and Start a RaftServer
+In order to build a raft cluster,
+each node must start a `RaftServer` instance,
+which is responsible for communicating to each other through Raft protocol.
+
+It's important to keep in mind that
+each raft server knows exactly how many raft peers are in the cluster
+and what their addresses are.
+In this example, we'll set up a 3-node cluster.
+For simplicity,
+each peer listens on a specific port on the same machine,
+and we can define the addresses of the cluster in a configuration file:
+
+```properties
+raft.server.address.list=127.0.0.1:10024,127.0.0.1:10124,127.0.0.1:11124
+```
+
+We name those peers 'n0', 'n1' and 'n2',
+and then we will create a `RaftGroup` instance representing them.
+Since they are immutable,
+we'll put them in the `Constants` class:
+
+```java
+public final class Constants {
+    public static final List<RaftPeer> PEERS;
+    private static final UUID CLUSTER_GROUP_ID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
+
+    static {
+        // load addresses from configuration file
+        // final String[] addresses = ...
+        List<RaftPeer> peers = new ArrayList<>(addresses.length);
+        for (int i = 0; i < addresses.length; i++) {
+            peers.add(RaftPeer.newBuilder().setId("n" + i).setAddress(addresses[i]).build());
+        }
+        PEERS = Collections.unmodifiableList(peers);
+    }
+    
+    public static final RaftGroup RAFT_GROUP = RaftGroup.valueOf(
+            RaftGroupId.valueOf(Constants.CLUSTER_GROUP_ID), PEERS);
+    // ...
+}
+```
+
+Besides the cluster info,
+another important thing is that we need to know the information of the current peer.
+To achieve this,
+we could pass the current peer's id as a program argument,
+and then the raft server could be created:
+
+```java
+public final class CounterServer implements Closeable {
+    private final RaftServer server;
+
+    // the current peer will be passed as argument
+    public CounterServer(RaftPeer peer, File storageDir) throws IOException {
+        // ...
+        CounterStateMachine counterStateMachine = new CounterStateMachine();
+
+        //create and start the Raft server
+        this.server = RaftServer.newBuilder()
+                .setGroup(Constants.RAFT_GROUP)
+                .setProperties(properties)
+                .setServerId(peer.getId())
+                .setStateMachine(counterStateMachine)
+                .build();
+    }
+
+    public void start() throws IOException {
+        server.start();
+    }
+}
+```
+
+Each `RaftServer` will own a `CounterStateMachine` instance,
+as previously defined by us.
+After that, all we need to do is to start it along with our application:
+
+```java
+public final class CounterServer implements Closeable {
+    // ...
+    public static void main(String[] args) throws IOException {
+        // ...
+        //find current peer object based on application parameter
+        final RaftPeer currentPeer = Constants.PEERS.get(Integer.parseInt(args[0]) - 1);
+
+        //start a counter server
+        final File storageDir = new File("./" + currentPeer.getId());
+        final CounterServer counterServer = new CounterServer(currentPeer, storageDir);
+        counterServer.start();
+        // ...
+    }
+
+}
+```
+
+After the server is started,
+it will try to communicate with other peers in the cluster,
+and perform raft actions like leader election, append log entries, etc.
+
+## Build Raft Client
+
+To send commands to the cluster,
+we need to use a `RaftClient` instance.
+All we need to know is the peers in the cluster, i.e., the raft group.
+
+```java
+public final class CounterClient {
+    // ...
+    private static RaftClient buildClient() {
+        RaftProperties raftProperties = new RaftProperties();
+        RaftClient.Builder builder = RaftClient.newBuilder()
+                .setProperties(raftProperties)
+                .setRaftGroup(Constants.RAFT_GROUP)
+                .setClientRpc(
+                        new GrpcFactory(new Parameters())
+                                .newRaftClientRpc(ClientId.randomId(), raftProperties));
+        return builder.build();
+    }
+}
+```
+
+With this raft client,
+we can then send commands with the `raftClient.io().send` method,
+and use the `raftClient.io().sendReadOnly` method for read-only commands.
+In this example,
+to send the `INCREMENT` and `GET` commands,
+we can do it like this:
+
+```java
+raftClient.io().send(Message.valueOf("INCREMENT"));
+```
+
+and
+
+```java
+RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
+String response = count.getMessage().getContent().toString(Charset.defaultCharset());
+System.out.println(response);
+```
+
+## Summary
+It might seem a little complicated for beginners,
+but since Raft itself is a hard topic,
+this is already the simplest example we've found as a 'Hello World' for Ratis.
+After you have a basic understanding of Ratis,
+you'll find it really easy to integrate into any project.
+
+Next, you can take a look at other [examples](https://github.com/apache/ratis/tree/master/ratis-examples),
+to know more about the features of Ratis.
\ No newline at end of file


[ratis] 07/12: RATIS-1661. Support configurable hostname in GrpcService (#707)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 396d696540d9acb8f959b594f8dcb44bc27cbbe1
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Tue Aug 9 01:02:47 2022 +0800

    RATIS-1661. Support configurable hostname in GrpcService (#707)
    
    
    (cherry picked from commit 485c7eccd036ea9afb3750ba77ab325bc76b0037)
---
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 34 ++++++++++++++++++++++
 .../org/apache/ratis/grpc/server/GrpcService.java  | 30 ++++++++++++-------
 2 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index b227dfb37..8163d18de 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -97,6 +97,16 @@ public interface GrpcConfigKeys {
   interface Admin {
     String PREFIX = GrpcConfigKeys.PREFIX + ".admin";
 
+    String HOST_KEY = PREFIX + ".host";
+    String HOST_DEFAULT = null;
+    static String host(RaftProperties properties) {
+      final String fallbackServerHost = Server.host(properties, null);
+      return get(properties::get, HOST_KEY, HOST_DEFAULT, Server.HOST_KEY, fallbackServerHost, getDefaultLog());
+    }
+    static void setHost(RaftProperties properties, String host) {
+      set(properties::set, HOST_KEY, host);
+    }
+
     String PORT_KEY = PREFIX + ".port";
     int PORT_DEFAULT = -1;
     static int port(RaftProperties properties) {
@@ -121,6 +131,16 @@ public interface GrpcConfigKeys {
   interface Client {
     String PREFIX = GrpcConfigKeys.PREFIX + ".client";
 
+    String HOST_KEY = PREFIX + ".host";
+    String HOST_DEFAULT = null;
+    static String host(RaftProperties properties) {
+      final String fallbackServerHost = Server.host(properties, null);
+      return get(properties::get, HOST_KEY, HOST_DEFAULT, Server.HOST_KEY, fallbackServerHost, getDefaultLog());
+    }
+    static void setHost(RaftProperties properties, String host) {
+      set(properties::set, HOST_KEY, host);
+    }
+
     String PORT_KEY = PREFIX + ".port";
     int PORT_DEFAULT = -1;
     static int port(RaftProperties properties) {
@@ -145,6 +165,20 @@ public interface GrpcConfigKeys {
   interface Server {
     String PREFIX = GrpcConfigKeys.PREFIX + ".server";
 
+    String HOST_KEY = PREFIX + ".host";
+    String HOST_DEFAULT = null;
+    static String host(RaftProperties properties) {
+      return host(properties, getDefaultLog());
+    }
+
+    static String host(RaftProperties properties, Consumer<String> logger) {
+      return get(properties::get, HOST_KEY, HOST_DEFAULT, logger);
+    }
+
+    static void setHost(RaftProperties properties, String host) {
+      set(properties::set, HOST_KEY, host);
+    }
+
     String PORT_KEY = PREFIX + ".port";
     int PORT_DEFAULT = 0;
     static int port(RaftProperties properties) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 9d65cba39..40e413915 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -120,9 +120,15 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
   private GrpcService(RaftServer server,
       GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig, GrpcTlsConfig serverTlsConfig) {
     this(server, server::getId,
-        GrpcConfigKeys.Admin.port(server.getProperties()), adminTlsConfig,
-        GrpcConfigKeys.Client.port(server.getProperties()), clientTlsConfig,
-        GrpcConfigKeys.Server.port(server.getProperties()), serverTlsConfig,
+        GrpcConfigKeys.Admin.host(server.getProperties()),
+        GrpcConfigKeys.Admin.port(server.getProperties()),
+        adminTlsConfig,
+        GrpcConfigKeys.Client.host(server.getProperties()),
+        GrpcConfigKeys.Client.port(server.getProperties()),
+        clientTlsConfig,
+        GrpcConfigKeys.Server.host(server.getProperties()),
+        GrpcConfigKeys.Server.port(server.getProperties()),
+        serverTlsConfig,
         GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
         RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
         GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
@@ -132,9 +138,9 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
 
   @SuppressWarnings("checkstyle:ParameterNumber") // private constructor
   private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier,
-      int adminPort, GrpcTlsConfig adminTlsConfig,
-      int clientPort, GrpcTlsConfig clientTlsConfig,
-      int serverPort, GrpcTlsConfig serverTlsConfig,
+      String adminHost, int adminPort, GrpcTlsConfig adminTlsConfig,
+      String clientHost, int clientPort, GrpcTlsConfig clientTlsConfig,
+      String serverHost, int serverPort, GrpcTlsConfig serverTlsConfig,
       SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
       SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration,
       boolean useSeparateHBChannel) {
@@ -163,7 +169,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
     final boolean separateClientServer = clientPort != serverPort && clientPort > 0;
 
     final NettyServerBuilder serverBuilder =
-        startBuildingNettyServer(serverPort, serverTlsConfig, grpcMessageSizeMax, flowControlWindow);
+        startBuildingNettyServer(serverHost, serverPort, serverTlsConfig, grpcMessageSizeMax, flowControlWindow);
     serverBuilder.addService(ServerInterceptors.intercept(
         new GrpcServerProtocolService(idSupplier, raftServer), serverInterceptor));
     if (!separateAdminServer) {
@@ -179,7 +185,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
 
     if (separateAdminServer) {
       final NettyServerBuilder builder =
-          startBuildingNettyServer(adminPort, adminTlsConfig, grpcMessageSizeMax, flowControlWindow);
+          startBuildingNettyServer(adminHost, adminPort, adminTlsConfig, grpcMessageSizeMax, flowControlWindow);
       addAdminService(raftServer, builder);
       final Server adminServer = builder.build();
       servers.put(GrpcAdminProtocolService.class.getName(), adminServer);
@@ -190,7 +196,7 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
 
     if (separateClientServer) {
       final NettyServerBuilder builder =
-          startBuildingNettyServer(clientPort, clientTlsConfig, grpcMessageSizeMax, flowControlWindow);
+          startBuildingNettyServer(clientHost, clientPort, clientTlsConfig, grpcMessageSizeMax, flowControlWindow);
       addClientService(builder);
       final Server clientServer = builder.build();
       servers.put(GrpcClientProtocolService.class.getName(), clientServer);
@@ -214,9 +220,11 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
           serverInterceptor));
   }
 
-  private static NettyServerBuilder startBuildingNettyServer(int port, GrpcTlsConfig tlsConfig,
+  private static NettyServerBuilder startBuildingNettyServer(String hostname, int port, GrpcTlsConfig tlsConfig,
       SizeInBytes grpcMessageSizeMax, SizeInBytes flowControlWindow) {
-    NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
+    InetSocketAddress address = hostname == null || hostname.isEmpty() ?
+        new InetSocketAddress(port) : new InetSocketAddress(hostname, port);
+    NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address)
         .withChildOption(ChannelOption.SO_REUSEADDR, true)
         .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
         .flowControlWindow(flowControlWindow.getSizeInt());


[ratis] 01/12: RATIS-1640. Add unit-test of listener related to setConfiguration and takeSnapshot (#697)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit ce5357a2eac6aee4e824649776e916b6dbd0fe22
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Wed Jul 27 02:09:05 2022 +0800

    RATIS-1640. Add unit-test of listener related to setConfiguration and takeSnapshot (#697)
    
    
    (cherry picked from commit d3a0f9491f17462555c8fe522cbdc2ea4c88ef3b)
---
 .../ratis/server/impl/LeaderElectionTests.java     | 88 ++++++++++++++++++++++
 .../apache/ratis/server/impl/MiniRaftCluster.java  |  6 ++
 .../ratis/statemachine/SnapshotManagementTest.java | 30 ++++++++
 3 files changed, 124 insertions(+)

diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index b988d3f4f..6b5d04b24 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -23,6 +23,7 @@ import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
@@ -46,12 +47,16 @@ import org.junit.Test;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LAST_LEADER_ELECTION_ELAPSED_TIME;
@@ -312,6 +317,89 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     cluster.setBlockRequestsFrom(id.toString(), false);
   }
 
+  @Test
+  public void testAddListener() throws Exception {
+    try (final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        List<RaftPeer> servers = cluster.getPeers();
+        Assert.assertEquals(servers.size(), 3);
+        MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(1,
+            true, false, RaftProtos.RaftPeerRole.LISTENER);
+        RaftClientReply reply = client.admin().setConfiguration(servers, Arrays.asList(changes.newPeers));
+        Assert.assertTrue(reply.isSuccess());
+        Collection<RaftPeer> listener =
+            leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+        Assert.assertEquals(1, listener.size());
+        Assert.assertEquals(changes.newPeers[0].getId(), new ArrayList<>(listener).get(0).getId());
+      }
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testRemoveListener() throws Exception {
+    try(final MiniRaftCluster cluster = newCluster(3,1)) {
+      cluster.start();
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        Assert.assertEquals(1, cluster.getListeners().size());
+        List<RaftPeer> servers = cluster.getFollowers().stream().map(RaftServer.Division::getPeer).collect(
+            Collectors.toList());
+        servers.add(leader.getPeer());
+        RaftClientReply reply = client.admin().setConfiguration(servers);
+        Assert.assertTrue(reply.isSuccess());
+        Assert.assertEquals(0, leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER).size());
+      }
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testChangeFollowerToListener() throws Exception {
+    try(final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient()) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        List<RaftPeer> followers = cluster.getFollowers().stream().map(
+            RaftServer.Division::getPeer).collect(Collectors.toList());
+        Assert.assertEquals(2, followers.size());
+        List<RaftPeer> listeners = new ArrayList<>();
+        listeners.add(followers.get(1));
+        followers.remove(1);
+        RaftClientReply reply = client.admin().setConfiguration(followers, listeners);
+        Assert.assertTrue(reply.isSuccess());
+        Collection<RaftPeer> peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+        Assert.assertEquals(1, peer.size());
+        Assert.assertEquals(listeners.get(0).getId(), new ArrayList<>(peer).get(0).getId());
+      }
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testChangeListenerToFollower() throws Exception {
+    try(final MiniRaftCluster cluster = newCluster(2, 1)) {
+      cluster.start();
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        List<RaftPeer> listeners = cluster.getListeners()
+            .stream().map(RaftServer.Division::getPeer).collect(Collectors.toList());
+        Assert.assertEquals(listeners.size(), 1);
+        RaftClientReply reply = client.admin().setConfiguration(cluster.getPeers());
+        Assert.assertTrue(reply.isSuccess());
+        Collection<RaftPeer> peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+        Assert.assertEquals(0, peer.size());
+      }
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testLeaderElectionMetrics() throws IOException, InterruptedException {
     Timestamp timestamp = Timestamp.currentTime();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index f5cd38b36..1f4047524 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -656,6 +656,12 @@ public abstract class MiniRaftCluster implements Closeable {
         .collect(Collectors.toList());
   }
 
+  public List<RaftServer.Division> getListeners() {
+    return getServerAliveStream()
+        .filter(server -> server.getInfo().isListener())
+        .collect(Collectors.toList());
+  }
+
   public int getNumServers() {
     return servers.size();
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
index 608407786..c821f36c4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
@@ -143,4 +143,34 @@ public abstract class SnapshotManagementTest<CLUSTER extends MiniRaftCluster>
         .getStateMachineStorage().getSnapshotFile(follower.getInfo().getCurrentTerm(), snapshotIndex);
     Assert.assertTrue(snapshotFile.exists());
   }
+
+
+  @Test
+  public void testReceiveLogAndTakeSnapshotOnListener() throws Exception {
+    runWithNewCluster(2, 1, this::runTestReceiveLogAndTakeSnapshotOnListener);
+  }
+
+  void runTestReceiveLogAndTakeSnapshotOnListener(CLUSTER cluster) throws Exception {
+    final RaftClientReply snapshotReply;
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    final RaftServer.Division listener = cluster.getListeners().get(0);
+    final RaftPeerId listenerId = listener.getId();
+    Assert.assertTrue(listener.getInfo().isListener());
+    try (final RaftClient client = cluster.createClient(listenerId)) {
+      for (int i = 0; i < RaftServerConfigKeys.Snapshot.creationGap(getProperties()); i++) {
+        RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+        Assert.assertTrue(reply.isSuccess());
+      }
+      snapshotReply = client.getSnapshotManagementApi(listenerId).create(3000);
+    }
+
+    Assert.assertTrue(snapshotReply.isSuccess());
+    final long snapshotIndex = snapshotReply.getLogIndex();
+    LOG.info("snapshotIndex = {} on {} server {}",
+        snapshotIndex, listener.getInfo().getCurrentRole(), listener.getId());
+
+    final File snapshotFile = SimpleStateMachine4Testing.get(listener)
+        .getStateMachineStorage().getSnapshotFile(listener.getInfo().getCurrentTerm(), snapshotIndex);
+    Assert.assertTrue(snapshotFile.exists());
+  }
 }


[ratis] 04/12: RATIS-1657. Intermittent failure in TestLogAppenderWithGrpc#testPendingLimits due to ConcurrentModificationException at SlidingWindow$Client.trySendDelayed. (#703)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 6cd8ef3d945eb99e6f553729973447336bb344e0
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Aug 5 00:55:48 2022 -0700

    RATIS-1657. Intermittent failure in TestLogAppenderWithGrpc#testPendingLimits due to ConcurrentModificationException at SlidingWindow$Client.trySendDelayed. (#703)
    
    
    (cherry picked from commit d9aa480c53a69141aeb5bb7f3279ed843c12f322)
---
 .../org/apache/ratis/util/CollectionUtils.java     | 14 +++++-
 .../java/org/apache/ratis/util/SlidingWindow.java  | 58 ++++++++++++++++------
 2 files changed, 55 insertions(+), 17 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index cdfd9635b..db0c6fd93 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.util;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -103,8 +104,17 @@ public interface CollectionUtils {
   }
 
   static <K, V> V putNew(K key, V value, Map<K, V> map, Supplier<Object> name) {
-    final V returned = map.put(key, value);
-    Preconditions.assertTrue(returned == null,
+    return putNew(key, value, map::put, name);
+  }
+
+  /** For the case that key and value are the same object. */
+  static <K> void putNew(K key, Function<K, K> putMethod, Supplier<Object> name) {
+    putNew(key, key, (k, v) -> putMethod.apply(k), name);
+  }
+
+  static <K, V> V putNew(K key, V value, BiFunction<K, V, V> putMethod, Supplier<Object> name) {
+    final V returned = putMethod.apply(key, value);
+    Preconditions.assertNull(returned,
         () -> "Entry already exists for key " + key + " in map " + name.get());
     return value;
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 43b1efcdb..316604db0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -17,13 +17,14 @@
  */
 package org.apache.ratis.util;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -63,14 +64,13 @@ public interface SlidingWindow {
 
   /** A seqNum-to-request map, sorted by seqNum. */
   class RequestMap<REQUEST extends Request<REPLY>, REPLY> implements Iterable<REQUEST> {
-    private static boolean logRepeatedly = false;
     private final Object name;
     /** Request map: seqNum -> request */
     private final SortedMap<Long, REQUEST> requests = new ConcurrentSkipListMap<>();
 
     RequestMap(Object name) {
       this.name = name;
-      if (logRepeatedly && LOG.isDebugEnabled()) {
+      if (LOG.isDebugEnabled()) {
         JavaUtils.runRepeatedly(this::log, 5, 10, TimeUnit.SECONDS);
       }
     }
@@ -185,6 +185,33 @@ public interface SlidingWindow {
     }
   }
 
+  class DelayedRequests {
+    private final SortedMap<Long, Long> sorted = new TreeMap<>();
+
+    synchronized Long put(Long seqNum) {
+      return sorted.put(seqNum, seqNum);
+    }
+
+    synchronized boolean containsKey(long seqNum) {
+      return sorted.containsKey(seqNum);
+    }
+
+    synchronized List<Long> getAllAndClear() {
+      final List<Long> keys = new ArrayList<>(sorted.keySet());
+      sorted.clear();
+      return keys;
+    }
+
+    synchronized Long remove(long seqNum) {
+      return sorted.remove(seqNum);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return "" + sorted.keySet();
+    }
+  }
+
   /**
    * Client side sliding window.
    * A client may
@@ -200,7 +227,7 @@ public interface SlidingWindow {
     /** The requests in the sliding window. */
     private final RequestMap<REQUEST, REPLY> requests;
     /** Delayed requests. */
-    private final SortedMap<Long, Long> delayedRequests = new TreeMap<>();
+    private final DelayedRequests delayedRequests = new DelayedRequests();
 
     /** The seqNum for the next new request. */
     private long nextSeqNum = 1;
@@ -214,9 +241,14 @@ public interface SlidingWindow {
     public Client(Object name) {
       this.requests = new RequestMap<REQUEST, REPLY>(getName(getClass(), name)) {
         @Override
-        @SuppressFBWarnings("IA_AMBIGUOUS_INVOCATION_OF_INHERITED_OR_OUTER_METHOD")
-        synchronized void log() {
-          LOG.debug(toString());
+        void log() {
+          if (LOG.isDebugEnabled()) {
+            logDebug();
+          }
+        }
+
+        synchronized void logDebug() {
+          LOG.debug(super.toString());
           for (REQUEST r : requests) {
             LOG.debug("  {}: {}", r.getSeqNum(), r.hasReply() ? "replied"
                 : delayedRequests.containsKey(r.getSeqNum()) ? "delayed" : "submitted");
@@ -229,7 +261,7 @@ public interface SlidingWindow {
     public synchronized String toString() {
       return requests + ", nextSeqNum=" + nextSeqNum
           + ", firstSubmitted=" + firstSeqNum + ", replied? " + firstReplied
-          + ", delayed=" + delayedRequests.keySet();
+          + ", delayed=" + delayedRequests;
     }
 
     /**
@@ -282,7 +314,7 @@ public interface SlidingWindow {
       }
 
       // delay other requests
-      CollectionUtils.putNew(seqNum, seqNum, delayedRequests, () -> requests.getName() + ":delayedRequests");
+      CollectionUtils.putNew(seqNum, delayedRequests::put, () -> requests.getName() + ":delayedRequests");
       return false;
     }
 
@@ -326,12 +358,8 @@ public interface SlidingWindow {
     private void trySendDelayed(Consumer<REQUEST> sendMethod) {
       if (firstReplied) {
         // after first received, all other requests can be submitted (out-of-order)
-        if (!delayedRequests.isEmpty()) {
-          for (Long seqNum : delayedRequests.keySet()) {
-            sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed"));
-          }
-          delayedRequests.clear();
-        }
+        delayedRequests.getAllAndClear().forEach(
+            seqNum -> sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed")));
       } else {
         // Otherwise, submit the first only if it is a delayed request
         final Iterator<REQUEST> i = requests.iterator();


[ratis] 09/12: RATIS-1665. RaftLog avoid converting list (#708)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit d80d955204b1f2542a4a7fd75b9c1e720450944b
Author: jiangyuan <ji...@baidu.com>
AuthorDate: Tue Aug 9 13:35:51 2022 +0800

    RATIS-1665. RaftLog avoid converting list (#708)
    
    
    (cherry picked from commit 38a01cdc39b32e69c6c89c8423904842bf7d3cb8)
---
 .../ratis/server/raftlog/RaftLogSequentialOps.java | 13 +++++-
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 48 ++++++++++++----------
 .../org/apache/ratis/server/impl/ServerState.java  |  8 ++--
 .../apache/ratis/server/raftlog/LogProtoUtils.java |  7 +---
 .../apache/ratis/server/raftlog/RaftLogBase.java   |  4 +-
 .../ratis/server/raftlog/memory/MemoryRaftLog.java | 21 +++++-----
 .../server/raftlog/segmented/SegmentedRaftLog.java | 12 +++---
 .../raftlog/segmented/SegmentedRaftLogCache.java   | 15 +++----
 .../server/raftlog/memory/MemoryRaftLogTest.java   | 30 +++++++-------
 .../raftlog/segmented/TestCacheEviction.java       |  6 +--
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  2 +-
 12 files changed, 93 insertions(+), 75 deletions(-)

diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
index 34a8f8952..32bd564e0 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
@@ -25,6 +25,7 @@ import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.StringUtils;
 import org.apache.ratis.util.function.CheckedSupplier;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
@@ -120,6 +121,16 @@ interface RaftLogSequentialOps {
    */
   CompletableFuture<Long> appendEntry(LogEntryProto entry);
 
+  /**
+   * The same as append(Arrays.asList(entries)).
+   *
+   * @deprecated use {@link #append(List)}
+   */
+  @Deprecated
+  default List<CompletableFuture<Long>> append(LogEntryProto... entries) {
+    return append(Arrays.asList(entries));
+  }
+
   /**
    * Append asynchronously all the given log entries.
    * Used by the followers.
@@ -127,7 +138,7 @@ interface RaftLogSequentialOps {
    * If an existing entry conflicts with a new one (same index but different terms),
    * delete the existing entry and all entries that follow it (§5.3).
    */
-  List<CompletableFuture<Long>> append(LogEntryProto... entries);
+  List<CompletableFuture<Long>> append(List<LogEntryProto> entries);
 
   /**
    * Truncate asynchronously the log entries till the given index (inclusively).
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 9e170e933..fbcbce448 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -310,7 +310,7 @@ class LeaderStateImpl implements LeaderState {
         server.getRaftConf(), server.getState().getCurrentTerm(), raftLog.getNextIndex());
     CodeInjectionForTesting.execute(APPEND_PLACEHOLDER,
         server.getId().toString(), null);
-    raftLog.append(placeHolder);
+    raftLog.append(Collections.singletonList(placeHolder));
     processor.start();
     senders.forEach(LogAppender::start);
     return placeHolder;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6ab0a6a5c..4c798d9d5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1236,9 +1236,9 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   private void validateEntries(long expectedTerm, TermIndex previous,
-      LogEntryProto... entries) {
-    if (entries != null && entries.length > 0) {
-      final long index0 = entries[0].getIndex();
+      List<LogEntryProto> entries) {
+    if (entries != null && !entries.isEmpty()) {
+      final long index0 = entries.get(0).getIndex();
 
       if (previous == null || previous.getTerm() == 0) {
         Preconditions.assertTrue(index0 == 0,
@@ -1250,13 +1250,14 @@ class RaftServerImpl implements RaftServer.Division,
             previous, 0, index0);
       }
 
-      for (int i = 0; i < entries.length; i++) {
-        final long t = entries[i].getTerm();
+      for (int i = 0; i < entries.size(); i++) {
+        LogEntryProto entry = entries.get(i);
+        final long t = entry.getTerm();
         Preconditions.assertTrue(expectedTerm >= t,
             "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
             i, t, expectedTerm);
 
-        final long indexi = entries[i].getIndex();
+        final long indexi = entry.getIndex();
         Preconditions.assertTrue(indexi == index0 + i,
             "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s",
             i, indexi, index0);
@@ -1277,10 +1278,8 @@ class RaftServerImpl implements RaftServer.Division,
   @Override
   public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto r)
       throws IOException {
-    // TODO avoid converting list to array
     final RaftRpcRequestProto request = r.getServerRequest();
-    final LogEntryProto[] entries = r.getEntriesList()
-        .toArray(new LogEntryProto[r.getEntriesCount()]);
+    final List<LogEntryProto> entries = r.getEntriesList();
     final TermIndex previous = r.hasPreviousLog()? TermIndex.valueOf(r.getPreviousLog()) : null;
     final RaftPeerId requestorId = RaftPeerId.valueOf(request.getRequestorId());
 
@@ -1318,7 +1317,7 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   private void preAppendEntriesAsync(RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
-      TermIndex previous, long leaderCommit, boolean initializing, LogEntryProto... entries) throws IOException {
+      TermIndex previous, long leaderCommit, boolean initializing, List<LogEntryProto> entries) throws IOException {
     CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
         leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
 
@@ -1342,8 +1341,8 @@ class RaftServerImpl implements RaftServer.Division,
   @SuppressWarnings("checkstyle:parameternumber")
   private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
       RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
-      List<CommitInfoProto> commitInfos, LogEntryProto... entries) throws IOException {
-    final boolean isHeartbeat = entries.length == 0;
+      List<CommitInfoProto> commitInfos, List<LogEntryProto> entries) throws IOException {
+    final boolean isHeartbeat = entries.isEmpty();
     logAppendEntries(isHeartbeat,
         () -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", "
             + previous + ", " + leaderCommit + ", " + initializing
@@ -1400,7 +1399,7 @@ class RaftServerImpl implements RaftServer.Division,
       state.updateConfiguration(entries);
     }
 
-    final List<CompletableFuture<Long>> futures = entries.length == 0 ? Collections.emptyList()
+    final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
         : state.getLog().append(entries);
     commitInfos.forEach(commitInfoCache::update);
 
@@ -1419,12 +1418,19 @@ class RaftServerImpl implements RaftServer.Division,
     ).thenApply(v -> {
       final AppendEntriesReplyProto reply;
       synchronized(this) {
-        final long commitIndex = ServerImplUtils.effectiveCommitIndex(leaderCommit, previous, entries.length);
+        final long commitIndex = ServerImplUtils.effectiveCommitIndex(leaderCommit, previous, entries.size());
         state.updateCommitIndex(commitIndex, currentTerm, false);
         updateCommitInfoCache();
-        final long n = isHeartbeat? state.getLog().getNextIndex(): entries[entries.length - 1].getIndex() + 1;
-        final long matchIndex = entries.length != 0 ? entries[entries.length - 1].getIndex() :
-            INVALID_LOG_INDEX;
+        final long n;
+        final long matchIndex;
+        if (!isHeartbeat) {
+          LogEntryProto requestLastEntry = entries.get(entries.size() - 1);
+          n = requestLastEntry.getIndex() + 1;
+          matchIndex = requestLastEntry.getIndex();
+        } else {
+          n = state.getLog().getNextIndex();
+          matchIndex = INVALID_LOG_INDEX;
+        }
         reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getMemberId(), currentTerm,
             state.getLog().getLastCommittedIndex(), n, SUCCESS, callId, matchIndex,
             isHeartbeat);
@@ -1437,7 +1443,7 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId leaderId, long currentTerm,
-      long followerCommit, TermIndex previous, long callId, boolean isHeartbeat, LogEntryProto... entries) {
+      long followerCommit, TermIndex previous, long callId, boolean isHeartbeat, List<LogEntryProto> entries) {
     final long replyNextIndex = checkInconsistentAppendEntries(previous, entries);
     if (replyNextIndex == -1) {
       return null;
@@ -1450,7 +1456,7 @@ class RaftServerImpl implements RaftServer.Division,
     return reply;
   }
 
-  private long checkInconsistentAppendEntries(TermIndex previous, LogEntryProto... entries) {
+  private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
     // Check if a snapshot installation through state machine is in progress.
     final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
     if (installSnapshot != INVALID_LOG_INDEX) {
@@ -1460,8 +1466,8 @@ class RaftServerImpl implements RaftServer.Division,
 
     // Check that the first log entry is greater than the snapshot index in the latest snapshot and follower's last
     // committed index. If not, reply to the leader the new next index.
-    if (entries != null && entries.length > 0) {
-      final long firstEntryIndex = entries[0].getIndex();
+    if (entries != null && !entries.isEmpty()) {
+      final long firstEntryIndex = entries.get(0).getIndex();
       final long snapshotIndex = state.getSnapshotIndex();
       final long commitIndex =  state.getLog().getLastCommittedIndex();
       final long nextIndex = Math.max(snapshotIndex, commitIndex);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 52aedfb11..34e9cb92c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -430,10 +430,10 @@ class ServerState implements Closeable {
     LOG.trace("{}: {}", getMemberId(), configurationManager);
   }
 
-  void updateConfiguration(LogEntryProto[] entries) {
-    if (entries != null && entries.length > 0) {
-      configurationManager.removeConfigurations(entries[0].getIndex());
-      Arrays.stream(entries).forEach(this::setRaftConf);
+  void updateConfiguration(List<LogEntryProto> entries) {
+    if (entries != null && !entries.isEmpty()) {
+      configurationManager.removeConfigurations(entries.get(0).getIndex());
+      entries.stream().forEach(this::setRaftConf);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index 49caeb9fc..dd8c67dc8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -29,7 +29,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Function;
@@ -65,11 +64,9 @@ public final class LogProtoUtils {
     return toLogEntryString(entry, null);
   }
 
-  public static String toLogEntriesString(LogEntryProto... entries) {
+  public static String toLogEntriesString(List<LogEntryProto> entries) {
     return entries == null ? null
-        : entries.length == 0 ? "[]"
-        : entries.length == 1 ? toLogEntryString(entries[0])
-        : "" + Arrays.stream(entries).map(LogProtoUtils::toLogEntryString).collect(Collectors.toList());
+        : entries.stream().map(LogProtoUtils::toLogEntryString).collect(Collectors.toList()).toString();
   }
 
   public static String toLogEntriesShortString(List<LogEntryProto> entries) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 60081edd2..11eddb927 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -333,11 +333,11 @@ public abstract class RaftLogBase implements RaftLog {
   protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto entry);
 
   @Override
-  public final List<CompletableFuture<Long>> append(LogEntryProto... entries) {
+  public final List<CompletableFuture<Long>> append(List<LogEntryProto> entries) {
     return runner.runSequentially(() -> appendImpl(entries));
   }
 
-  protected abstract List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries);
+  protected abstract List<CompletableFuture<Long>> appendImpl(List<LogEntryProto> entries);
 
   @Override
   public String toString() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 41caef404..0eb7fb159 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -180,9 +180,9 @@ public class MemoryRaftLog extends RaftLogBase {
   }
 
   @Override
-  public List<CompletableFuture<Long>> appendImpl(LogEntryProto... logEntryProtos) {
+  public List<CompletableFuture<Long>> appendImpl(List<LogEntryProto> logEntryProtos) {
     checkLogState();
-    if (logEntryProtos == null || logEntryProtos.length == 0) {
+    if (logEntryProtos == null || logEntryProtos.isEmpty()) {
       return Collections.emptyList();
     }
     try(AutoCloseableLock writeLock = writeLock()) {
@@ -194,26 +194,27 @@ public class MemoryRaftLog extends RaftLogBase {
       // been committed but in the system the entry has not been committed to
       // the quorum of peers' disks.
       boolean toTruncate = false;
-      int truncateIndex = (int) logEntryProtos[0].getIndex();
+      int truncateIndex = (int) logEntryProtos.get(0).getIndex();
       int index = 0;
-      for (; truncateIndex < getNextIndex() && index < logEntryProtos.length;
+      for (; truncateIndex < getNextIndex() && index < logEntryProtos.size();
            index++, truncateIndex++) {
         if (this.entries.get(truncateIndex).getTerm() !=
-            logEntryProtos[index].getTerm()) {
+            logEntryProtos.get(index).getTerm()) {
           toTruncate = true;
           break;
         }
       }
       final List<CompletableFuture<Long>> futures;
       if (toTruncate) {
-        futures = new ArrayList<>(logEntryProtos.length - index + 1);
+        futures = new ArrayList<>(logEntryProtos.size() - index + 1);
         futures.add(truncate(truncateIndex));
       } else {
-        futures = new ArrayList<>(logEntryProtos.length - index);
+        futures = new ArrayList<>(logEntryProtos.size() - index);
       }
-      for (int i = index; i < logEntryProtos.length; i++) {
-        this.entries.add(logEntryProtos[i]);
-        futures.add(CompletableFuture.completedFuture(logEntryProtos[i].getIndex()));
+      for (int i = index; i < logEntryProtos.size(); i++) {
+        LogEntryProto logEntryProto = logEntryProtos.get(i);
+        this.entries.add(logEntryProto);
+        futures.add(CompletableFuture.completedFuture(logEntryProto.getIndex()));
       }
       return futures;
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index e5f4ab87a..74d6a8c03 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -435,9 +435,9 @@ public class SegmentedRaftLog extends RaftLogBase {
   }
 
   @Override
-  public List<CompletableFuture<Long>> appendImpl(LogEntryProto... entries) {
+  public List<CompletableFuture<Long>> appendImpl(List<LogEntryProto> entries) {
     checkLogState();
-    if (entries == null || entries.length == 0) {
+    if (entries == null || entries.isEmpty()) {
       return Collections.emptyList();
     }
     try(AutoCloseableLock writeLock = writeLock()) {
@@ -448,13 +448,13 @@ public class SegmentedRaftLog extends RaftLogBase {
 
       final List<CompletableFuture<Long>> futures;
       if (truncateIndex != -1) {
-        futures = new ArrayList<>(entries.length - index + 1);
+        futures = new ArrayList<>(entries.size() - index + 1);
         futures.add(truncate(truncateIndex));
       } else {
-        futures = new ArrayList<>(entries.length - index);
+        futures = new ArrayList<>(entries.size() - index);
       }
-      for (int i = index; i < entries.length; i++) {
-        futures.add(appendEntry(entries[i]));
+      for (int i = index; i < entries.size(); i++) {
+        futures.add(appendEntry(entries.get(i)));
       }
       return futures;
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 634a2bcb7..7608b30af 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -595,25 +595,26 @@ public class SegmentedRaftLogCache {
     }
   }
 
-  TruncateIndices computeTruncateIndices(Consumer<TermIndex> failClientRequest, LogEntryProto... entries) {
+  TruncateIndices computeTruncateIndices(Consumer<TermIndex> failClientRequest, List<LogEntryProto> entries) {
     int arrayIndex = 0;
     long truncateIndex = -1;
 
     try(AutoCloseableLock readLock = closedSegments.readLock()) {
-      final Iterator<TermIndex> i = iterator(entries[0].getIndex());
-      for(; i.hasNext() && arrayIndex < entries.length; arrayIndex++) {
+      final Iterator<TermIndex> i = iterator(entries.get(0).getIndex());
+      for(; i.hasNext() && arrayIndex < entries.size(); arrayIndex++) {
         final TermIndex storedEntry = i.next();
-        Preconditions.assertTrue(storedEntry.getIndex() == entries[arrayIndex].getIndex(),
+        LogEntryProto logEntryProto = entries.get(arrayIndex);
+        Preconditions.assertTrue(storedEntry.getIndex() == logEntryProto.getIndex(),
             "The stored entry's index %s is not consistent with the received entries[%s]'s index %s",
-            storedEntry.getIndex(), arrayIndex, entries[arrayIndex].getIndex());
+            storedEntry.getIndex(), arrayIndex, logEntryProto.getIndex());
 
-        if (storedEntry.getTerm() != entries[arrayIndex].getTerm()) {
+        if (storedEntry.getTerm() != logEntryProto.getTerm()) {
           // we should truncate from the storedEntry's arrayIndex
           truncateIndex = storedEntry.getIndex();
           if (LOG.isTraceEnabled()) {
             LOG.trace("{}: truncate to {}, arrayIndex={}, ti={}, storedEntry={}, entries={}",
                 name, truncateIndex, arrayIndex,
-                TermIndex.valueOf(entries[arrayIndex]), storedEntry,
+                TermIndex.valueOf(logEntryProto), storedEntry,
                 LogProtoUtils.toLogEntriesString(entries));
           }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
index c2cfb7580..086c10dea 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLogTest.java
@@ -19,6 +19,8 @@ package org.apache.ratis.server.raftlog.memory;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
@@ -52,20 +54,20 @@ public class MemoryRaftLogTest extends BaseTest {
 
     MemoryRaftLog raftLog = new MemoryRaftLog(memberId, () -> -1, prop);
     raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
-    LogEntryProto[] entries1 = new LogEntryProto[2];
-    entries1[0] = LogEntryProto.newBuilder().setIndex(0).setTerm(0).build();
-    entries1[1] = LogEntryProto.newBuilder().setIndex(1).setTerm(0).build();
+    List<LogEntryProto> entries1 = new ArrayList<>();
+    entries1.add(LogEntryProto.newBuilder().setIndex(0).setTerm(0).build());
+    entries1.add(LogEntryProto.newBuilder().setIndex(1).setTerm(0).build());
     raftLog.append(entries1).forEach(CompletableFuture::join);
 
-    LogEntryProto[] entries2 = new LogEntryProto[1];
-    entries2[0] = LogEntryProto.newBuilder().setIndex(0).setTerm(0).build();
+    List<LogEntryProto> entries2 = new ArrayList<>();
+    entries2.add(LogEntryProto.newBuilder().setIndex(0).setTerm(0).build());
     raftLog.append(entries2).forEach(CompletableFuture::join);
 
     final LogEntryHeader[] termIndices = raftLog.getEntries(0, 10);
     assertEquals(2, termIndices.length);
     for (int i = 0; i < 2; i++) {
-      assertEquals(entries1[i].getIndex(), termIndices[i].getIndex());
-      assertEquals(entries1[i].getTerm(), termIndices[i].getTerm());
+      assertEquals(entries1.get(i).getIndex(), termIndices[i].getIndex());
+      assertEquals(entries1.get(i).getTerm(), termIndices[i].getTerm());
     }
   }
 
@@ -80,18 +82,18 @@ public class MemoryRaftLogTest extends BaseTest {
 
     MemoryRaftLog raftLog = new MemoryRaftLog(memberId, () -> -1, prop);
     raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
-    LogEntryProto[] entries1 = new LogEntryProto[2];
-    entries1[0] = LogEntryProto.newBuilder().setIndex(0).setTerm(0).build();
-    entries1[1] = LogEntryProto.newBuilder().setIndex(1).setTerm(0).build();
+    List<LogEntryProto> entries1 = new ArrayList<>();
+    entries1.add(LogEntryProto.newBuilder().setIndex(0).setTerm(0).build());
+    entries1.add(LogEntryProto.newBuilder().setIndex(1).setTerm(0).build());
     raftLog.append(entries1).forEach(CompletableFuture::join);
 
-    LogEntryProto[] entries2 = new LogEntryProto[1];
-    entries2[0] = LogEntryProto.newBuilder().setIndex(0).setTerm(2).build();
+    List<LogEntryProto> entries2 = new ArrayList<>();
+    entries2.add(LogEntryProto.newBuilder().setIndex(0).setTerm(2).build());
     raftLog.append(entries2).forEach(CompletableFuture::join);
 
     final LogEntryHeader[] termIndices = raftLog.getEntries(0, 10);
     assertEquals(1, termIndices.length);
-    assertEquals(entries2[0].getIndex(), termIndices[0].getIndex());
-    assertEquals(entries2[0].getTerm(), termIndices[0].getTerm());
+    assertEquals(entries2.get(0).getIndex(), termIndices[0].getIndex());
+    assertEquals(entries2.get(0).getTerm(), termIndices[0].getTerm());
   }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index a1e18a86d..87dd2ef37 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -171,7 +171,7 @@ public class TestCacheEviction extends BaseTest {
     final SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, info, storage, prop);
     raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
     List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(0, maxCachedNum, 7, 0);
-    LogEntryProto[] entries = generateEntries(slist);
+    List<LogEntryProto> entries = generateEntries(slist);
     raftLog.append(entries).forEach(CompletableFuture::join);
 
     // check the current cached segment number: the last segment is still open
@@ -190,7 +190,7 @@ public class TestCacheEviction extends BaseTest {
         raftLog.getRaftLogCache().getCachedSegmentNum());
   }
 
-  private LogEntryProto[] generateEntries(List<SegmentRange> slist) {
+  private List<LogEntryProto> generateEntries(List<SegmentRange> slist) {
     List<LogEntryProto> eList = new ArrayList<>();
     for (SegmentRange range : slist) {
       for (long index = range.start; index <= range.end; index++) {
@@ -198,6 +198,6 @@ public class TestCacheEviction extends BaseTest {
         eList.add(LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index));
       }
     }
-    return eList.toArray(new LogEntryProto[eList.size()]);
+    return eList;
   }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index b4bb098a8..a78c102ba 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -523,7 +523,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       LOG.info("newEntries[0] = {}", newEntries.get(0));
       final int last = newEntries.size() - 1;
       LOG.info("newEntries[{}] = {}", last, newEntries.get(last));
-      raftLog.append(newEntries.toArray(new LogEntryProto[0])).forEach(CompletableFuture::join);
+      raftLog.append(newEntries).forEach(CompletableFuture::join);
 
       checkFailedEntries(entries, 650, retryCache);
       checkEntries(raftLog, entries, 0, 650);


[ratis] 12/12: RATIS-1669. Combine shell lib folder and root jars folder (#711)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 3bfaa496e97167cc9554e32284be159383b808ea
Author: leo65535 <le...@163.com>
AuthorDate: Fri Aug 12 16:45:51 2022 +0800

    RATIS-1669. Combine shell lib folder and root jars folder (#711)
    
    
    (cherry picked from commit 8b69f748e1d5495c7f860c790323f65cd9594ce0)
---
 .gitignore                                         |  1 +
 pom.xml                                            |  5 +++
 ratis-assembly/src/main/assembly/bin.xml           | 31 ----------------
 ratis-assembly/src/main/assembly/examples-bin.xml  | 14 +++-----
 ratis-assembly/src/main/assembly/shell-bin.xml     | 13 ++++---
 ratis-examples/pom.xml                             |  5 ++-
 ratis-shell/pom.xml                                | 41 +++++++---------------
 ratis-shell/src/main/libexec/ratis-shell-config.sh | 11 ++++++
 8 files changed, 43 insertions(+), 78 deletions(-)

diff --git a/.gitignore b/.gitignore
index 11a695c7e..f56494487 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,3 +16,4 @@
 target
 build
 patchprocess
+dependency-reduced-pom.xml
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 7743fa9dd..f6f0acd66 100644
--- a/pom.xml
+++ b/pom.xml
@@ -392,6 +392,11 @@
         <artifactId>slf4j-api</artifactId>
         <version>1.7.29</version>
       </dependency>
+      <dependency>
+        <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-simple</artifactId>
+        <version>1.7.29</version>
+      </dependency>
       <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
diff --git a/ratis-assembly/src/main/assembly/bin.xml b/ratis-assembly/src/main/assembly/bin.xml
index 2d0858eb9..59462c322 100644
--- a/ratis-assembly/src/main/assembly/bin.xml
+++ b/ratis-assembly/src/main/assembly/bin.xml
@@ -76,36 +76,5 @@
       <fileMode>0644</fileMode>
       <directoryMode>0755</directoryMode>
     </fileSet>
-    <!-- Include dev support tools -->
-<!--    <fileSet>-->
-<!--      <directory>${project.basedir}/../dev-support</directory>-->
-<!--      <outputDirectory>dev-support</outputDirectory>-->
-<!--      <fileMode>0644</fileMode>-->
-<!--      <directoryMode>0755</directoryMode>-->
-<!--    </fileSet>-->
-<!--    <fileSet>-->
-<!--      <directory>${project.basedir}/../ratis-shell/target/</directory>-->
-<!--      <outputDirectory>ratis-shell/lib/shell</outputDirectory>-->
-<!--      <fileMode>755</fileMode>-->
-<!--      <includes>-->
-<!--        <include>ratis-shell-*-jar-with-dependencies.jar</include>-->
-<!--      </includes>-->
-<!--    </fileSet>-->
-<!--    <fileSet>-->
-<!--      <directory>${project.basedir}/../ratis-shell/src/main/bin</directory>-->
-<!--      <outputDirectory>ratis-shell/bin</outputDirectory>-->
-<!--      <fileMode>755</fileMode>-->
-<!--    </fileSet>-->
-<!--    <fileSet>-->
-<!--      <directory>${project.basedir}/../ratis-shell/src/main/libexec</directory>-->
-<!--      <outputDirectory>ratis-shell/libexec</outputDirectory>-->
-<!--      <fileMode>0644</fileMode>-->
-<!--      <directoryMode>0755</directoryMode>-->
-<!--    </fileSet>-->
-<!--    <fileSet>-->
-<!--      <directory>${project.basedir}/../ratis-shell/src/main/conf</directory>-->
-<!--      <outputDirectory>ratis-shell/conf</outputDirectory>-->
-<!--      <fileMode>644</fileMode>-->
-<!--    </fileSet>-->
   </fileSets>
 </assembly>
diff --git a/ratis-assembly/src/main/assembly/examples-bin.xml b/ratis-assembly/src/main/assembly/examples-bin.xml
index c88f75930..340fd5b2c 100644
--- a/ratis-assembly/src/main/assembly/examples-bin.xml
+++ b/ratis-assembly/src/main/assembly/examples-bin.xml
@@ -52,8 +52,8 @@
       <fileMode>0644</fileMode>
     </fileSet>
     <fileSet>
-      <directory>${project.basedir}/src/main/resources</directory>
-      <outputDirectory>.</outputDirectory>
+      <directory>${project.basedir}/../ratis-examples</directory>
+      <outputDirectory>examples</outputDirectory>
       <includes>
         <include>README.md</include>
       </includes>
@@ -65,19 +65,13 @@
       <includes>
         <include>*.*</include>
       </includes>
-      <fileMode>755</fileMode>
+      <fileMode>0755</fileMode>
     </fileSet>
-    <!-- Include dev support tools -->
-<!--    <fileSet>-->
-<!--      <directory>${project.basedir}/../dev-support</directory>-->
-<!--      <outputDirectory>dev-support</outputDirectory>-->
-<!--      <fileMode>0644</fileMode>-->
-<!--      <directoryMode>0755</directoryMode>-->
-<!--    </fileSet>-->
     <fileSet>
       <directory>${project.basedir}/../ratis-examples/src/main/resources</directory>
       <outputDirectory>examples/conf</outputDirectory>
       <includes>
+        <include>conf.properties</include>
         <include>log4j.properties</include>
       </includes>
       <fileMode>644</fileMode>
diff --git a/ratis-assembly/src/main/assembly/shell-bin.xml b/ratis-assembly/src/main/assembly/shell-bin.xml
index 5ecbb5ca6..86424d854 100644
--- a/ratis-assembly/src/main/assembly/shell-bin.xml
+++ b/ratis-assembly/src/main/assembly/shell-bin.xml
@@ -22,12 +22,15 @@
   <fileSets>
     <fileSet>
       <directory>${project.basedir}/../ratis-shell/target/</directory>
-      <outputDirectory>lib/shell</outputDirectory>
-      <fileMode>755</fileMode>
+      <outputDirectory>jars</outputDirectory>
       <includes>
-        <include>ratis-shell-*-jar-with-dependencies.jar</include>
+        <include>ratis-shell-${project.version}.jar</include>
       </includes>
     </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/../ratis-shell/target/lib/</directory>
+      <outputDirectory>jars</outputDirectory>
+    </fileSet>
     <fileSet>
       <directory>${project.basedir}/..</directory>
       <outputDirectory>.</outputDirectory>
@@ -48,12 +51,12 @@
     <fileSet>
       <directory>${project.basedir}/../ratis-shell/src/main/bin</directory>
       <outputDirectory>bin</outputDirectory>
-      <fileMode>755</fileMode>
+      <fileMode>0755</fileMode>
     </fileSet>
     <fileSet>
       <directory>${project.basedir}/../ratis-shell/src/main/libexec</directory>
       <outputDirectory>libexec</outputDirectory>
-      <fileMode>0644</fileMode>
+      <fileMode>0755</fileMode>
       <directoryMode>0755</directoryMode>
     </fileSet>
     <fileSet>
diff --git a/ratis-examples/pom.xml b/ratis-examples/pom.xml
index a3213c212..1f8b86041 100644
--- a/ratis-examples/pom.xml
+++ b/ratis-examples/pom.xml
@@ -147,9 +147,8 @@
                     <exclude>META-INF/*.DSA</exclude>
                     <exclude>META-INF/*.RSA</exclude>
                     <exclude>**/org/apache/log4j/chainsaw/**</exclude>
-                    <exclude>**/org/apache/log4j/jdbc/JDBCAppender.class</exclude>
-                    <exclude>**/org/apache/log4j/net/JMSAppender.class</exclude>
-                    <exclude>**/org/apache/log4j/net/JMSSink.class</exclude>
+                    <exclude>**/org/apache/log4j/jdbc/**</exclude>
+                    <exclude>**/org/apache/log4j/net/**</exclude>
                   </excludes>
                 </filter>
               </filters>
diff --git a/ratis-shell/pom.xml b/ratis-shell/pom.xml
index 12345d772..c3b8d5e76 100644
--- a/ratis-shell/pom.xml
+++ b/ratis-shell/pom.xml
@@ -27,10 +27,12 @@
     <dependency>
       <artifactId>ratis-client</artifactId>
       <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <artifactId>ratis-common</artifactId>
       <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
     </dependency>
 
     <dependency>
@@ -42,6 +44,10 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.reflections</groupId>
       <artifactId>reflections</artifactId>
@@ -52,41 +58,18 @@
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
+        <artifactId>maven-dependency-plugin</artifactId>
         <executions>
           <execution>
+            <id>copy</id>
             <phase>package</phase>
             <goals>
-              <goal>shade</goal>
+              <goal>copy-dependencies</goal>
             </goals>
             <configuration>
-              <finalName>${project.artifactId}-${project.version}-jar-with-dependencies</finalName>
-
-              <filters>
-                <filter>
-                  <artifact>*:*</artifact>
-                  <excludes>
-                    <exclude>LICENSE</exclude>
-                    <exclude>META-INF/*.SF</exclude>
-                    <exclude>META-INF/*.DSA</exclude>
-                    <exclude>META-INF/*.RSA</exclude>
-                    <exclude>**/org/apache/log4j/chainsaw/**</exclude>
-                    <exclude>**/org/apache/log4j/jdbc/JDBCAppender.class</exclude>
-                    <exclude>**/org/apache/log4j/net/JMSAppender.class</exclude>
-                    <exclude>**/org/apache/log4j/net/JMSSink.class</exclude>
-                  </excludes>
-                </filter>
-              </filters>
-
-              <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                  <manifestEntries>
-                    <Main-Class>org.apache.ratis.shell.cli.sh.RatisShell</Main-Class>
-                  </manifestEntries>
-                </transformer>
-              </transformers>
+              <outputDirectory>
+                ${project.build.directory}/lib
+              </outputDirectory>
             </configuration>
           </execution>
         </executions>
diff --git a/ratis-shell/src/main/libexec/ratis-shell-config.sh b/ratis-shell/src/main/libexec/ratis-shell-config.sh
index 0fed399a4..f2b4f3c38 100644
--- a/ratis-shell/src/main/libexec/ratis-shell-config.sh
+++ b/ratis-shell/src/main/libexec/ratis-shell-config.sh
@@ -32,6 +32,7 @@ RATIS_SHELL_HOME=$(dirname $(dirname "${this}"))
 RATIS_SHELL_ASSEMBLY_CLIENT_JAR="${RATIS_SHELL_HOME}/lib/shell/*"
 RATIS_SHELL_CONF_DIR="${RATIS_SHELL_CONF_DIR:-${RATIS_SHELL_HOME}/conf}"
 RATIS_SHELL_LOGS_DIR="${RATIS_SHELL_LOGS_DIR:-${RATIS_SHELL_HOME}/logs}"
+RATIS_SHELL_LIB_DIR="${RATIS_SHELL_LIB_DIR:-${RATIS_SHELL_HOME}/jars}"
 
 if [[ -e "${RATIS_SHELL_CONF_DIR}/ratis-shell-env.sh" ]]; then
   . "${RATIS_SHELL_CONF_DIR}/ratis-shell-env.sh"
@@ -58,6 +59,16 @@ if [[ ${JAVA_MAJORMINOR} != 001008 && ${JAVA_MAJOR} != 011 ]]; then
   exit 1
 fi
 
+local RATIS_SHELL_CLASSPATH
+
+while read -d '' -r jarfile ; do
+    if [[ "$RATIS_SHELL_CLASSPATH" == "" ]]; then
+        RATIS_SHELL_CLASSPATH="$jarfile";
+    else
+        RATIS_SHELL_CLASSPATH="$RATIS_SHELL_CLASSPATH":"$jarfile"
+    fi
+done < <(find "$RATIS_SHELL_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)
+
 RATIS_SHELL_CLIENT_CLASSPATH="${RATIS_SHELL_CONF_DIR}/:${RATIS_SHELL_CLASSPATH}:${RATIS_SHELL_ASSEMBLY_CLIENT_JAR}"
 
 if [[ -n "${RATIS_SHELL_HOME}" ]]; then


[ratis] 10/12: RATIS-1660. Fix wrong ratis package (#705)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 8a9963c8f3defb1a909c7393bf15eb560752c9a6
Author: leo65535 <le...@163.com>
AuthorDate: Wed Aug 10 11:16:33 2022 +0800

    RATIS-1660. Fix wrong ratis package (#705)
    
    
    (cherry picked from commit 92088ff621ca44f29818b1df71e232284602a88e)
---
 pom.xml                                           |  2 +
 ratis-assembly/pom.xml                            | 41 ++++++++++++---
 ratis-assembly/src/main/assembly/bin-pkg.xml      | 28 ++++++++++
 ratis-assembly/src/main/assembly/bin.xml          | 62 +++++++++++------------
 ratis-assembly/src/main/assembly/examples-bin.xml | 14 ++---
 ratis-assembly/src/main/assembly/shell-bin.xml    |  2 +-
 6 files changed, 104 insertions(+), 45 deletions(-)

diff --git a/pom.xml b/pom.xml
index 803700fc5..7743fa9dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -176,6 +176,7 @@
     <maven-stylus-skin.version>1.5</maven-stylus-skin.version>
     <maven-surefire-plugin.version>3.0.0-M1</maven-surefire-plugin.version>
     <maven-checkstyle-plugin.version>3.1.2</maven-checkstyle-plugin.version>
+    <maven-assembly-plugin.version>3.4.1</maven-assembly-plugin.version>
 
     <checkstyle.version>9.3</checkstyle.version>
 
@@ -522,6 +523,7 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-assembly-plugin</artifactId>
+          <version>${maven-assembly-plugin.version}</version>
           <configuration>
             <!--Defer to the ratis-assembly sub-module.  It does all assembly-->
             <skipAssembly>true</skipAssembly>
diff --git a/ratis-assembly/pom.xml b/ratis-assembly/pom.xml
index 46e716fa0..0bd81aa9b 100644
--- a/ratis-assembly/pom.xml
+++ b/ratis-assembly/pom.xml
@@ -115,6 +115,7 @@
           <attach>true</attach>
         </configuration>
       </plugin>
+
       <plugin>
         <artifactId>maven-assembly-plugin</artifactId>
         <configuration>
@@ -123,14 +124,42 @@
           <skipAssembly>false</skipAssembly>
           <appendAssemblyId>true</appendAssemblyId>
           <tarLongFileMode>gnu</tarLongFileMode>
-          <descriptors>
-            <descriptor>src/main/assembly/src.xml</descriptor>
-            <descriptor>src/main/assembly/bin.xml</descriptor>
-            <descriptor>src/main/assembly/examples-bin.xml</descriptor>
-            <descriptor>src/main/assembly/shell-bin.xml</descriptor>
-          </descriptors>
         </configuration>
+        <executions>
+          <execution>
+            <id>src</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/src.xml</descriptor>
+              </descriptors>
+              <finalName>apache-ratis-${project.version}-src</finalName>
+              <appendAssemblyId>false</appendAssemblyId>
+            </configuration>
+          </execution>
+          <execution>
+            <id>default-cli</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/bin.xml</descriptor>
+                <descriptor>src/main/assembly/examples-bin.xml</descriptor>
+                <descriptor>src/main/assembly/shell-bin.xml</descriptor>
+                <descriptor>src/main/assembly/bin-pkg.xml</descriptor>
+              </descriptors>
+              <finalName>apache-ratis-${project.version}-bin</finalName>
+              <appendAssemblyId>false</appendAssemblyId>
+            </configuration>
+          </execution>
+        </executions>
       </plugin>
+
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-site-plugin</artifactId>
diff --git a/ratis-assembly/src/main/assembly/bin-pkg.xml b/ratis-assembly/src/main/assembly/bin-pkg.xml
new file mode 100644
index 000000000..c07c83cd5
--- /dev/null
+++ b/ratis-assembly/src/main/assembly/bin-pkg.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+  <id>bin-pkg</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <fileSets>
+    <fileSet>
+      <directory>${project.basedir}/target/apache-ratis-${project.version}-bin/apache-ratis-${project.version}-bin</directory>
+      <outputDirectory>..</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/ratis-assembly/src/main/assembly/bin.xml b/ratis-assembly/src/main/assembly/bin.xml
index 0648efdb0..2d0858eb9 100644
--- a/ratis-assembly/src/main/assembly/bin.xml
+++ b/ratis-assembly/src/main/assembly/bin.xml
@@ -23,7 +23,7 @@
   -->
   <id>bin</id>
   <formats>
-    <format>tar.gz</format>
+    <format>dir</format>
   </formats>
   <moduleSets>
     <moduleSet>
@@ -77,35 +77,35 @@
       <directoryMode>0755</directoryMode>
     </fileSet>
     <!-- Include dev support tools -->
-    <fileSet>
-      <directory>${project.basedir}/../dev-support</directory>
-      <outputDirectory>dev-support</outputDirectory>
-      <fileMode>0644</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
-    <fileSet>
-      <directory>${project.basedir}/../ratis-shell/target/</directory>
-      <outputDirectory>ratis-shell/lib/shell</outputDirectory>
-      <fileMode>755</fileMode>
-      <includes>
-        <include>ratis-shell-*-jar-with-dependencies.jar</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${project.basedir}/../ratis-shell/src/main/bin</directory>
-      <outputDirectory>ratis-shell/bin</outputDirectory>
-      <fileMode>755</fileMode>
-    </fileSet>
-    <fileSet>
-      <directory>${project.basedir}/../ratis-shell/src/main/libexec</directory>
-      <outputDirectory>ratis-shell/libexec</outputDirectory>
-      <fileMode>0644</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
-    <fileSet>
-      <directory>${project.basedir}/../ratis-shell/src/main/conf</directory>
-      <outputDirectory>ratis-shell/conf</outputDirectory>
-      <fileMode>644</fileMode>
-    </fileSet>
+<!--    <fileSet>-->
+<!--      <directory>${project.basedir}/../dev-support</directory>-->
+<!--      <outputDirectory>dev-support</outputDirectory>-->
+<!--      <fileMode>0644</fileMode>-->
+<!--      <directoryMode>0755</directoryMode>-->
+<!--    </fileSet>-->
+<!--    <fileSet>-->
+<!--      <directory>${project.basedir}/../ratis-shell/target/</directory>-->
+<!--      <outputDirectory>ratis-shell/lib/shell</outputDirectory>-->
+<!--      <fileMode>755</fileMode>-->
+<!--      <includes>-->
+<!--        <include>ratis-shell-*-jar-with-dependencies.jar</include>-->
+<!--      </includes>-->
+<!--    </fileSet>-->
+<!--    <fileSet>-->
+<!--      <directory>${project.basedir}/../ratis-shell/src/main/bin</directory>-->
+<!--      <outputDirectory>ratis-shell/bin</outputDirectory>-->
+<!--      <fileMode>755</fileMode>-->
+<!--    </fileSet>-->
+<!--    <fileSet>-->
+<!--      <directory>${project.basedir}/../ratis-shell/src/main/libexec</directory>-->
+<!--      <outputDirectory>ratis-shell/libexec</outputDirectory>-->
+<!--      <fileMode>0644</fileMode>-->
+<!--      <directoryMode>0755</directoryMode>-->
+<!--    </fileSet>-->
+<!--    <fileSet>-->
+<!--      <directory>${project.basedir}/../ratis-shell/src/main/conf</directory>-->
+<!--      <outputDirectory>ratis-shell/conf</outputDirectory>-->
+<!--      <fileMode>644</fileMode>-->
+<!--    </fileSet>-->
   </fileSets>
 </assembly>
diff --git a/ratis-assembly/src/main/assembly/examples-bin.xml b/ratis-assembly/src/main/assembly/examples-bin.xml
index 59e85400b..c88f75930 100644
--- a/ratis-assembly/src/main/assembly/examples-bin.xml
+++ b/ratis-assembly/src/main/assembly/examples-bin.xml
@@ -23,7 +23,7 @@
   -->
   <id>examples-bin</id>
   <formats>
-    <format>tar.gz</format>
+    <format>dir</format>
   </formats>
   <dependencySets>
     <dependencySet>
@@ -68,12 +68,12 @@
       <fileMode>755</fileMode>
     </fileSet>
     <!-- Include dev support tools -->
-    <fileSet>
-      <directory>${project.basedir}/../dev-support</directory>
-      <outputDirectory>dev-support</outputDirectory>
-      <fileMode>0644</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
+<!--    <fileSet>-->
+<!--      <directory>${project.basedir}/../dev-support</directory>-->
+<!--      <outputDirectory>dev-support</outputDirectory>-->
+<!--      <fileMode>0644</fileMode>-->
+<!--      <directoryMode>0755</directoryMode>-->
+<!--    </fileSet>-->
     <fileSet>
       <directory>${project.basedir}/../ratis-examples/src/main/resources</directory>
       <outputDirectory>examples/conf</outputDirectory>
diff --git a/ratis-assembly/src/main/assembly/shell-bin.xml b/ratis-assembly/src/main/assembly/shell-bin.xml
index e586cecd7..5ecbb5ca6 100644
--- a/ratis-assembly/src/main/assembly/shell-bin.xml
+++ b/ratis-assembly/src/main/assembly/shell-bin.xml
@@ -17,7 +17,7 @@
           xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
   <id>shell</id>
   <formats>
-    <format>tar.gz</format>
+    <format>dir</format>
   </formats>
   <fileSets>
     <fileSet>