You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/22 08:20:16 UTC

[ratis] branch branch-2_readIndex updated (cfd342044 -> 42c3c6ba4)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a change to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git


    from cfd342044 RATIS-1802. GrpcServerProtocolService encounters IllegalStateException: call already closed. (#839)
     new 08dc58fcb RATIS-1807. Support timeout in gRPC. (#842)
     new 252446070 RATIS-1811. Improve StreamObserver's log messages (#849)
     new 1c500bbaf RATIS-1810. Intermittent failure in TestRaftServerWithGrpc#testRaftClientMetrics (#847)
     new 3ed7c481c RATIS-1808. Rerun PreVote when Vote is timed out (#846)
     new 70fe68f0d RATIS-1815. GitHub: Enable autolink to Jira (#853)
     new 5d9e4b538 RATIS-1813. Allow ratis-shell to run in JDK 8+ (#851)
     new ee8aef255 RATIS-1796. Fix TransferLeadership stopped by heartbeat from old leader (#844)
     new 2fc1c83a6 RATIS-1814: The group info command of the Ratis shell does not show the listener (#852)
     new 5a996a42f RATIS-1812. Enforce a outstanding request limit in StreamObserverWithTimeout. (#850)
     new 80617d4bb RATIS-1809. Use separated timeout for GrpcLogAppender's streaming RPC (#848)
     new e20a2e83b RATIS-1816. appendEntryTimer is not accurate due to the return of writeFuture (#855)
     new f0d9d566e RATIS-1770. Yield leader to higher priority peer by TransferLeadership (#845)
     new c727be372 RATIS-1795. Add and distribute a Maven Wrapper (#856)
     new f50965d56 RATIS-1817. Do not send StartLeaderElection when leaderLastEntry is null (#857)
     new 649e5fef2 RATIS-1819. Use Maven Wrapper in CI workflows (#859)
     new c19db251d RATIS-1820. Update apache parent pom version and other versions. (#861)
     new 14ae143fb RATIS-1662. Intermittent failure in testEnforceLeader (#860)
     new 42c3c6ba4 RATIS-1821. Upgrade ratis-thirdparty version to 1.0.4 (#862)

The 18 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .asf.yaml                                          |   1 +
 .gitignore                                         |   1 +
 .mvn/wrapper/maven-wrapper.properties              |  18 ++
 BUILDING.md                                        |   6 +-
 dev-support/checks/_mvn_unit_report.sh             |   2 +-
 dev-support/checks/build.sh                        |   4 +-
 dev-support/checks/checkstyle.sh                   |   6 +-
 dev-support/checks/findbugs.sh                     |   6 +-
 dev-support/checks/rat.sh                          |   4 +-
 dev-support/checks/sonar.sh                        |   5 +-
 dev-support/checks/unit.sh                         |   4 +-
 .../install-runconfig.sh => find_maven.sh}         |  20 +-
 dev-support/make_rc.sh                             |  10 +-
 dev-support/run-test-repeatedly.sh                 |   6 +-
 mvnw                                               | 308 +++++++++++++++++++++
 mvnw.cmd                                           | 205 ++++++++++++++
 pom.xml                                            |  51 ++--
 ratis-assembly/pom.xml                             |   1 +
 ratis-assembly/src/main/assembly/src.xml           |  10 +
 .../org/apache/ratis/client/DataStreamClient.java  |   2 +-
 .../java/org/apache/ratis/client/RaftClient.java   |   2 +-
 .../org/apache/ratis/protocol/RoutingTable.java    |   2 +-
 .../java/org/apache/ratis/retry/RetryPolicies.java |   6 +-
 .../main/java/org/apache/ratis/util/ExitUtils.java |   2 +-
 .../org/apache/ratis/util/ResourceSemaphore.java   |  28 +-
 ratis-examples/pom.xml                             |   5 +
 .../examples/arithmetic/expression/Expression.java |   6 +-
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java |  38 ++-
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  13 +-
 .../grpc/server/GrpcServerProtocolClient.java      |   7 +-
 .../grpc/server/GrpcServerProtocolService.java     |   2 +-
 .../grpc/util/ResponseNotifyClientInterceptor.java |  72 +++++
 .../ratis/grpc/util/StreamObserverWithTimeout.java | 122 ++++++++
 .../src/main/proto/Test.proto                      |  25 +-
 .../apache/ratis/server/impl/LeaderElection.java   |  95 ++++---
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  68 ++---
 .../apache/ratis/server/impl/PendingRequests.java  |  17 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  20 +-
 .../org/apache/ratis/server/impl/ServerState.java  |   1 -
 .../ratis/server/impl/TransferLeadership.java      | 239 +++++++++++++---
 .../server/leader/InstallSnapshotRequests.java     |   2 +-
 .../server/metrics/SegmentedRaftLogMetrics.java    |   4 +-
 .../server/raftlog/segmented/SegmentedRaftLog.java |   5 +-
 .../ratis/server/impl/LeaderElectionTests.java     |   5 +-
 .../MiniRaftClusterWithSimulatedRpc.java           |   2 +-
 .../shell/cli/sh/election/TransferCommand.java     |   8 +-
 ratis-shell/src/main/libexec/ratis-shell-config.sh |  10 +-
 ratis-test/pom.xml                                 |   4 +
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  |  22 +-
 .../org/apache/ratis/grpc/util/GrpcTestClient.java | 123 ++++++++
 .../org/apache/ratis/grpc/util/GrpcTestServer.java | 108 ++++++++
 .../grpc/util/TestStreamObserverWithTimeout.java   | 119 ++++++++
 .../apache/ratis/util/TestResourceSemaphore.java   |  13 +-
 53 files changed, 1590 insertions(+), 275 deletions(-)
 create mode 100644 .mvn/wrapper/maven-wrapper.properties
 copy dev-support/{intellij/install-runconfig.sh => find_maven.sh} (73%)
 create mode 100755 mvnw
 create mode 100644 mvnw.cmd
 create mode 100644 ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java
 create mode 100644 ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
 copy ratis-experiments/src/main/flatbufs/FileTransfer.fbs => ratis-proto/src/main/proto/Test.proto (69%)
 create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
 create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java
 create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java


[ratis] 15/18: RATIS-1819. Use Maven Wrapper in CI workflows (#859)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 649e5fef2b39de06afe8caba7eb481ae73560721
Author: tison <wa...@gmail.com>
AuthorDate: Tue Mar 21 11:31:29 2023 +0800

    RATIS-1819. Use Maven Wrapper in CI workflows (#859)
    
    (cherry picked from commit 5a8bdef821c24bda579859e3bd3dacd9b0792b7d)
---
 dev-support/checks/_mvn_unit_report.sh |  2 +-
 dev-support/checks/build.sh            |  4 +++-
 dev-support/checks/checkstyle.sh       |  6 ++++--
 dev-support/checks/findbugs.sh         |  6 ++++--
 dev-support/checks/rat.sh              |  4 +++-
 dev-support/checks/sonar.sh            |  5 ++++-
 dev-support/checks/unit.sh             |  4 +++-
 dev-support/find_maven.sh              | 30 ++++++++++++++++++++++++++++++
 dev-support/make_rc.sh                 | 10 ++++------
 dev-support/run-test-repeatedly.sh     |  6 ++++--
 10 files changed, 60 insertions(+), 17 deletions(-)

diff --git a/dev-support/checks/_mvn_unit_report.sh b/dev-support/checks/_mvn_unit_report.sh
index f7123775f..cc14817a9 100755
--- a/dev-support/checks/_mvn_unit_report.sh
+++ b/dev-support/checks/_mvn_unit_report.sh
@@ -75,7 +75,7 @@ for failed_test in $(< ${REPORT_DIR}/summary.txt); do
 done
 
 ## Check if Maven was killed
-if grep -q 'Killed.* mvn .* test ' "${REPORT_DIR}/output.log"; then
+if grep -q "Killed.* ${MVN} .* test " "${REPORT_DIR}/output.log"; then
   echo 'Maven test run was killed' >> "${REPORT_DIR}/summary.txt"
 fi
 
diff --git a/dev-support/checks/build.sh b/dev-support/checks/build.sh
index ad029d626..4d7090aff 100755
--- a/dev-support/checks/build.sh
+++ b/dev-support/checks/build.sh
@@ -16,6 +16,8 @@
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 cd "$DIR/../.." || exit 1
 
+source "${DIR}/../find_maven.sh"
+
 export MAVEN_OPTS="-Xmx4096m"
-mvn -V -B -Dmaven.javadoc.skip=true -DskipTests clean install "$@"
+${MVN} -V -B -Dmaven.javadoc.skip=true -DskipTests clean install "$@"
 exit $?
diff --git a/dev-support/checks/checkstyle.sh b/dev-support/checks/checkstyle.sh
index 490f1c470..cb06fdaac 100755
--- a/dev-support/checks/checkstyle.sh
+++ b/dev-support/checks/checkstyle.sh
@@ -16,6 +16,8 @@
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 cd "$DIR/../.." || exit 1
 
+source "${DIR}/../find_maven.sh"
+
 BASE_DIR="$(pwd -P)"
 REPORT_DIR=${OUTPUT_DIR:-"$DIR/../../target/checkstyle"}
 mkdir -p "$REPORT_DIR"
@@ -24,10 +26,10 @@ REPORT_FILE="$REPORT_DIR/summary.txt"
 MAVEN_OPTIONS='-B -fae -Dcheckstyle.failOnViolation=false'
 
 declare -i rc
-mvn ${MAVEN_OPTIONS} checkstyle:check | tee  "${REPORT_DIR}/output.log"
+${MVN} ${MAVEN_OPTIONS} checkstyle:check | tee  "${REPORT_DIR}/output.log"
 rc=$?
 if [[ ${rc} -ne 0 ]]; then
-  mvn ${MAVEN_OPTIONS} clean test-compile checkstyle:check
+  ${MVN} ${MAVEN_OPTIONS} clean test-compile checkstyle:check
   rc=$?
   mkdir -p "$REPORT_DIR" # removed by mvn clean
 else
diff --git a/dev-support/checks/findbugs.sh b/dev-support/checks/findbugs.sh
index bcb43fd4f..c1723f437 100755
--- a/dev-support/checks/findbugs.sh
+++ b/dev-support/checks/findbugs.sh
@@ -16,16 +16,18 @@
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 cd "$DIR/../.." || exit 1
 
+source "${DIR}/../find_maven.sh"
+
 MAVEN_OPTIONS='-B -fae'
 
 if ! type unionBugs >/dev/null 2>&1 || ! type convertXmlToText >/dev/null 2>&1; then
   #shellcheck disable=SC2086
-  mvn ${MAVEN_OPTIONS} test-compile spotbugs:check
+  ${MVN} ${MAVEN_OPTIONS} test-compile spotbugs:check
   exit $?
 fi
 
 #shellcheck disable=SC2086
-mvn ${MAVEN_OPTIONS} test-compile spotbugs:spotbugs
+${MVN} ${MAVEN_OPTIONS} test-compile spotbugs:spotbugs
 rc=$?
 
 REPORT_DIR=${OUTPUT_DIR:-"$DIR/../../target/findbugs"}
diff --git a/dev-support/checks/rat.sh b/dev-support/checks/rat.sh
index 86802e9db..9b55878ef 100755
--- a/dev-support/checks/rat.sh
+++ b/dev-support/checks/rat.sh
@@ -16,12 +16,14 @@
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 cd "$DIR/../.." || exit 1
 
+source "${DIR}/../find_maven.sh"
+
 REPORT_DIR=${OUTPUT_DIR:-"$DIR/../../target/rat"}
 mkdir -p "$REPORT_DIR"
 
 REPORT_FILE="$REPORT_DIR/summary.txt"
 
-mvn -B -fn org.apache.rat:apache-rat-plugin:0.13:check
+${MVN} -B -fn org.apache.rat:apache-rat-plugin:0.13:check
 
 cd "$DIR/../.." || exit 1
 
diff --git a/dev-support/checks/sonar.sh b/dev-support/checks/sonar.sh
index a1f9dbe60..d78c8875f 100755
--- a/dev-support/checks/sonar.sh
+++ b/dev-support/checks/sonar.sh
@@ -16,8 +16,11 @@
 DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
 cd "$DIR/../.." || exit 1
 
+source "${DIR}/../find_maven.sh"
+
 if [ ! "$SONAR_TOKEN" ]; then
   echo "SONAR_TOKEN environment variable should be set"
   exit 1
 fi
-mvn -B verify -DskipShade -DskipTests org.sonarsource.scanner.maven:sonar-maven-plugin:3.6.0.1398:sonar -Dsonar.host.url=https://sonarcloud.io -Dsonar.organization=apache -Dsonar.projectKey=apache-ratis
+
+${MVN} -B verify -DskipShade -DskipTests org.sonarsource.scanner.maven:sonar-maven-plugin:3.6.0.1398:sonar -Dsonar.host.url=https://sonarcloud.io -Dsonar.organization=apache -Dsonar.projectKey=apache-ratis
diff --git a/dev-support/checks/unit.sh b/dev-support/checks/unit.sh
index 50a1c712f..69edde287 100755
--- a/dev-support/checks/unit.sh
+++ b/dev-support/checks/unit.sh
@@ -19,6 +19,8 @@ set -o pipefail
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 cd "$DIR/../.." || exit 1
 
+source "${DIR}/../find_maven.sh"
+
 : ${ITERATIONS:="1"}
 
 declare -i ITERATIONS
@@ -39,7 +41,7 @@ for i in $(seq 1 ${ITERATIONS}); do
     mkdir -p "${REPORT_DIR}"
   fi
 
-  mvn -B -fae test "$@" \
+  ${MVN} -B -fae test "$@" \
     | tee "${REPORT_DIR}/output.log"
   irc=$?
 
diff --git a/dev-support/find_maven.sh b/dev-support/find_maven.sh
new file mode 100644
index 000000000..20b6462b1
--- /dev/null
+++ b/dev-support/find_maven.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+function find_maven() {
+    if [ "$MAVEN" != "" ]; then
+      echo "${MAVEN}"
+    else
+      local DIR
+      DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+      ( cd "$DIR/.." || exit 1 ; echo "$( pwd -P )/mvnw" )
+    fi
+}
+
+MVN="$( find_maven )"
+export MVN
diff --git a/dev-support/make_rc.sh b/dev-support/make_rc.sh
index e02145555..b5bec51dd 100755
--- a/dev-support/make_rc.sh
+++ b/dev-support/make_rc.sh
@@ -27,17 +27,15 @@
 # Presumes your settings.xml all set up so can sign artifacts published to mvn, etc.
 set -e
 # Set mvn and mvnopts
-mvn=mvn
-if [ "$MAVEN" != "" ]; then
-  mvn="${MAVEN}"
-fi
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+source "${DIR}/find_maven.sh"
 mvnopts="-Xmx1g"
 if [ "$MAVEN_OPTS" != "" ]; then
   mvnopts="${MAVEN_OPTS}"
 fi
 
 mvnGet() {
-  ${mvn} -q -Dexec.executable="echo" -Dexec.args="\${${1}}" --non-recursive \
+  ${MVN} -q -Dexec.executable="echo" -Dexec.args="\${${1}}" --non-recursive \
     org.codehaus.mojo:exec-maven-plugin:1.6.0:exec 2>/dev/null
 }
 
@@ -90,7 +88,7 @@ fi
 
 
 mvnFun() {
-  MAVEN_OPTS="${mvnopts}" ${mvn} -Dmaven.repo.local="${repodir}" "$@"
+  MAVEN_OPTS="${mvnopts}" ${MVN} -Dmaven.repo.local="${repodir}" "$@"
 }
 
 prepare-src() {
diff --git a/dev-support/run-test-repeatedly.sh b/dev-support/run-test-repeatedly.sh
index dd3b87421..6824c1449 100755
--- a/dev-support/run-test-repeatedly.sh
+++ b/dev-support/run-test-repeatedly.sh
@@ -24,10 +24,12 @@ fi
 TEST_PATTERN=$1
 TEST_NAME=`echo ${TEST_PATTERN} | cut -d# -f 1`
 
-MVN="mvn"
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+source "${DIR}/find_maven.sh"
+
 set -ex
 
-mvn clean
+${MVN} clean
 
 for i in `seq 1 99`;
 do


[ratis] 12/18: RATIS-1770. Yield leader to higher priority peer by TransferLeadership (#845)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit f0d9d566e96a18776eb6e959d194dd7a228c3c93
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Sun Mar 19 14:34:31 2023 +0800

    RATIS-1770. Yield leader to higher priority peer by TransferLeadership (#845)
    
    (cherry picked from commit ee642b0b537629108227864f3458daf20950b9bc)
---
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  68 ++----
 .../apache/ratis/server/impl/RaftServerImpl.java   |  11 +-
 .../org/apache/ratis/server/impl/ServerState.java  |   1 -
 .../ratis/server/impl/TransferLeadership.java      | 235 ++++++++++++++++++---
 .../ratis/server/impl/LeaderElectionTests.java     |   4 +-
 .../shell/cli/sh/election/TransferCommand.java     |   8 +-
 6 files changed, 234 insertions(+), 93 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 694183bcf..6627d8e7b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -26,8 +26,6 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -62,6 +60,7 @@ import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -424,6 +423,10 @@ class LeaderStateImpl implements LeaderState {
     return currentTerm;
   }
 
+  TermIndex getLastEntry() {
+    return server.getState().getLastEntry();
+  }
+
   @Override
   public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) {
     if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) {
@@ -658,46 +661,14 @@ class LeaderStateImpl implements LeaderState {
     return pendingStepDown.submitAsync(request);
   }
 
-  private synchronized void sendStartLeaderElection(RaftPeerId follower, TermIndex lastEntry) {
-    ServerState state = server.getState();
-    TermIndex currLastEntry = state.getLastEntry();
-    if (ServerState.compareLog(currLastEntry, lastEntry) != 0) {
-      LOG.warn("{} can not send StartLeaderElectionRequest to follower:{} because currLastEntry:{} " +
-              "did not match lastEntry:{}", this, follower, currLastEntry, lastEntry);
-      return;
-    }
-    LOG.info("{}: send StartLeaderElectionRequest to follower {} on term {}, lastEntry={}",
-        this, follower, currentTerm, lastEntry);
-
-    final StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto(
-        server.getMemberId(), follower, lastEntry);
-    CompletableFuture.supplyAsync(() -> {
-      server.getLeaderElectionMetrics().onTransferLeadership();
-      try {
-        StartLeaderElectionReplyProto replyProto = server.getServerRpc().startLeaderElection(r);
-        LOG.info("{} received {} reply of StartLeaderElectionRequest from follower:{}",
-            this, replyProto.getServerReply().getSuccess() ? "success" : "fail", follower);
-      } catch (IOException e) {
-        LOG.warn("{} send StartLeaderElectionRequest throw exception", this, e);
+  private static LogAppender chooseUpToDateFollower(List<LogAppender> followers, TermIndex leaderLastEntry) {
+    for(LogAppender f : followers) {
+      if (TransferLeadership.isFollowerUpToDate(f.getFollower(), leaderLastEntry)
+          == TransferLeadership.Result.SUCCESS) {
+        return f;
       }
-      return null;
-    });
-  }
-
-  boolean sendStartLeaderElection(FollowerInfo followerInfo) {
-    final RaftPeerId followerId = followerInfo.getId();
-    final TermIndex leaderLastEntry = server.getState().getLastEntry();
-    if (leaderLastEntry == null) {
-      sendStartLeaderElection(followerId, null);
-      return true;
     }
-
-    final long followerMatchIndex = followerInfo.getMatchIndex();
-    if (followerMatchIndex >= leaderLastEntry.getIndex()) {
-      sendStartLeaderElection(followerId, leaderLastEntry);
-      return true;
-    }
-    return false;
+    return null;
   }
 
   private void prepare() {
@@ -778,7 +749,7 @@ class LeaderStateImpl implements LeaderState {
     } else {
       eventQueue.submit(checkStagingEvent);
     }
-    server.getTransferLeadership().onFollowerAppendEntriesReply(this, follower);
+    server.getTransferLeadership().onFollowerAppendEntriesReply(follower);
   }
 
   @Override
@@ -1044,7 +1015,7 @@ class LeaderStateImpl implements LeaderState {
     }
     final int leaderPriority = leader.getPriority();
 
-    FollowerInfo highestPriorityInfo = null;
+    final List<LogAppender> highestPriorityInfos = new ArrayList<>();
     int highestPriority = Integer.MIN_VALUE;
     for (LogAppender logAppender : senders) {
       final RaftPeer follower = conf.getPeer(logAppender.getFollowerId());
@@ -1053,12 +1024,17 @@ class LeaderStateImpl implements LeaderState {
       }
       final int followerPriority = follower.getPriority();
       if (followerPriority > leaderPriority && followerPriority >= highestPriority) {
-        highestPriority = followerPriority;
-        highestPriorityInfo = logAppender.getFollower();
+        if (followerPriority > highestPriority) {
+          highestPriority = followerPriority;
+          highestPriorityInfos.clear();
+        }
+        highestPriorityInfos.add(logAppender);
       }
     }
-    if (highestPriorityInfo != null) {
-      sendStartLeaderElection(highestPriorityInfo);
+    final TermIndex leaderLastEntry = getLastEntry();
+    final LogAppender appender = chooseUpToDateFollower(highestPriorityInfos, leaderLastEntry);
+    if (appender != null) {
+      server.getTransferLeadership().start(appender);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c64aaebba..5ecbc36d4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -225,7 +225,7 @@ class RaftServerImpl implements RaftServer.Division,
         .setProperties(getRaftServer().getProperties())
         .build());
 
-    this.transferLeadership = new TransferLeadership(this);
+    this.transferLeadership = new TransferLeadership(this, properties);
     this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
     this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
 
@@ -1074,10 +1074,6 @@ class RaftServerImpl implements RaftServer.Division,
     return transferLeadership.isSteppingDown();
   }
 
-  void finishTransferLeadership() {
-    transferLeadership.finish(state.getLeaderId(), false);
-  }
-
   CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
       throws IOException {
     if (request.getNewLeader() == null) {
@@ -1463,6 +1459,10 @@ class RaftServerImpl implements RaftServer.Division,
     return commitInfoCache.update(getPeer(), state.getLog().getLastCommittedIndex());
   }
 
+  ExecutorService getServerExecutor() {
+    return serverExecutor;
+  }
+
   @SuppressWarnings("checkstyle:parameternumber")
   private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
       RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
@@ -1852,5 +1852,6 @@ class RaftServerImpl implements RaftServer.Division,
 
   void onGroupLeaderElected() {
     this.firstElectionSinceStartup.set(false);
+    transferLeadership.complete(TransferLeadership.Result.SUCCESS);
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 3ced7b9c7..fa685325e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -317,7 +317,6 @@ class ServerState implements Closeable {
       LOG.info("{}: change Leader from {} to {} at term {} for {}{}",
           getMemberId(), oldLeaderId, newLeaderId, getCurrentTerm(), op, suffix);
       if (newLeaderId != null) {
-        server.finishTransferLeadership();
         server.onGroupLeaderElected();
       }
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index c5c1a46cb..beab02b67 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -17,27 +17,110 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.TransferLeadershipRequest;
 import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.StringUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 public class TransferLeadership {
   public static final Logger LOG = LoggerFactory.getLogger(TransferLeadership.class);
 
+  private static class Context {
+    private final TransferLeadershipRequest request;
+    private final Supplier<LogAppender> transferee;
+
+    Context(TransferLeadershipRequest request, Supplier<LogAppender> transferee) {
+      this.request = request;
+      this.transferee = transferee;
+    }
+
+    TransferLeadershipRequest getRequest() {
+      return request;
+    }
+
+    RaftPeerId getTransfereeId() {
+      return request.getNewLeader();
+    }
+
+    LogAppender getTransfereeLogAppender() {
+      return transferee.get();
+    }
+  }
+
+  static class Result {
+    enum Type {
+      SUCCESS,
+      DIFFERENT_LEADER,
+      NULL_FOLLOWER,
+      NULL_LOG_APPENDER,
+      NOT_UP_TO_DATE,
+      TIMED_OUT,
+      FAILED_TO_START,
+      COMPLETED_EXCEPTIONALLY,
+    }
+
+    static final Result SUCCESS = new Result(Type.SUCCESS);
+    static final Result DIFFERENT_LEADER = new Result(Type.DIFFERENT_LEADER);
+    static final Result NULL_FOLLOWER = new Result(Type.NULL_FOLLOWER);
+    static final Result NULL_LOG_APPENDER = new Result(Type.NULL_LOG_APPENDER);
+
+    private final Type type;
+    private final String errorMessage;
+    private final Throwable exception;
+
+    private Result(Type type) {
+      this(type, null);
+    }
+
+    private Result(Type type, String errorMessage, Throwable exception) {
+      this.type = type;
+      this.errorMessage = errorMessage;
+      this.exception = exception;
+    }
+
+    Result(Type type, String errorMessage) {
+      this(type, errorMessage, null);
+    }
+
+    Result(Throwable t) {
+      this(Type.COMPLETED_EXCEPTIONALLY, null, t);
+    }
+
+    Type getType() {
+      return type;
+    }
+
+    @Override
+    public String toString() {
+      if (exception == null) {
+        return type + (errorMessage == null ? "" : "(" + errorMessage + ")");
+      }
+      return type + ": " + StringUtils.stringifyException(exception);
+    }
+  }
+
   class PendingRequest {
     private final TransferLeadershipRequest request;
     private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
@@ -54,17 +137,20 @@ public class TransferLeadership {
       return replyFuture;
     }
 
-    void complete(RaftPeerId currentLeader, boolean timeout) {
+    void complete(Result result) {
       if (replyFuture.isDone()) {
         return;
       }
-
+      final RaftPeerId currentLeader = server.getState().getLeaderId();
       if (currentLeader != null && currentLeader.equals(request.getNewLeader())) {
         replyFuture.complete(server.newSuccessReply(request));
-      } else if (timeout) {
+      } else {
+        if (result.getType() == Result.Type.SUCCESS) {
+          result = Result.DIFFERENT_LEADER;
+        }
         final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId()
             + ": Failed to transfer leadership to " + request.getNewLeader()
-            + " (timed out " + request.getTimeoutMs() + "ms): current leader is " + currentLeader);
+            + " (the current leader is " + currentLeader + "): " + result);
         replyFuture.complete(server.newExceptionReply(request, tle));
       }
     }
@@ -76,11 +162,14 @@ public class TransferLeadership {
   }
 
   private final RaftServerImpl server;
+  private final TimeDuration requestTimeout;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
   private final AtomicReference<PendingRequest> pending = new AtomicReference<>();
 
-  TransferLeadership(RaftServerImpl server) {
+  TransferLeadership(RaftServerImpl server, RaftProperties properties) {
     this.server = server;
+    this.requestTimeout = RaftServerConfigKeys.Rpc.requestTimeout(properties);
   }
 
   private Optional<RaftPeerId> getTransferee() {
@@ -92,50 +181,130 @@ public class TransferLeadership {
     return pending.get() != null;
   }
 
-  void onFollowerAppendEntriesReply(LeaderStateImpl leaderState, FollowerInfo follower) {
-    final Optional<RaftPeerId> transferee = getTransferee();
-    // If the transferee has just append some entries and becomes up-to-date,
-    // send StartLeaderElection to it
-    if (transferee.filter(t -> t.equals(follower.getId())).isPresent()
-        && leaderState.sendStartLeaderElection(follower)) {
+  static Result isFollowerUpToDate(FollowerInfo follower, TermIndex leaderLastEntry) {
+    if (follower == null) {
+      return Result.NULL_FOLLOWER;
+    } else if (leaderLastEntry != null) {
+      final long followerMatchIndex = follower.getMatchIndex();
+      if (followerMatchIndex < leaderLastEntry.getIndex()) {
+        return new Result(Result.Type.NOT_UP_TO_DATE, "followerMatchIndex = " + followerMatchIndex
+            + " < leaderLastEntry.getIndex() = " + leaderLastEntry.getIndex());
+      }
+    }
+    return Result.SUCCESS;
+  }
+
+  private Result sendStartLeaderElection(FollowerInfo follower) {
+    final TermIndex lastEntry = server.getState().getLastEntry();
+
+    final Result result = isFollowerUpToDate(follower, lastEntry);
+    if (result != Result.SUCCESS) {
+      return result;
+    }
+
+    final RaftPeerId transferee = follower.getId();
+    LOG.info("{}: sendStartLeaderElection to follower {}, lastEntry={}",
+        server.getMemberId(), transferee, lastEntry);
+
+    final RaftProtos.StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto(
+        server.getMemberId(), transferee, lastEntry);
+    final CompletableFuture<RaftProtos.StartLeaderElectionReplyProto> f = CompletableFuture.supplyAsync(() -> {
+      server.getLeaderElectionMetrics().onTransferLeadership();
+      try {
+        return server.getServerRpc().startLeaderElection(r);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to sendStartLeaderElection to follower " + transferee, e);
+      }
+    }, server.getServerExecutor()).whenComplete((reply, exception) -> {
+      if (reply != null) {
+        LOG.info("{}: Received startLeaderElection reply from {}: success? {}",
+            server.getMemberId(), transferee, reply.getServerReply().getSuccess());
+      } else if (exception != null) {
+        LOG.warn(server.getMemberId() + ": Failed to startLeaderElection for " + transferee, exception);
+      }
+    });
+
+    if (f.isCompletedExceptionally()) { // already failed
+      try {
+        f.join();
+      } catch (Throwable t) {
+        return new Result(t);
+      }
+    }
+    return Result.SUCCESS;
+  }
+
+  /**
+   * If the transferee has just append some entries and becomes up-to-date,
+   * send StartLeaderElection to it
+   */
+  void onFollowerAppendEntriesReply(FollowerInfo follower) {
+    if (!getTransferee().filter(t -> t.equals(follower.getId())).isPresent()) {
+      return;
+    }
+    final Result result = sendStartLeaderElection(follower);
+    if (result == Result.SUCCESS) {
       LOG.info("{}: sent StartLeaderElection to transferee {} after received AppendEntriesResponse",
-          server.getMemberId(), transferee.get());
+          server.getMemberId(), follower.getId());
     }
   }
 
-  private void tryTransferLeadership(LeaderStateImpl leaderState, RaftPeerId transferee) {
+  private Result tryTransferLeadership(Context context) {
+    final RaftPeerId transferee = context.getTransfereeId();
     LOG.info("{}: start transferring leadership to {}", server.getMemberId(), transferee);
-    final LogAppender appender = leaderState.getLogAppender(transferee).orElse(null);
-
+    final LogAppender appender = context.getTransfereeLogAppender();
     if (appender == null) {
-      LOG.error("{}: cannot find LogAppender for transferee {}", server.getMemberId(), transferee);
-      return;
+      return Result.NULL_LOG_APPENDER;
     }
     final FollowerInfo follower = appender.getFollower();
-    if (leaderState.sendStartLeaderElection(follower)) {
-      LOG.info("{}: sent StartLeaderElection to transferee {} immediately as it already has up-to-date log",
-          server.getMemberId(), transferee);
-    } else {
-      LOG.info("{}: notifying LogAppender to send AppendEntries as transferee {} is not up-to-date",
-          server.getMemberId(), transferee);
+    final Result result = sendStartLeaderElection(follower);
+    if (result.getType() == Result.Type.SUCCESS) {
+      LOG.info("{}: {} sent StartLeaderElection to transferee {} immediately as it already has up-to-date log",
+          server.getMemberId(), result, transferee);
+    } else if (result.getType() == Result.Type.NOT_UP_TO_DATE) {
+      LOG.info("{}: {} notifying LogAppender to send AppendEntries to transferee {}",
+          server.getMemberId(), result, transferee);
       appender.notifyLogAppender();
     }
+    return result;
+  }
+
+  void start(LogAppender transferee) {
+    // TransferLeadership will block client request, so we don't want wait too long.
+    // If everything goes well, transferee should be elected within the min rpc timeout.
+    final long timeout = server.properties().minRpcTimeoutMs();
+    final TransferLeadershipRequest request = new TransferLeadershipRequest(ClientId.emptyClientId(),
+        server.getId(), server.getMemberId().getGroupId(), 0, transferee.getFollowerId(), timeout);
+    start(new Context(request, () -> transferee));
   }
 
   CompletableFuture<RaftClientReply> start(LeaderStateImpl leaderState, TransferLeadershipRequest request) {
+    final Context context = new Context(request,
+        JavaUtils.memoize(() -> leaderState.getLogAppender(request.getNewLeader()).orElse(null)));
+    return start(context);
+  }
+
+  private CompletableFuture<RaftClientReply> start(Context context) {
+    final TransferLeadershipRequest request = context.getRequest();
     final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> new PendingRequest(request));
     final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: supplier.get());
     if (previous != null) {
       return createReplyFutureFromPreviousRequest(request, previous);
     }
-    tryTransferLeadership(leaderState, request.getNewLeader());
-
-    // if timeout is not specified in request, default to random election timeout
-    final TimeDuration timeout = request.getTimeoutMs() == 0 ? server.getRandomElectionTimeout()
-        : TimeDuration.valueOf(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
-    scheduler.onTimeout(timeout, () -> finish(server.getState().getLeaderId(), true),
-        LOG, () -> "Failed to transfer leadership to " + request.getNewLeader() + ": timeout after " + timeout);
-    return supplier.get().getReplyFuture();
+    final PendingRequest pendingRequest = supplier.get();
+    final Result result = tryTransferLeadership(context);
+    final Result.Type type = result.getType();
+    if (type != Result.Type.SUCCESS && type != Result.Type.NOT_UP_TO_DATE) {
+      pendingRequest.complete(result);
+    } else {
+      // if timeout is not specified in request, use default request timeout
+      final TimeDuration timeout = request.getTimeoutMs() == 0 ? requestTimeout
+          : TimeDuration.valueOf(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
+      scheduler.onTimeout(timeout, () -> complete(new Result(Result.Type.TIMED_OUT,
+              timeout.toString(TimeUnit.SECONDS, 3))),
+          LOG, () -> "Failed to handle timeout");
+    }
+    return pendingRequest.getReplyFuture();
   }
 
   private CompletableFuture<RaftClientReply> createReplyFutureFromPreviousRequest(
@@ -158,8 +327,8 @@ public class TransferLeadership {
     }
   }
 
-  void finish(RaftPeerId currentLeader, boolean timeout) {
+  void complete(Result result) {
     Optional.ofNullable(pending.getAndSet(null))
-        .ifPresent(r -> r.complete(currentLeader, timeout));
+        .ifPresent(r -> r.complete(result));
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index f77d96f5e..76761249b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -259,7 +259,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
               long cost = System.currentTimeMillis() - start;
               Assert.assertTrue(cost > timeoutMs);
               Assert.assertTrue(e.getMessage().contains("Failed to transfer leadership to"));
-              Assert.assertTrue(e.getMessage().contains("timed out"));
+              Assert.assertTrue(e.getMessage().contains(TransferLeadership.Result.Type.TIMED_OUT.toString()));
             }
 
             return true;
@@ -282,7 +282,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
 
         // after transfer timeout, leader should accept request
         RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
-        Assert.assertTrue(reply.getReplierId().equals(leader.getId().toString()));
+        Assert.assertEquals(leader.getId().toString(), reply.getReplierId());
         Assert.assertTrue(reply.isSuccess());
 
         deIsolate(cluster, newLeader.getId());
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
index b5180548c..7dba5ae9e 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
@@ -58,12 +58,8 @@ public class TransferCommand extends AbstractRatisCommand {
     super.run(cl);
 
     String strAddr = cl.getOptionValue(ADDRESS_OPTION_NAME);
-    // TODO: Default timeout should be set to 0, which means let server decide (based on election timeout).
-    //       However, occasionally the request could timeout too fast while the transfer is in progress.
-    //       i.e. request timeout doesn't mean transfer leadership has failed.
-    //       Currently, Ratis shell returns merely based on the result of the request.
-    //       So we set a larger default timeout here (3s).
-    final TimeDuration timeoutDefault = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+    // Default timeout is 0, which means let server decide (will use default request timeout).
+    final TimeDuration timeoutDefault = TimeDuration.ZERO;
     // Default timeout for legacy mode matches with the legacy command (version 2.4.x and older).
     final TimeDuration timeoutLegacy = TimeDuration.valueOf(60, TimeUnit.SECONDS);
     final Optional<TimeDuration> timeout = !cl.hasOption(TIMEOUT_OPTION_NAME) ? Optional.empty() :


[ratis] 02/18: RATIS-1811. Improve StreamObserver's log messages (#849)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 252446070a1461ebc8e807b78f4aaad759700507
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Thu Mar 9 10:53:13 2023 +0800

    RATIS-1811. Improve StreamObserver's log messages (#849)
    
    (cherry picked from commit 1ca77a1ca3736fab666094ef79048d338d966554)
---
 .../java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 8a58cdd9c..08386f83c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -148,7 +148,7 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
     }
     @Override
     public void onError(Throwable t) {
-      GrpcUtil.warn(LOG, () -> getId() + ": installSnapshot onError, lastRequest: " + getPreviousRequestString(), t);
+      GrpcUtil.warn(LOG, () -> getId() + ": "+ op + " onError, lastRequest: " + getPreviousRequestString(), t);
       if (isClosed.compareAndSet(false, true)) {
         Status status = Status.fromThrowable(t);
         if (status != null && status.getCode() != Status.Code.CANCELLED) {


[ratis] 07/18: RATIS-1796. Fix TransferLeadership stopped by heartbeat from old leader (#844)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit ee8aef255cf622c49297ca7b90d44db06c5dfe6d
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Tue Mar 14 10:18:23 2023 +0800

    RATIS-1796. Fix TransferLeadership stopped by heartbeat from old leader (#844)
    
    (cherry picked from commit 05c0db0444eaade1b18e49465c3cde2ea5336485)
---
 .../org/apache/ratis/server/impl/LeaderElection.java     | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 5f79940cb..86ec62325 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -187,16 +187,23 @@ class LeaderElection implements Runnable {
 
   private final RaftServerImpl server;
   private final boolean skipPreVote;
+  private final ConfAndTerm round0;
 
-  LeaderElection(RaftServerImpl server, boolean skipPreVote) {
+  LeaderElection(RaftServerImpl server, boolean force) {
     this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet();
     this.lifeCycle = new LifeCycle(this);
     this.daemon = Daemon.newBuilder().setName(name).setRunnable(this)
         .setThreadGroup(server.getThreadGroup()).build();
     this.server = server;
-    this.skipPreVote = skipPreVote ||
+    this.skipPreVote = force ||
         !RaftServerConfigKeys.LeaderElection.preVote(
             server.getRaftServer().getProperties());
+    try {
+      // increase term of the candidate in advance if it's forced to election
+      this.round0 = force ? server.getState().initElection(Phase.ELECTION) : null;
+    } catch (IOException e) {
+      throw new IllegalStateException(name + ": Failed to initialize election", e);
+    }
   }
 
   void start() {
@@ -303,7 +310,10 @@ class LeaderElection implements Runnable {
       if (!shouldRun()) {
         return false;
       }
-      final ConfAndTerm confAndTerm = server.getState().initElection(phase);
+      // If round0 is non-null, we have already called initElection in the constructor,
+      // reuse round0 to avoid initElection again for the first round
+      final ConfAndTerm confAndTerm = (round == 0 && round0 != null) ?
+          round0 : server.getState().initElection(phase);
       electionTerm = confAndTerm.getTerm();
       conf = confAndTerm.getConf();
     }


[ratis] 13/18: RATIS-1795. Add and distribute a Maven Wrapper (#856)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit c727be372dc9cdb4212fdce39991a7fce684857a
Author: tison <wa...@gmail.com>
AuthorDate: Mon Mar 20 17:45:43 2023 +0800

    RATIS-1795. Add and distribute a Maven Wrapper (#856)
    
    (cherry picked from commit bef892426333a2b6dc0088797a57a6e80d549cec)
---
 .gitignore                               |   1 +
 .mvn/wrapper/maven-wrapper.properties    |  18 ++
 BUILDING.md                              |   6 +-
 mvnw                                     | 308 +++++++++++++++++++++++++++++++
 mvnw.cmd                                 | 205 ++++++++++++++++++++
 ratis-assembly/src/main/assembly/src.xml |  10 +
 6 files changed, 546 insertions(+), 2 deletions(-)

diff --git a/.gitignore b/.gitignore
index 6af045672..937945310 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,4 +17,5 @@ target
 build
 patchprocess
 dependency-reduced-pom.xml
+.mvn/wrapper/maven-wrapper.jar
 .vscode/
diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
new file mode 100644
index 000000000..08ea486aa
--- /dev/null
+++ b/.mvn/wrapper/maven-wrapper.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.0/apache-maven-3.9.0-bin.zip
+wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar
diff --git a/BUILDING.md b/BUILDING.md
index b91ffba6f..68b4c0b88 100644
--- a/BUILDING.md
+++ b/BUILDING.md
@@ -14,8 +14,10 @@
 
 # Building
 
-Apache Ratis uses Apache Maven to build the artifacts.
-It is required to have Maven 3.3.9 or later.
+Apache Ratis uses Apache Maven 3.3.9 or later to build the artifacts.
+You can use the bundled Maven Wrapper to build without a pre-installed Maven,
+by replacing `mvn` with the `mvnw` (*nix) or `mvnw.cmd` (Windows) script in the root path.
+
 Apache Ratis is written in Java 8.
 Therefore, it as well requires Java 8 or later.
 
diff --git a/mvnw b/mvnw
new file mode 100755
index 000000000..8d937f4c1
--- /dev/null
+++ b/mvnw
@@ -0,0 +1,308 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Apache Maven Wrapper startup batch script, version 3.2.0
+#
+# Required ENV vars:
+# ------------------
+#   JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+#   MAVEN_OPTS - parameters passed to the Java VM when running Maven
+#     e.g. to debug Maven itself, use
+#       set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+#   MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+  if [ -f /usr/local/etc/mavenrc ] ; then
+    . /usr/local/etc/mavenrc
+  fi
+
+  if [ -f /etc/mavenrc ] ; then
+    . /etc/mavenrc
+  fi
+
+  if [ -f "$HOME/.mavenrc" ] ; then
+    . "$HOME/.mavenrc"
+  fi
+
+fi
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "$(uname)" in
+  CYGWIN*) cygwin=true ;;
+  MINGW*) mingw=true;;
+  Darwin*) darwin=true
+    # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+    # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+    if [ -z "$JAVA_HOME" ]; then
+      if [ -x "/usr/libexec/java_home" ]; then
+        JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME
+      else
+        JAVA_HOME="/Library/Java/Home"; export JAVA_HOME
+      fi
+    fi
+    ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=$(java-config --jre-home)
+  fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=$(cygpath --path --unix "$CLASSPATH")
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+  [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] &&
+    JAVA_HOME="$(cd "$JAVA_HOME" || (echo "cannot cd into $JAVA_HOME."; exit 1); pwd)"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+  javaExecutable="$(which javac)"
+  if [ -n "$javaExecutable" ] && ! [ "$(expr "\"$javaExecutable\"" : '\([^ ]*\)')" = "no" ]; then
+    # readlink(1) is not available as standard on Solaris 10.
+    readLink=$(which readlink)
+    if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then
+      if $darwin ; then
+        javaHome="$(dirname "\"$javaExecutable\"")"
+        javaExecutable="$(cd "\"$javaHome\"" && pwd -P)/javac"
+      else
+        javaExecutable="$(readlink -f "\"$javaExecutable\"")"
+      fi
+      javaHome="$(dirname "\"$javaExecutable\"")"
+      javaHome=$(expr "$javaHome" : '\(.*\)/bin')
+      JAVA_HOME="$javaHome"
+      export JAVA_HOME
+    fi
+  fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)"
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo "Error: JAVA_HOME is not defined correctly." >&2
+  echo "  We cannot execute $JAVACMD" >&2
+  exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+  echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+  if [ -z "$1" ]
+  then
+    echo "Path not specified to find_maven_basedir"
+    return 1
+  fi
+
+  basedir="$1"
+  wdir="$1"
+  while [ "$wdir" != '/' ] ; do
+    if [ -d "$wdir"/.mvn ] ; then
+      basedir=$wdir
+      break
+    fi
+    # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+    if [ -d "${wdir}" ]; then
+      wdir=$(cd "$wdir/.." || exit 1; pwd)
+    fi
+    # end of workaround
+  done
+  printf '%s' "$(cd "$basedir" || exit 1; pwd)"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+  if [ -f "$1" ]; then
+    # Remove \r in case we run on Windows within Git Bash
+    # and check out the repository with auto CRLF management
+    # enabled. Otherwise, we may read lines that are delimited with
+    # \r\n and produce $'-Xarg\r' rather than -Xarg due to word
+    # splitting rules.
+    tr -s '\r\n' ' ' < "$1"
+  fi
+}
+
+log() {
+  if [ "$MVNW_VERBOSE" = true ]; then
+    printf '%s\n' "$1"
+  fi
+}
+
+BASE_DIR=$(find_maven_basedir "$(dirname "$0")")
+if [ -z "$BASE_DIR" ]; then
+  exit 1;
+fi
+
+MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}; export MAVEN_PROJECTBASEDIR
+log "$MAVEN_PROJECTBASEDIR"
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+wrapperJarPath="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar"
+if [ -r "$wrapperJarPath" ]; then
+    log "Found $wrapperJarPath"
+else
+    log "Couldn't find $wrapperJarPath, downloading it ..."
+
+    if [ -n "$MVNW_REPOURL" ]; then
+      wrapperUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
+    else
+      wrapperUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
+    fi
+    while IFS="=" read -r key value; do
+      # Remove '\r' from value to allow usage on windows as IFS does not consider '\r' as a separator ( considers space, tab, new line ('\n'), and custom '=' )
+      safeValue=$(echo "$value" | tr -d '\r')
+      case "$key" in (wrapperUrl) wrapperUrl="$safeValue"; break ;;
+      esac
+    done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties"
+    log "Downloading from: $wrapperUrl"
+
+    if $cygwin; then
+      wrapperJarPath=$(cygpath --path --windows "$wrapperJarPath")
+    fi
+
+    if command -v wget > /dev/null; then
+        log "Found wget ... using wget"
+        [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--quiet"
+        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+            wget $QUIET "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
+        else
+            wget $QUIET --http-user="$MVNW_USERNAME" --http-password="$MVNW_PASSWORD" "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath"
+        fi
+    elif command -v curl > /dev/null; then
+        log "Found curl ... using curl"
+        [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--silent"
+        if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+            curl $QUIET -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath"
+        else
+            curl $QUIET --user "$MVNW_USERNAME:$MVNW_PASSWORD" -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath"
+        fi
+    else
+        log "Falling back to using Java to download"
+        javaSource="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.java"
+        javaClass="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.class"
+        # For Cygwin, switch paths to Windows format before running javac
+        if $cygwin; then
+          javaSource=$(cygpath --path --windows "$javaSource")
+          javaClass=$(cygpath --path --windows "$javaClass")
+        fi
+        if [ -e "$javaSource" ]; then
+            if [ ! -e "$javaClass" ]; then
+                log " - Compiling MavenWrapperDownloader.java ..."
+                ("$JAVA_HOME/bin/javac" "$javaSource")
+            fi
+            if [ -e "$javaClass" ]; then
+                log " - Running MavenWrapperDownloader.java ..."
+                ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$wrapperUrl" "$wrapperJarPath") || rm -f "$wrapperJarPath"
+            fi
+        fi
+    fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+# If specified, validate the SHA-256 sum of the Maven wrapper jar file
+wrapperSha256Sum=""
+while IFS="=" read -r key value; do
+  case "$key" in (wrapperSha256Sum) wrapperSha256Sum=$value; break ;;
+  esac
+done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties"
+if [ -n "$wrapperSha256Sum" ]; then
+  wrapperSha256Result=false
+  if command -v sha256sum > /dev/null; then
+    if echo "$wrapperSha256Sum  $wrapperJarPath" | sha256sum -c > /dev/null 2>&1; then
+      wrapperSha256Result=true
+    fi
+  elif command -v shasum > /dev/null; then
+    if echo "$wrapperSha256Sum  $wrapperJarPath" | shasum -a 256 -c > /dev/null 2>&1; then
+      wrapperSha256Result=true
+    fi
+  else
+    echo "Checksum validation was requested but neither 'sha256sum' or 'shasum' are available."
+    echo "Please install either command, or disable validation by removing 'wrapperSha256Sum' from your maven-wrapper.properties."
+    exit 1
+  fi
+  if [ $wrapperSha256Result = false ]; then
+    echo "Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised." >&2
+    echo "Investigate or delete $wrapperJarPath to attempt a clean download." >&2
+    echo "If you updated your Maven version, you need to update the specified wrapperSha256Sum property." >&2
+    exit 1
+  fi
+fi
+
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=$(cygpath --path --windows "$JAVA_HOME")
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=$(cygpath --path --windows "$CLASSPATH")
+  [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+    MAVEN_PROJECTBASEDIR=$(cygpath --path --windows "$MAVEN_PROJECTBASEDIR")
+fi
+
+# Provide a "standardized" way to retrieve the CLI args that will
+# work with both Windows and non-Windows executions.
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $*"
+export MAVEN_CMD_LINE_ARGS
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+# shellcheck disable=SC2086 # safe args
+exec "$JAVACMD" \
+  $MAVEN_OPTS \
+  $MAVEN_DEBUG_OPTS \
+  -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+  "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+  ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/mvnw.cmd b/mvnw.cmd
new file mode 100644
index 000000000..f80fbad3e
--- /dev/null
+++ b/mvnw.cmd
@@ -0,0 +1,205 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM    http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied.  See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Apache Maven Wrapper startup batch script, version 3.2.0
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM     e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on"  echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%USERPROFILE%\mavenrc_pre.bat" call "%USERPROFILE%\mavenrc_pre.bat" %*
+if exist "%USERPROFILE%\mavenrc_pre.cmd" call "%USERPROFILE%\mavenrc_pre.cmd" %*
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set WRAPPER_URL="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
+
+FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+    IF "%%A"=="wrapperUrl" SET WRAPPER_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Found %WRAPPER_JAR%
+    )
+) else (
+    if not "%MVNW_REPOURL%" == "" (
+        SET WRAPPER_URL="%MVNW_REPOURL%/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar"
+    )
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Couldn't find %WRAPPER_JAR%, downloading it ...
+        echo Downloading from: %WRAPPER_URL%
+    )
+
+    powershell -Command "&{"^
+		"$webclient = new-object System.Net.WebClient;"^
+		"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+		"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+		"}"^
+		"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%WRAPPER_URL%', '%WRAPPER_JAR%')"^
+		"}"
+    if "%MVNW_VERBOSE%" == "true" (
+        echo Finished downloading %WRAPPER_JAR%
+    )
+)
+@REM End of extension
+
+@REM If specified, validate the SHA-256 sum of the Maven wrapper jar file
+SET WRAPPER_SHA_256_SUM=""
+FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+    IF "%%A"=="wrapperSha256Sum" SET WRAPPER_SHA_256_SUM=%%B
+)
+IF NOT %WRAPPER_SHA_256_SUM%=="" (
+    powershell -Command "&{"^
+       "$hash = (Get-FileHash \"%WRAPPER_JAR%\" -Algorithm SHA256).Hash.ToLower();"^
+       "If('%WRAPPER_SHA_256_SUM%' -ne $hash){"^
+       "  Write-Output 'Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised.';"^
+       "  Write-Output 'Investigate or delete %WRAPPER_JAR% to attempt a clean download.';"^
+       "  Write-Output 'If you updated your Maven version, you need to update the specified wrapperSha256Sum property.';"^
+       "  exit 1;"^
+       "}"^
+       "}"
+    if ERRORLEVEL 1 goto error
+)
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% ^
+  %JVM_CONFIG_MAVEN_PROPS% ^
+  %MAVEN_OPTS% ^
+  %MAVEN_DEBUG_OPTS% ^
+  -classpath %WRAPPER_JAR% ^
+  "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" ^
+  %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%"=="" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%USERPROFILE%\mavenrc_post.bat" call "%USERPROFILE%\mavenrc_post.bat"
+if exist "%USERPROFILE%\mavenrc_post.cmd" call "%USERPROFILE%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%"=="on" pause
+
+if "%MAVEN_TERMINATE_CMD%"=="on" exit %ERROR_CODE%
+
+cmd /C exit /B %ERROR_CODE%
diff --git a/ratis-assembly/src/main/assembly/src.xml b/ratis-assembly/src/main/assembly/src.xml
index b6e2e576c..37ea22b02 100644
--- a/ratis-assembly/src/main/assembly/src.xml
+++ b/ratis-assembly/src/main/assembly/src.xml
@@ -93,14 +93,24 @@
       <directory>${project.basedir}/..</directory>
       <outputDirectory>.</outputDirectory>
       <includes>
+        <include>.mvn/wrapper/maven-wrapper.properties</include>
         <include>BUILDING.md</include>
         <include>LICENSE</include>
         <include>NOTICE</include>
         <include>README.md</include>
+        <include>mvnw.cmd</include>
         <include>pom.xml</include>
         <include>start-build-env.sh</include>
       </includes>
       <fileMode>0644</fileMode>
     </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/..</directory>
+      <outputDirectory>.</outputDirectory>
+      <includes>
+        <include>mvnw</include>
+      </includes>
+      <fileMode>0755</fileMode>
+    </fileSet>
   </fileSets>
 </assembly>


[ratis] 14/18: RATIS-1817. Do not send StartLeaderElection when leaderLastEntry is null (#857)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit f50965d56103d6e80a050de32219bfca9b74970c
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Mon Mar 20 18:02:53 2023 +0800

    RATIS-1817. Do not send StartLeaderElection when leaderLastEntry is null (#857)
    
    (cherry picked from commit 050dc44b6224054dda01553f5cc9a69a7e32b2da)
---
 .../org/apache/ratis/server/impl/TransferLeadership.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index beab02b67..74ada6541 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -184,12 +184,16 @@ public class TransferLeadership {
   static Result isFollowerUpToDate(FollowerInfo follower, TermIndex leaderLastEntry) {
     if (follower == null) {
       return Result.NULL_FOLLOWER;
-    } else if (leaderLastEntry != null) {
-      final long followerMatchIndex = follower.getMatchIndex();
-      if (followerMatchIndex < leaderLastEntry.getIndex()) {
-        return new Result(Result.Type.NOT_UP_TO_DATE, "followerMatchIndex = " + followerMatchIndex
-            + " < leaderLastEntry.getIndex() = " + leaderLastEntry.getIndex());
-      }
+    }
+    if (leaderLastEntry == null) {
+      // The transferee is expecting leaderLastEntry to be non-null,
+      // return NOT_UP_TO_DATE to indicate TransferLeadership should wait.
+      return new Result(Result.Type.NOT_UP_TO_DATE, "leaderLastEntry is null");
+    }
+    final long followerMatchIndex = follower.getMatchIndex();
+    if (followerMatchIndex < leaderLastEntry.getIndex()) {
+      return new Result(Result.Type.NOT_UP_TO_DATE, "followerMatchIndex = " + followerMatchIndex
+          + " < leaderLastEntry.getIndex() = " + leaderLastEntry.getIndex());
     }
     return Result.SUCCESS;
   }


[ratis] 04/18: RATIS-1808. Rerun PreVote when Vote is timed out (#846)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 3ed7c481c068c0a1a1d67d3a32cdf28bc27e1ce8
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Fri Mar 10 00:42:47 2023 +0800

    RATIS-1808. Rerun PreVote when Vote is timed out (#846)
    
    (cherry picked from commit 601d01deb517e7d7093fcafd91589aa14c2dd72d)
---
 .../apache/ratis/server/impl/LeaderElection.java   | 81 +++++++++++-----------
 1 file changed, 39 insertions(+), 42 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index ced72604a..5f79940cb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -234,11 +234,12 @@ class LeaderElection implements Runnable {
       return;
     }
 
-    final Timer.Context electionContext = server.getLeaderElectionMetrics().getLeaderElectionTimer().time();
-    try {
-      if (skipPreVote || askForVotes(Phase.PRE_VOTE)) {
-        if (askForVotes(Phase.ELECTION)) {
-          server.changeToLeader();
+    try (Timer.Context ignored = server.getLeaderElectionMetrics().getLeaderElectionTimer().time()) {
+      for (int round = 0; shouldRun(); round++) {
+        if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
+          if (askForVotes(Phase.ELECTION, round)) {
+            server.changeToLeader();
+          }
         }
       }
     } catch(Exception e) {
@@ -257,7 +258,6 @@ class LeaderElection implements Runnable {
       }
     } finally {
       // Update leader election completion metric(s).
-      electionContext.stop();
       server.getLeaderElectionMetrics().onNewLeaderElectionCompletion();
       lifeCycle.checkStateAndClose(() -> {});
     }
@@ -296,48 +296,45 @@ class LeaderElection implements Runnable {
   }
 
   /** Send requestVote rpc to all other peers for the given phase. */
-  private boolean askForVotes(Phase phase) throws InterruptedException, IOException {
-    for(int round = 0; shouldRun(); round++) {
-      final long electionTerm;
-      final RaftConfigurationImpl conf;
-      synchronized (server) {
-        if (!shouldRun()) {
-          return false;
-        }
-        final ConfAndTerm confAndTerm = server.getState().initElection(phase);
-        electionTerm = confAndTerm.getTerm();
-        conf = confAndTerm.getConf();
+  private boolean askForVotes(Phase phase, int round) throws InterruptedException, IOException {
+    final long electionTerm;
+    final RaftConfigurationImpl conf;
+    synchronized (server) {
+      if (!shouldRun()) {
+        return false;
       }
+      final ConfAndTerm confAndTerm = server.getState().initElection(phase);
+      electionTerm = confAndTerm.getTerm();
+      conf = confAndTerm.getConf();
+    }
 
-      LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf);
-      final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm);
-      LOG.info("{} {} round {}: result {}", this, phase, round, r);
+    LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf);
+    final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm);
+    LOG.info("{} {} round {}: result {}", this, phase, round, r);
 
-      synchronized (server) {
-        if (!shouldRun(electionTerm)) {
-          return false; // term already passed or this should not run anymore.
-        }
+    synchronized (server) {
+      if (!shouldRun(electionTerm)) {
+        return false; // term already passed or this should not run anymore.
+      }
 
-        switch (r.getResult()) {
-          case PASSED:
-            return true;
-          case NOT_IN_CONF:
-          case SHUTDOWN:
-            server.getRaftServer().close();
-            server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto());
-            return false;
-          case TIMEOUT:
-            continue; // should retry
-          case REJECTED:
-          case DISCOVERED_A_NEW_TERM:
-            final long term = r.maxTerm(server.getState().getCurrentTerm());
-            server.changeToFollowerAndPersistMetadata(term, false, r);
-            return false;
-          default: throw new IllegalArgumentException("Unable to process result " + r.result);
-        }
+      switch (r.getResult()) {
+        case PASSED:
+          return true;
+        case NOT_IN_CONF:
+        case SHUTDOWN:
+          server.getRaftServer().close();
+          server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto());
+          return false;
+        case TIMEOUT:
+          return false; // should retry
+        case REJECTED:
+        case DISCOVERED_A_NEW_TERM:
+          final long term = r.maxTerm(server.getState().getCurrentTerm());
+          server.changeToFollowerAndPersistMetadata(term, false, r);
+          return false;
+        default: throw new IllegalArgumentException("Unable to process result " + r.result);
       }
     }
-    return false;
   }
 
   private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,


[ratis] 09/18: RATIS-1812. Enforce a outstanding request limit in StreamObserverWithTimeout. (#850)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 5a996a42f4901fa9155b76dd639e84213224098a
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Mar 14 18:52:16 2023 +0800

    RATIS-1812. Enforce a outstanding request limit in StreamObserverWithTimeout. (#850)
    
    (cherry picked from commit 917671cf35cec5357f078158873d98a13d29157c)
---
 .../org/apache/ratis/util/ResourceSemaphore.java   | 28 +++++-----------
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  4 +--
 .../grpc/server/GrpcServerProtocolClient.java      |  4 +--
 .../ratis/grpc/util/StreamObserverWithTimeout.java | 37 +++++++++++++++++++---
 .../apache/ratis/server/impl/PendingRequests.java  | 17 +++++++---
 .../server/leader/InstallSnapshotRequests.java     |  2 +-
 .../org/apache/ratis/grpc/util/GrpcTestClient.java |  2 +-
 .../grpc/util/TestStreamObserverWithTimeout.java   |  3 --
 .../apache/ratis/util/TestResourceSemaphore.java   | 13 ++++----
 9 files changed, 64 insertions(+), 46 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
index b9e0ff5c7..fb75feeaa 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
@@ -88,14 +88,9 @@ public class ResourceSemaphore extends Semaphore {
   /**
    * Track a group of resources with a list of {@link ResourceSemaphore}s.
    */
-
-  public enum ResourceAcquireStatus {
-    SUCCESS,
-    FAILED_IN_ELEMENT_LIMIT,
-    FAILED_IN_BYTE_SIZE_LIMIT
-  }
-
   public static class Group {
+    public static final int SUCCESS = -1;
+
     private final List<ResourceSemaphore> resources;
 
     public Group(int... limits) {
@@ -115,7 +110,8 @@ public class ResourceSemaphore extends Semaphore {
       return resources.get(i);
     }
 
-    public ResourceAcquireStatus tryAcquire(int... permits) {
+    /** @return {@link #SUCCESS} if successfully acquired; otherwise, return the failed index. */
+    public int tryAcquire(int... permits) {
       Preconditions.assertTrue(permits.length == resources.size(),
           () -> "items.length = " + permits.length + " != resources.size() = " + resources.size());
       int i = 0;
@@ -126,24 +122,16 @@ public class ResourceSemaphore extends Semaphore {
         }
       }
 
-
       if (i == permits.length) {
-        return ResourceAcquireStatus.SUCCESS; // successfully acquired all resources
-      }
-
-      ResourceAcquireStatus acquireStatus;
-      if (i == 0) {
-        acquireStatus =  ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT;
-      } else {
-        acquireStatus =  ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT;
+        return SUCCESS; // successfully acquired all resources
       }
 
       // failed at i, releasing all previous resources
-      for(i--; i >= 0; i--) {
-        resources.get(i).release(permits[i]);
+      for(int k = i - 1; k >= 0; k--) {
+        resources.get(k).release(permits[k]);
       }
 
-      return acquireStatus;
+      return i;
     }
 
     public void acquire(int... permits) throws InterruptedException {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 0f975bf62..c688b66cd 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -598,7 +598,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     final String requestId = UUID.randomUUID().toString();
     try {
       snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot",
-          requestTimeoutDuration, responseHandler);
+          requestTimeoutDuration, 8, responseHandler); //FIXME: RATIS-1809
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
         if (isRunning()) {
           snapshotRequestObserver.onNext(request);
@@ -649,7 +649,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     }
     try {
       snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-notifyInstallSnapshot",
-          requestTimeoutDuration, responseHandler);
+          requestTimeoutDuration, 0, responseHandler);
 
       snapshotRequestObserver.onNext(request);
       getFollower().updateLastRpcSendTime(false);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index c3f8730e7..d1bb70728 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -137,8 +137,8 @@ public class GrpcServerProtocolClient implements Closeable {
   }
 
   StreamObserver<InstallSnapshotRequestProto> installSnapshot(
-      String name, TimeDuration timeout, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
-    return StreamObserverWithTimeout.newInstance(name, timeout,
+      String name, TimeDuration timeout, int limit, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
+    return StreamObserverWithTimeout.newInstance(name, timeout, limit,
         i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
   }
 
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
index 2b875f3ed..8fa30b1cc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
@@ -21,6 +21,7 @@ import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.ResourceSemaphore;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
@@ -34,13 +35,19 @@ import java.util.function.IntSupplier;
 public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
   public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class);
 
-  public static <T> StreamObserverWithTimeout<T> newInstance(String name, TimeDuration timeout,
+  public static <T> StreamObserverWithTimeout<T> newInstance(
+      String name, TimeDuration timeout, int outstandingLimit,
       Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) {
     final AtomicInteger responseCount = new AtomicInteger();
-    final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor(
-        r -> responseCount.getAndIncrement());
+    final ResourceSemaphore semaphore = outstandingLimit > 0? new ResourceSemaphore(outstandingLimit): null;
+    final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor(r -> {
+      responseCount.getAndIncrement();
+      if (semaphore != null) {
+        semaphore.release();
+      }
+    });
     return new StreamObserverWithTimeout<>(
-        name, timeout, responseCount::get, newStreamObserver.apply(interceptor));
+        name, timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor));
   }
 
   private final String name;
@@ -51,17 +58,37 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
   private final AtomicBoolean isClose = new AtomicBoolean();
   private final AtomicInteger requestCount = new AtomicInteger();
   private final IntSupplier responseCount;
+  private final ResourceSemaphore semaphore;
 
   private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount,
-      StreamObserver<T> observer) {
+      ResourceSemaphore semaphore, StreamObserver<T> observer) {
     this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
     this.timeout = timeout;
     this.responseCount = responseCount;
+    this.semaphore = semaphore;
     this.observer = observer;
   }
 
+  private void acquire(T request) {
+    if (semaphore == null) {
+      return;
+    }
+    boolean acquired = false;
+    for (; !acquired && !isClose.get(); ) {
+      try {
+        acquired = semaphore.tryAcquire(timeout.getDuration(), timeout.getUnit());
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("Interrupted onNext " + request, e);
+      }
+    }
+    if (!acquired) {
+      throw new IllegalStateException("Failed onNext " + request + ": already closed.");
+    }
+  }
+
   @Override
   public void onNext(T request) {
+    acquire(request);
     observer.onNext(request);
     final int id = requestCount.incrementAndGet();
     scheduler.onTimeout(timeout, () -> handleTimeout(id, request),
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 0812c29fd..2f2ca8f7d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -61,6 +61,12 @@ class PendingRequests {
 
   static class Permit {}
 
+  /**
+   * The return type of {@link RequestLimits#tryAcquire(int)}.
+   * The order of the enum value must match the order in {@link RequestLimits}.
+   */
+  enum Acquired { FAILED_IN_ELEMENT_LIMIT, FAILED_IN_BYTE_SIZE_LIMIT, SUCCESS }
+
   static class RequestLimits extends ResourceSemaphore.Group {
     RequestLimits(int elementLimit, int megabyteLimit) {
       super(elementLimit, megabyteLimit);
@@ -74,8 +80,9 @@ class PendingRequests {
       return get(1).used();
     }
 
-    ResourceSemaphore.ResourceAcquireStatus tryAcquire(int messageSizeMb) {
-      return tryAcquire(1, messageSizeMb);
+    Acquired tryAcquire(int messageSizeMb) {
+      final int acquired = tryAcquire(1, messageSizeMb);
+      return acquired == SUCCESS? PendingRequests.Acquired.SUCCESS: PendingRequests.Acquired.values()[acquired];
     }
 
     void releaseExtraMb(int extraMb) {
@@ -112,13 +119,13 @@ class PendingRequests {
     Permit tryAcquire(Message message) {
       final int messageSize = Message.getSize(message);
       final int messageSizeMb = roundUpMb(messageSize );
-      final ResourceSemaphore.ResourceAcquireStatus acquired = resource.tryAcquire(messageSizeMb);
+      final Acquired acquired = resource.tryAcquire(messageSizeMb);
       LOG.trace("tryAcquire {} MB? {}", messageSizeMb, acquired);
-      if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT) {
+      if (acquired == Acquired.FAILED_IN_ELEMENT_LIMIT) {
         raftServerMetrics.onRequestQueueLimitHit();
         raftServerMetrics.onResourceLimitHit();
         return null;
-      } else if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT) {
+      } else if (acquired == Acquired.FAILED_IN_BYTE_SIZE_LIMIT) {
         raftServerMetrics.onRequestByteSizeLimitHit();
         raftServerMetrics.onResourceLimitHit();
         return null;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
index f52253b24..cdb6603c2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
@@ -113,7 +113,7 @@ class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto> {
   private InstallSnapshotRequestProto nextInstallSnapshotRequestProto() {
     final int numFiles = snapshot.getFiles().size();
     if (fileIndex >= numFiles) {
-      throw new NoSuchElementException();
+      throw new NoSuchElementException("fileIndex = " + fileIndex + " >= numFiles = " + numFiles);
     }
     final FileInfo info = snapshot.getFiles().get(fileIndex);
     try {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
index 0923b27fe..7434b2d79 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
@@ -53,7 +53,7 @@ class GrpcTestClient implements Closeable {
   }
 
   static StreamObserverFactory withTimeout(TimeDuration timeout) {
-    return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout,
+    return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout, 2,
         i -> stub.withInterceptors(i).hello(responseHandler));
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
index dac58812d..1439f9b9d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
@@ -92,9 +92,6 @@ public class TestStreamObserverWithTimeout extends BaseTest {
 
         final List<CompletableFuture<String>> futures = new ArrayList<>();
         for (String m : messages) {
-          if (type == Type.WithTimeout) {
-            timeout.sleep();
-          }
           futures.add(client.send(m));
         }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
index d085161cf..6fe1aed7e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
@@ -24,17 +24,17 @@ import org.junit.Test;
 
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT;
-import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT;
-import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.SUCCESS;
+import static org.apache.ratis.util.ResourceSemaphore.Group.SUCCESS;
 
 public class TestResourceSemaphore extends BaseTest {
   @Test(timeout = 5000)
   public void testGroup() throws InterruptedException, TimeoutException {
+    final int FAILED_IN_ELEMENT_LIMIT = 0;
+    final int FAILED_IN_BYTE_SIZE_LIMIT = 1;
     final ResourceSemaphore.Group g = new ResourceSemaphore.Group(3, 1);
 
     assertUsed(g, 0, 0);
-    assertAcquire(g, ResourceSemaphore.ResourceAcquireStatus.SUCCESS, 1, 1);
+    assertAcquire(g, SUCCESS, 1, 1);
     assertUsed(g, 1, 1);
     assertAcquire(g, FAILED_IN_BYTE_SIZE_LIMIT, 1, 1);
     assertUsed(g, 1, 1);
@@ -86,9 +86,8 @@ public class TestResourceSemaphore extends BaseTest {
     }
   }
 
-  static void assertAcquire(ResourceSemaphore.Group g, ResourceSemaphore.ResourceAcquireStatus expected,
-      int... permits) {
-    final ResourceSemaphore.ResourceAcquireStatus computed = g.tryAcquire(permits);
+  static void assertAcquire(ResourceSemaphore.Group g, int expected, int... permits) {
+    final int computed = g.tryAcquire(permits);
     Assert.assertEquals(expected, computed);
   }
 


[ratis] 05/18: RATIS-1815. GitHub: Enable autolink to Jira (#853)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 70fe68f0d97130206f7e055c05deb0526e65a942
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Mon Mar 13 15:33:35 2023 +0800

    RATIS-1815. GitHub: Enable autolink to Jira (#853)
    
    (cherry picked from commit edbfbb3cd984e0735bc1c946c6d8c9b1a4b750ef)
---
 .asf.yaml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.asf.yaml b/.asf.yaml
index 7caa1de5e..16e358f62 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -24,6 +24,7 @@ github:
     squash:  true
     merge:   false
     rebase:  false
+  autolink_jira: RATIS
 
 notifications:
   commits:      commits@ratis.apache.org


[ratis] 11/18: RATIS-1816. appendEntryTimer is not accurate due to the return of writeFuture (#855)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit e20a2e83bed8c5685a63ea60e9b6d34d8d8007d7
Author: Xiangpeng Hu <65...@users.noreply.github.com>
AuthorDate: Fri Mar 17 08:17:07 2023 +0800

    RATIS-1816. appendEntryTimer is not accurate due to the return of writeFuture (#855)
    
    (cherry picked from commit 9a3f7e333e87b0107d9a1eb860a41a8d882d67a1)
---
 .../org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java     | 4 ++--
 .../org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java  | 5 ++---
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
index 810fcb003..b84726041 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
@@ -142,8 +142,8 @@ public class SegmentedRaftLogMetrics extends RaftLogMetricsBase {
     registry.counter(RAFT_LOG_APPEND_ENTRY_COUNT).inc();
   }
 
-  public Timer getRaftLogAppendEntryTimer() {
-    return getTimer(RAFT_LOG_APPEND_ENTRY_LATENCY);
+  public Timer.Context startAppendEntryTimer() {
+    return getTimer(RAFT_LOG_APPEND_ENTRY_LATENCY).time();
   }
 
   public Timer getRaftLogQueueTimer() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 0cb613ac0..5913f4adf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -373,12 +373,12 @@ public class SegmentedRaftLog extends RaftLogBase {
 
   @Override
   protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
-    final Timer.Context context = getRaftLogMetrics().getRaftLogAppendEntryTimer().time();
     checkLogState();
     if (LOG.isTraceEnabled()) {
       LOG.trace("{}: appendEntry {}", getName(), LogProtoUtils.toLogEntryString(entry));
     }
     try(AutoCloseableLock writeLock = writeLock()) {
+      final Timer.Context appendEntryTimerContext = getRaftLogMetrics().startAppendEntryTimer();
       validateLogEntry(entry);
       final LogSegment currentOpenSegment = cache.getOpenSegment();
       if (currentOpenSegment == null) {
@@ -413,12 +413,11 @@ public class SegmentedRaftLog extends RaftLogBase {
       } else {
         cache.appendEntry(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
       }
+      writeFuture.whenComplete((clientReply, exception) -> appendEntryTimerContext.stop());
       return writeFuture;
     } catch (Exception e) {
       LOG.error("{}: Failed to append {}", getName(), LogProtoUtils.toLogEntryString(entry), e);
       throw e;
-    } finally {
-      context.stop();
     }
   }
 


[ratis] 17/18: RATIS-1662. Intermittent failure in testEnforceLeader (#860)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 14ae143fba1e05482c590dc27d24c45389fd8cc9
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Wed Mar 22 08:21:55 2023 +0800

    RATIS-1662. Intermittent failure in testEnforceLeader (#860)
    
    (cherry picked from commit 6692234535bc45ff60c8e54c7164b0d18093e5c6)
---
 .../src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java | 1 +
 .../apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 76761249b..0ea08da3e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -316,6 +316,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
       final RaftServer.Division currLeader = cluster.getLeader();
       LOG.info("try enforcing leader to " + newLeader + " but " +
           (currLeader == null ? "no leader for round " + i : "new leader is " + currLeader.getId()));
+      TimeDuration.ONE_SECOND.sleep();
     }
     LOG.info(cluster.printServers());
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
index 437da929f..d0e7e9f5c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -108,7 +108,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
         .map(s -> serverRequestReply.getQueue(s.getId().toString()))
         .forEach(q -> q.delayTakeRequestTo.set(delayMs));
 
-    final long sleepMs = 3 * getTimeoutMax().toLong(TimeUnit.MILLISECONDS) / 2;
+    final long sleepMs = 3 * getTimeoutMax().toLong(TimeUnit.MILLISECONDS);
     Thread.sleep(sleepMs);
   }
 


[ratis] 06/18: RATIS-1813. Allow ratis-shell to run in JDK 8+ (#851)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 5d9e4b5383cf8e2395fbed4013a750ad92ff15d4
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Mon Mar 13 17:47:14 2023 +0800

    RATIS-1813. Allow ratis-shell to run in JDK 8+ (#851)
    
    (cherry picked from commit bde35c6662489a6198fa7960bed767f152e4d180)
---
 ratis-shell/src/main/libexec/ratis-shell-config.sh | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/ratis-shell/src/main/libexec/ratis-shell-config.sh b/ratis-shell/src/main/libexec/ratis-shell-config.sh
index 27d2ee0a1..71fda692b 100644
--- a/ratis-shell/src/main/libexec/ratis-shell-config.sh
+++ b/ratis-shell/src/main/libexec/ratis-shell-config.sh
@@ -50,12 +50,12 @@ if [[ -z "${JAVA}" ]]; then
   fi
 fi
 
-# Check Java version == 1.8 or == 11
+# Check Java version == 1.8 or >= 8
 JAVA_VERSION=$(${JAVA} -version 2>&1 | awk -F '"' '/version/ {print $2}')
-JAVA_MAJORMINOR=$(echo "${JAVA_VERSION}" | awk -F. '{printf("%03d%03d",$1,$2);}')
-JAVA_MAJOR=$(echo "${JAVA_VERSION}" | awk -F. '{printf("%03d",$1);}')
-if [[ ${JAVA_MAJORMINOR} != 001008 && ${JAVA_MAJOR} != 011 ]]; then
-  echo "Error: ratis-shell requires Java 8 or Java 11, currently Java $JAVA_VERSION found."
+JAVA_MAJORMINOR=$(echo "${JAVA_VERSION}" | awk -F. '{printf "%d.%d",$1,$2}')
+JAVA_MAJOR=$(echo "${JAVA_VERSION}" | awk -F. '{print $1}')
+if [[ ${JAVA_MAJORMINOR} != 1.8 && ${JAVA_MAJOR} -lt 8 ]]; then
+  echo "Error: ratis-shell requires Java 8+, currently Java $JAVA_VERSION found."
   exit 1
 fi
 


[ratis] 01/18: RATIS-1807. Support timeout in gRPC. (#842)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 08dc58fcba8b66480066f0b9f0cc9860415a9346
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Mar 6 20:32:54 2023 -0800

    RATIS-1807. Support timeout in gRPC. (#842)
    
    (cherry picked from commit 69263c884573b87eada10bdd269c9e9d31fb2e94)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |   7 +-
 .../grpc/server/GrpcServerProtocolClient.java      |   7 +-
 .../grpc/util/ResponseNotifyClientInterceptor.java |  72 ++++++++++++
 .../ratis/grpc/util/StreamObserverWithTimeout.java |  95 ++++++++++++++++
 ratis-proto/src/main/proto/Test.proto              |  37 +++++++
 .../org/apache/ratis/grpc/util/GrpcTestClient.java | 123 +++++++++++++++++++++
 .../org/apache/ratis/grpc/util/GrpcTestServer.java | 108 ++++++++++++++++++
 .../grpc/util/TestStreamObserverWithTimeout.java   | 122 ++++++++++++++++++++
 8 files changed, 566 insertions(+), 5 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index c91880097..0f975bf62 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -597,7 +597,8 @@ public class GrpcLogAppender extends LogAppenderBase {
     StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
     final String requestId = UUID.randomUUID().toString();
     try {
-      snapshotRequestObserver = getClient().installSnapshot(responseHandler);
+      snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot",
+          requestTimeoutDuration, responseHandler);
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
         if (isRunning()) {
           snapshotRequestObserver.onNext(request);
@@ -647,7 +648,9 @@ public class GrpcLogAppender extends LogAppenderBase {
       LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(request));
     }
     try {
-      snapshotRequestObserver = getClient().installSnapshot(responseHandler);
+      snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-notifyInstallSnapshot",
+          requestTimeoutDuration, responseHandler);
+
       snapshotRequestObserver.onNext(request);
       getFollower().updateLastRpcSendTime(false);
       responseHandler.addPending(request);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 4c28c1df4..c3f8730e7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -19,6 +19,7 @@ package org.apache.ratis.grpc.server;
 
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.grpc.util.StreamObserverWithTimeout;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
@@ -136,9 +137,9 @@ public class GrpcServerProtocolClient implements Closeable {
   }
 
   StreamObserver<InstallSnapshotRequestProto> installSnapshot(
-      StreamObserver<InstallSnapshotReplyProto> responseHandler) {
-    return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
-        .installSnapshot(responseHandler);
+      String name, TimeDuration timeout, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
+    return StreamObserverWithTimeout.newInstance(name, timeout,
+        i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
   }
 
   // short-circuit the backoff timer and make them reconnect immediately.
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java
new file mode 100644
index 000000000..77577b06d
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.thirdparty.io.grpc.CallOptions;
+import org.apache.ratis.thirdparty.io.grpc.Channel;
+import org.apache.ratis.thirdparty.io.grpc.ClientCall;
+import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
+import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall;
+import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCallListener;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Consumer;
+
+/**
+ * Invoke the given notifier when receiving a response.
+ */
+public class ResponseNotifyClientInterceptor implements ClientInterceptor {
+  public static final Logger LOG = LoggerFactory.getLogger(ResponseNotifyClientInterceptor.class);
+
+  private final Consumer<Object> notifier;
+
+  public ResponseNotifyClientInterceptor(Consumer<Object> notifier) {
+    this.notifier = notifier;
+  }
+
+  @Override
+  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+    LOG.debug("callOptions {}", callOptions);
+    return new Call<>(next.newCall(method, callOptions));
+  }
+
+  private final class Call<ReqT, RespT>
+      extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {
+
+    private Call(ClientCall<ReqT, RespT> delegate) {
+      super(delegate);
+    }
+
+    @Override
+    public void start(Listener<RespT> responseListener, Metadata headers) {
+      LOG.debug("start {}", headers);
+      super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
+        @Override
+        public void onMessage(RespT message) {
+          LOG.debug("onMessage {}", message);
+          notifier.accept(message);
+          super.onMessage(message);
+        }
+      }, headers);
+    }
+  }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
new file mode 100644
index 000000000..2b875f3ed
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
+
+public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
+  public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class);
+
+  public static <T> StreamObserverWithTimeout<T> newInstance(String name, TimeDuration timeout,
+      Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) {
+    final AtomicInteger responseCount = new AtomicInteger();
+    final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor(
+        r -> responseCount.getAndIncrement());
+    return new StreamObserverWithTimeout<>(
+        name, timeout, responseCount::get, newStreamObserver.apply(interceptor));
+  }
+
+  private final String name;
+  private final TimeDuration timeout;
+  private final StreamObserver<T> observer;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
+  private final AtomicBoolean isClose = new AtomicBoolean();
+  private final AtomicInteger requestCount = new AtomicInteger();
+  private final IntSupplier responseCount;
+
+  private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount,
+      StreamObserver<T> observer) {
+    this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
+    this.timeout = timeout;
+    this.responseCount = responseCount;
+    this.observer = observer;
+  }
+
+  @Override
+  public void onNext(T request) {
+    observer.onNext(request);
+    final int id = requestCount.incrementAndGet();
+    scheduler.onTimeout(timeout, () -> handleTimeout(id, request),
+        LOG, () -> name + ": Timeout check failed for request: " + request);
+  }
+
+  private void handleTimeout(int id, T request) {
+    if (id > responseCount.getAsInt()) {
+      onError(new TimeoutIOException(name + ": Timed out " + timeout + " for sending request " + request));
+    }
+  }
+
+  @Override
+  public void onError(Throwable throwable) {
+    if (isClose.compareAndSet(false, true)) {
+      observer.onError(throwable);
+    }
+  }
+
+  @Override
+  public void onCompleted() {
+    if (isClose.compareAndSet(false, true)) {
+      observer.onCompleted();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
diff --git a/ratis-proto/src/main/proto/Test.proto b/ratis-proto/src/main/proto/Test.proto
new file mode 100644
index 000000000..8d5769ff3
--- /dev/null
+++ b/ratis-proto/src/main/proto/Test.proto
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.ratis.test.proto";
+option java_outer_classname = "TestProto";
+
+package org.apache.ratis.test;
+
+service Greeter {
+    rpc Hello (stream HelloRequest)
+        returns (stream HelloReply) {}
+}
+
+message HelloRequest {
+    string name = 1;
+}
+
+message HelloReply {
+    string message = 1;
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
new file mode 100644
index 000000000..0923b27fe
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.test.proto.GreeterGrpc;
+import org.apache.ratis.test.proto.GreeterGrpc.GreeterStub;
+import org.apache.ratis.test.proto.HelloReply;
+import org.apache.ratis.test.proto.HelloRequest;
+import org.apache.ratis.thirdparty.io.grpc.Deadline;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+/** gRPC client for testing */
+class GrpcTestClient implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(GrpcTestClient.class);
+
+  @FunctionalInterface
+  interface StreamObserverFactory
+      extends BiFunction<GreeterStub, StreamObserver<HelloReply>, StreamObserver<HelloRequest>> {
+  }
+
+  static StreamObserverFactory withDeadline(TimeDuration timeout) {
+    final Deadline d = Deadline.after(timeout.getDuration(), timeout.getUnit());
+    return (stub, responseHandler) -> stub.withDeadline(d).hello(responseHandler);
+  }
+
+  static StreamObserverFactory withTimeout(TimeDuration timeout) {
+    return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout,
+        i -> stub.withInterceptors(i).hello(responseHandler));
+  }
+
+  private final ManagedChannel channel;
+  private final StreamObserver<HelloRequest> requestHandler;
+  private final Queue<CompletableFuture<String>> replies = new ConcurrentLinkedQueue<>();
+
+  GrpcTestClient(String host, int port, StreamObserverFactory factory) {
+    this.channel = ManagedChannelBuilder.forAddress(host, port)
+        .usePlaintext()
+        .build();
+
+    final GreeterStub asyncStub = GreeterGrpc.newStub(channel);
+    final StreamObserver<HelloReply> responseHandler = new StreamObserver<HelloReply>() {
+      @Override
+      public void onNext(HelloReply helloReply) {
+        replies.poll().complete(helloReply.getMessage());
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        LOG.info("onError", throwable);
+        completeExceptionally(throwable);
+      }
+
+      @Override
+      public void onCompleted() {
+        LOG.info("onCompleted");
+        completeExceptionally(new IllegalStateException("onCompleted"));
+      }
+
+      void completeExceptionally(Throwable throwable) {
+        replies.forEach(f -> f.completeExceptionally(throwable));
+        replies.clear();
+      }
+    };
+
+    this.requestHandler = factory.apply(asyncStub, responseHandler);
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      /* After the request handler is cancelled, no more life-cycle hooks are allowed,
+       * see {@link org.apache.ratis.thirdparty.io.grpc.ClientCall.Listener#cancel(String, Throwable)} */
+      // requestHandler.onCompleted();
+      channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw IOUtils.toInterruptedIOException("Failed to close", e);
+    }
+  }
+
+  CompletableFuture<String> send(String name) {
+    LOG.info("send {}", name);
+    final HelloRequest request = HelloRequest.newBuilder().setName(name).build();
+    final CompletableFuture<String> f = new CompletableFuture<>();
+    try {
+      requestHandler.onNext(request);
+      replies.offer(f);
+    } catch (IllegalStateException e) {
+      // already closed
+      f.completeExceptionally(e);
+    }
+    return f;
+  }
+}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java
new file mode 100644
index 000000000..ec9d63b13
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.test.proto.GreeterGrpc;
+import org.apache.ratis.test.proto.HelloReply;
+import org.apache.ratis.test.proto.HelloRequest;
+import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/** gRPC server for testing */
+class GrpcTestServer implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(GrpcTestServer.class);
+
+  private final Server server;
+
+  GrpcTestServer(int port, int slow, TimeDuration timeout) {
+    this.server = ServerBuilder.forPort(port)
+        .addService(new GreeterImpl(slow, timeout))
+        .build();
+  }
+
+  int start() throws IOException {
+    server.start();
+    return server.getPort();
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw IOUtils.toInterruptedIOException("Failed to close", e);
+    }
+  }
+
+  static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
+    static String toReplySuffix(String request) {
+      return ") Hello " + request;
+    }
+
+    private final int slow;
+    private final TimeDuration shortSleepTime;
+    private final TimeDuration longSleepTime;
+    private int count = 0;
+
+    GreeterImpl(int slow, TimeDuration timeout) {
+      this.slow = slow;
+      this.shortSleepTime = timeout.multiply(0.25);
+      this.longSleepTime = timeout.multiply(2);
+    }
+
+    @Override
+    public StreamObserver<HelloRequest> hello(StreamObserver<HelloReply> responseObserver) {
+      return new StreamObserver<HelloRequest>() {
+        @Override
+        public void onNext(HelloRequest helloRequest) {
+          final String reply = count + toReplySuffix(helloRequest.getName());
+          final TimeDuration sleepTime = count < slow ? shortSleepTime : longSleepTime;
+          LOG.info("count = {}, slow = {}, sleep {}", reply, slow, sleepTime);
+          try {
+            sleepTime.sleep();
+          } catch (InterruptedException e) {
+            responseObserver.onError(e);
+            return;
+          }
+          responseObserver.onNext(HelloReply.newBuilder().setMessage(reply).build());
+          count++;
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+          LOG.error("onError", throwable);
+        }
+
+        @Override
+        public void onCompleted() {
+          responseObserver.onCompleted();
+        }
+      };
+    }
+  }
+}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
new file mode 100644
index 000000000..dac58812d
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.grpc.util.GrpcTestClient.StreamObserverFactory;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
+import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.Slf4jUtils;
+import org.apache.ratis.util.StringUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+public class TestStreamObserverWithTimeout extends BaseTest {
+  {
+    Slf4jUtils.setLogLevel(ResponseNotifyClientInterceptor.LOG, Level.TRACE);
+  }
+
+  enum Type {
+    WithDeadline(GrpcTestClient::withDeadline),
+    WithTimeout(GrpcTestClient::withTimeout);
+
+    private final Function<TimeDuration, StreamObserverFactory> factory;
+
+    Type(Function<TimeDuration, StreamObserverFactory> function) {
+      this.factory = function;
+    }
+
+    StreamObserverFactory createFunction(TimeDuration timeout) {
+      return factory.apply(timeout);
+    }
+  }
+
+  @Test
+  public void testWithDeadline() throws Exception {
+    //the total sleep time is within the deadline
+    runTestTimeout(2, Type.WithDeadline);
+  }
+
+  @Test
+  public void testWithDeadlineFailure() {
+    //Expected to have DEADLINE_EXCEEDED
+    testFailureCase("total sleep time is longer than the deadline",
+        () -> runTestTimeout(5, Type.WithDeadline),
+        ExecutionException.class, StatusRuntimeException.class);
+  }
+
+  @Test
+  public void testWithTimeout() throws Exception {
+    //Each sleep time is within the timeout,
+    //Note that the total sleep time is longer than the timeout, but it does not matter.
+    runTestTimeout(5, Type.WithTimeout);
+  }
+
+  void runTestTimeout(int slow, Type type) throws Exception {
+    LOG.info("slow = {}, {}", slow, type);
+    final TimeDuration timeout = ONE_SECOND.multiply(0.5);
+    final StreamObserverFactory function = type.createFunction(timeout);
+    final InetSocketAddress address = NetUtils.createLocalServerAddress();
+
+    final List<String> messages = new ArrayList<>();
+    for (int i = 0; i < 2 * slow; i++) {
+      messages.add("m" + i);
+    }
+    try (GrpcTestServer server = new GrpcTestServer(address.getPort(), slow, timeout)) {
+      final int port = server.start();
+      try (GrpcTestClient client = new GrpcTestClient(address.getHostName(), port, function)) {
+
+        final List<CompletableFuture<String>> futures = new ArrayList<>();
+        for (String m : messages) {
+          if (type == Type.WithTimeout) {
+            timeout.sleep();
+          }
+          futures.add(client.send(m));
+        }
+
+        int i = 0;
+        for (; i < slow; i++) {
+          final String expected = i + GrpcTestServer.GreeterImpl.toReplySuffix(messages.get(i));
+          final String reply = futures.get(i).get();
+          Assert.assertEquals("expected = " + expected + " != reply = " + reply, expected, reply);
+          LOG.info("{}) passed", i);
+        }
+
+        for (; i < messages.size(); i++) {
+          final CompletableFuture<String> f = futures.get(i);
+          try {
+            final String reply = f.get();
+            Assert.fail(i + ") reply = " + reply + ", "
+                + StringUtils.completableFuture2String(f, false));
+          } catch (ExecutionException e) {
+             LOG.info("GOOD! {}) {}, {}", i, StringUtils.completableFuture2String(f, true), e);
+          }
+        }
+      }
+    }
+  }
+}


[ratis] 03/18: RATIS-1810. Intermittent failure in TestRaftServerWithGrpc#testRaftClientMetrics (#847)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 1c500bbaf9e3638f1b41527150072425fc9d9370
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Fri Mar 10 00:40:10 2023 +0800

    RATIS-1810. Intermittent failure in TestRaftServerWithGrpc#testRaftClientMetrics (#847)
    
    (cherry picked from commit ed7e547b8ba3a8435d132fe097d8db9f434f5cf0)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   |  4 +---
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  | 22 +++++++++++++++++-----
 2 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 494272e1a..6f761d1f3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -890,9 +890,7 @@ class RaftServerImpl implements RaftServer.Division,
 
     final RaftClientRequest.Type type = request.getType();
     replyFuture.whenComplete((clientReply, exception) -> {
-      if (clientReply.isSuccess()) {
-        timer.map(Timer::time).ifPresent(Timer.Context::stop);
-      }
+      timer.map(Timer::time).ifPresent(Timer.Context::stop);
       if (exception != null || clientReply.getException() != null) {
         raftServerMetrics.incFailedRequestCount(type);
       }
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 88e306e3c..863fc82bb 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -27,6 +27,8 @@ import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_BYTE
 import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RESOURCE_LIMIT_HIT_COUNTER;
 
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Timer;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.security.SecurityTestUtils;
 import org.apache.ratis.BaseTest;
@@ -318,24 +320,34 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
     try (final RaftClient client = cluster.createClient()) {
       final CompletableFuture<RaftClientReply> f1 = client.async().send(new SimpleMessage("testing"));
       Assert.assertTrue(f1.get().isSuccess());
-      Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_WRITE_REQUEST).getCount() > 0);
+      final Timer write = raftServerMetrics.getTimer(RAFT_CLIENT_WRITE_REQUEST);
+      JavaUtils.attempt(() -> Assert.assertTrue(write.getCount() > 0),
+          3, TimeDuration.ONE_SECOND, "writeTimer metrics", LOG);
 
       final CompletableFuture<RaftClientReply> f2 = client.async().sendReadOnly(new SimpleMessage("testing"));
       Assert.assertTrue(f2.get().isSuccess());
-      Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_READ_REQUEST).getCount() > 0);
+      final Timer read = raftServerMetrics.getTimer(RAFT_CLIENT_READ_REQUEST);
+      JavaUtils.attempt(() -> Assert.assertTrue(read.getCount() > 0),
+          3, TimeDuration.ONE_SECOND, "readTimer metrics", LOG);
 
       final CompletableFuture<RaftClientReply> f3 = client.async().sendStaleRead(new SimpleMessage("testing"),
           0, leader.getId());
       Assert.assertTrue(f3.get().isSuccess());
-      Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_STALE_READ_REQUEST).getCount() > 0);
+      final Timer staleRead = raftServerMetrics.getTimer(RAFT_CLIENT_STALE_READ_REQUEST);
+      JavaUtils.attempt(() -> Assert.assertTrue(staleRead.getCount() > 0),
+          3, TimeDuration.ONE_SECOND, "staleReadTimer metrics", LOG);
 
       final CompletableFuture<RaftClientReply> f4 = client.async().watch(0, RaftProtos.ReplicationLevel.ALL);
       Assert.assertTrue(f4.get().isSuccess());
-      Assert.assertTrue(raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST, "-ALL")).getCount() > 0);
+      final Timer watchAll = raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST, "-ALL"));
+      JavaUtils.attempt(() -> Assert.assertTrue(watchAll.getCount() > 0),
+          3, TimeDuration.ONE_SECOND, "watchAllTimer metrics", LOG);
 
       final CompletableFuture<RaftClientReply> f5 = client.async().watch(0, RaftProtos.ReplicationLevel.MAJORITY);
       Assert.assertTrue(f5.get().isSuccess());
-      Assert.assertTrue(raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST, "")).getCount() > 0);
+      final Timer watch = raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST, ""));
+      JavaUtils.attempt(() -> Assert.assertTrue(watch.getCount() > 0),
+          3, TimeDuration.ONE_SECOND, "watchTimer metrics", LOG);
     }
   }
 


[ratis] 16/18: RATIS-1820. Update apache parent pom version and other versions. (#861)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit c19db251d082648b143db4cbfbc3ff6b3fac4ed8
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Mar 22 08:17:34 2023 +0800

    RATIS-1820. Update apache parent pom version and other versions. (#861)
    
    (cherry picked from commit 60962832c4d409d50dd1e28f653658d20c6c232c)
---
 pom.xml                                            | 47 +++++++++-------------
 ratis-assembly/pom.xml                             |  1 +
 .../org/apache/ratis/client/DataStreamClient.java  |  2 +-
 .../java/org/apache/ratis/client/RaftClient.java   |  2 +-
 .../org/apache/ratis/protocol/RoutingTable.java    |  2 +-
 .../java/org/apache/ratis/retry/RetryPolicies.java |  6 +--
 .../main/java/org/apache/ratis/util/ExitUtils.java |  2 +-
 ratis-examples/pom.xml                             |  5 +++
 .../examples/arithmetic/expression/Expression.java |  6 +--
 ratis-test/pom.xml                                 |  4 ++
 10 files changed, 39 insertions(+), 38 deletions(-)

diff --git a/pom.xml b/pom.xml
index a63135097..2d447605a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
   <parent>
     <groupId>org.apache</groupId>
     <artifactId>apache</artifactId>
-    <version>23</version>
+    <version>25</version>
     <relativePath /> <!-- resolve from repository -->
   </parent>
 
@@ -160,6 +160,7 @@
 
 
   <properties>
+    <project.build.outputTimestamp>2023-01-01T00:00:00Z</project.build.outputTimestamp>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
      <!-- where to find the generated LICENSE files -->
@@ -169,26 +170,26 @@
     <license.bundles.dependencies>false</license.bundles.dependencies>
 
     <!-- Maven plugin versions -->
-    <maven-bundle-plugin.version>2.5.3</maven-bundle-plugin.version>
-    <maven-clover2-plugin.version>3.3.0</maven-clover2-plugin.version>
-    <maven-install-plugin.version>3.0.0-M1</maven-install-plugin.version>
-    <maven-pdf-plugin.version>1.2</maven-pdf-plugin.version>
-    <maven-stylus-skin.version>1.5</maven-stylus-skin.version>
-    <maven-surefire-plugin.version>3.0.0-M1</maven-surefire-plugin.version>
-    <maven-checkstyle-plugin.version>3.1.2</maven-checkstyle-plugin.version>
-    <maven-assembly-plugin.version>3.4.1</maven-assembly-plugin.version>
+    <maven-bundle-plugin.version>5.1.8</maven-bundle-plugin.version>
+    <maven-checkstyle-plugin.version>3.2.1</maven-checkstyle-plugin.version>
+    <maven-clover2-plugin.version>4.0.6</maven-clover2-plugin.version>
+    <maven-pdf-plugin.version>1.6.1</maven-pdf-plugin.version>
+    <maven-surefire-plugin.version>3.0.0</maven-surefire-plugin.version>
+    <wagon-ssh.version>3.5.3</wagon-ssh.version>
 
-    <checkstyle.version>9.3</checkstyle.version>
 
-    <protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version>
-    <license-maven-plugin.version>1.19</license-maven-plugin.version>
+    <!-- org.codehaus.mojo -->
+    <build-helper-maven-plugin.version>3.3.0</build-helper-maven-plugin.version>
+    <exec-maven-plugin.version>3.1.0</exec-maven-plugin.version>
+    <extra-enforcer-rules.version>1.6.1</extra-enforcer-rules.version>
+    <license-maven-plugin.version>4.1</license-maven-plugin.version>
+
+    <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
     <copy-rename-maven-plugin.version>1.0</copy-rename-maven-plugin.version>
 
-    <build-helper-maven-plugin.version>1.9</build-helper-maven-plugin.version>
-    <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
+    <checkstyle.version>10.9.2</checkstyle.version>
     <spotbugs.version>4.2.1</spotbugs.version>
     <spotbugs-plugin.version>4.2.0</spotbugs-plugin.version>
-    <wagon-ssh.version>1.0</wagon-ssh.version>
 
     <distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
     <distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
@@ -220,8 +221,7 @@
     <!--metrics-->
     <dropwizard.version>3.2.5</dropwizard.version>
 
-    <bouncycastle.version>1.70</bouncycastle.version>
-    <slf4j.version>1.7.36</slf4j.version>
+    <slf4j.version>2.0.7</slf4j.version>
   </properties>
 
   <dependencyManagement>
@@ -406,12 +406,6 @@
         <version>${slf4j.version}</version>
       </dependency>
 
-      <dependency>
-        <groupId>com.beust</groupId>
-        <artifactId>jcommander</artifactId>
-        <version>1.72</version>
-      </dependency>
-
       <dependency>
         <groupId>io.dropwizard.metrics</groupId>
         <artifactId>metrics-core</artifactId>
@@ -492,12 +486,12 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-install-plugin</artifactId>
-          <version>${maven-install-plugin.version}</version>
         </plugin>
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-javadoc-plugin</artifactId>
           <configuration>
+            <source>8</source>
             <additionalJOptions>
               <additionalJOption>-Xmaxwarns</additionalJOption>
               <additionalJOption>10000</additionalJOption>
@@ -549,7 +543,6 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-assembly-plugin</artifactId>
-          <version>${maven-assembly-plugin.version}</version>
           <configuration>
             <!--Defer to the ratis-assembly sub-module.  It does all assembly-->
             <skipAssembly>true</skipAssembly>
@@ -891,7 +884,7 @@
                   <goal>jar</goal>
                 </goals>
                 <configuration>
-                  <destDir>${project.build.directory}</destDir>
+                  <outputDirectory>${project.build.directory}</outputDirectory>
                 </configuration>
               </execution>
             </executions>
@@ -936,7 +929,7 @@
               <dependency>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>extra-enforcer-rules</artifactId>
-                <version>1.0-beta-6</version>
+                <version>${extra-enforcer-rules.version}</version>
               </dependency>
             </dependencies>
             <executions>
diff --git a/ratis-assembly/pom.xml b/ratis-assembly/pom.xml
index 19eee578b..3088c88c8 100644
--- a/ratis-assembly/pom.xml
+++ b/ratis-assembly/pom.xml
@@ -117,6 +117,7 @@
       </plugin>
 
       <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-assembly-plugin</artifactId>
         <configuration>
           <!--Else will use ratis-assembly as final name.-->
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index b00f18219..9af2bc286 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -46,7 +46,7 @@ public interface DataStreamClient extends DataStreamRpcApi, Closeable {
   }
 
   /** To build {@link DataStreamClient} objects */
-  class Builder {
+  final class Builder {
     private RaftPeer dataStreamServer;
     private DataStreamClientRpc dataStreamClientRpc;
     private RaftProperties properties;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index d4c2f16e9..c273ea61c 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -86,7 +86,7 @@ public interface RaftClient extends Closeable {
   }
 
   /** To build {@link RaftClient} objects. */
-  class Builder {
+  final class Builder {
     private ClientId clientId;
     private RaftClientRpc clientRpc;
     private RaftGroup group;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
index d6fc1a512..0157fe49a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RoutingTable.java
@@ -52,7 +52,7 @@ public interface RoutingTable {
   }
 
   /** To build a {@link RoutingTable}. */
-  class Builder {
+  final class Builder {
     private final AtomicReference<Map<RaftPeerId, Set<RaftPeerId>>> ref = new AtomicReference<>(new HashMap<>());
 
     private Builder() {}
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
index 7c22d767c..803070557 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -52,7 +52,7 @@ public interface RetryPolicies {
     private static final NoRetry NO_RETRY = new NoRetry();
   }
 
-  class RetryForeverNoSleep implements RetryPolicy {
+  final class RetryForeverNoSleep implements RetryPolicy {
     private RetryForeverNoSleep() {}
 
     @Override
@@ -66,7 +66,7 @@ public interface RetryPolicies {
     }
   }
 
-  class NoRetry implements RetryPolicy {
+  final class NoRetry implements RetryPolicy {
     private NoRetry() {}
 
     @Override
@@ -101,7 +101,7 @@ public interface RetryPolicies {
   }
 
   /** For any requests, keep retrying a limited number of attempts with a fixed sleep time between attempts. */
-  class RetryLimited extends RetryForeverWithSleep  {
+  final class RetryLimited extends RetryForeverWithSleep  {
     private final int maxAttempts;
     private final Supplier<String> myString;
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
index 2f66db6aa..2ff3464f5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
@@ -40,7 +40,7 @@ public interface ExitUtils {
     }
   }
 
-  class States {
+  final class States {
     private static final Logger LOG = LoggerFactory.getLogger(ExitUtils.class);
     private static final States INSTANCE = new States();
 
diff --git a/ratis-examples/pom.xml b/ratis-examples/pom.xml
index 33f4b4ae8..5499b0f8c 100644
--- a/ratis-examples/pom.xml
+++ b/ratis-examples/pom.xml
@@ -23,6 +23,10 @@
   <artifactId>ratis-examples</artifactId>
   <name>Apache Ratis Examples</name>
 
+  <properties>
+    <jcommander.version>1.82</jcommander.version>
+  </properties>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.ratis</groupId>
@@ -95,6 +99,7 @@
     <dependency>
       <groupId>com.beust</groupId>
       <artifactId>jcommander</artifactId>
+      <version>${jcommander.version}</version>
     </dependency>
 
     <dependency>
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java
index af7b62526..2511b1b44 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java
@@ -44,10 +44,8 @@ public interface Expression extends Evaluable {
 
   int length();
 
-  class Utils {
-    private Utils() {
-
-    }
+  final class Utils {
+    private Utils() {}
 
     public static Message toMessage(final Expression e) {
       final byte[] buf = new byte[e.length()];
diff --git a/ratis-test/pom.xml b/ratis-test/pom.xml
index 4b5151c6c..999877e0e 100644
--- a/ratis-test/pom.xml
+++ b/ratis-test/pom.xml
@@ -23,6 +23,10 @@
   <artifactId>ratis-test</artifactId>
   <name>Apache Ratis Test</name>
 
+  <properties>
+    <bouncycastle.version>1.70</bouncycastle.version>
+  </properties>
+
   <dependencies>
     <dependency>
       <artifactId>ratis-common</artifactId>


[ratis] 10/18: RATIS-1809. Use separated timeout for GrpcLogAppender's streaming RPC (#848)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 80617d4bb801a166b364620f429d2b53dd9e207a
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Wed Mar 15 16:47:34 2023 +0800

    RATIS-1809. Use separated timeout for GrpcLogAppender's streaming RPC (#848)
    
    (cherry picked from commit cf62b4298ad778fbe1dff8478899883aa91a2234)
---
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 38 +++++++++++++++++++++-
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 10 ++++--
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index febf54172..48b57fa63 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -19,13 +19,27 @@ package org.apache.ratis.grpc;
 
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.function.Consumer;
 
-import static org.apache.ratis.conf.ConfUtils.*;
+import static org.apache.ratis.conf.ConfUtils.get;
+import static org.apache.ratis.conf.ConfUtils.getBoolean;
+import static org.apache.ratis.conf.ConfUtils.getInt;
+import static org.apache.ratis.conf.ConfUtils.getSizeInBytes;
+import static org.apache.ratis.conf.ConfUtils.getTimeDuration;
+import static org.apache.ratis.conf.ConfUtils.printAll;
+import static org.apache.ratis.conf.ConfUtils.requireMax;
+import static org.apache.ratis.conf.ConfUtils.requireMin;
+import static org.apache.ratis.conf.ConfUtils.set;
+import static org.apache.ratis.conf.ConfUtils.setBoolean;
+import static org.apache.ratis.conf.ConfUtils.setInt;
+import static org.apache.ratis.conf.ConfUtils.setSizeInBytes;
+import static org.apache.ratis.conf.ConfUtils.setTimeDuration;
 
 public interface GrpcConfigKeys {
   Logger LOG = LoggerFactory.getLogger(GrpcConfigKeys.class);
@@ -234,6 +248,28 @@ public interface GrpcConfigKeys {
       setInt(properties::setInt, LEADER_OUTSTANDING_APPENDS_MAX_KEY, maxAppend);
     }
 
+    String INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY = PREFIX + ".install_snapshot.request.element-limit";
+    int INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_DEFAULT = 8;
+    static int installSnapshotRequestElementLimit(RaftProperties properties) {
+      return getInt(properties::getInt, INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY,
+          INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_DEFAULT, getDefaultLog(), requireMin(0));
+    }
+    static void setInstallSnapshotRequestElementLimit(RaftProperties properties, int elementLimit) {
+      setInt(properties::setInt, INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY, elementLimit);
+    }
+
+    String INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY = PREFIX + ".install_snapshot.request.timeout";
+    TimeDuration INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT = RaftServerConfigKeys.Rpc.REQUEST_TIMEOUT_DEFAULT;
+    static TimeDuration installSnapshotRequestTimeout(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT.getUnit()),
+          INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY, INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT, getDefaultLog());
+    }
+    static void setInstallSnapshotRequestTimeout(RaftProperties properties,
+                                                 TimeDuration installSnapshotRequestTimeout) {
+      setTimeDuration(properties::setTimeDuration,
+          INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY, installSnapshotRequestTimeout);
+    }
+
     String HEARTBEAT_CHANNEL_KEY = PREFIX + ".heartbeat.channel";
     boolean HEARTBEAT_CHANNEL_DEFAULT = true;
     static boolean heartbeatChannel(RaftProperties properties) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index c688b66cd..0cf0e663f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -70,6 +70,8 @@ public class GrpcLogAppender extends LogAppenderBase {
   private final boolean installSnapshotEnabled;
 
   private final TimeDuration requestTimeoutDuration;
+  private final TimeDuration installSnapshotStreamTimeout;
+  private final int maxOutstandingInstallSnapshots;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
 
   private volatile StreamObservers appendLogRequestObserver;
@@ -88,6 +90,9 @@ public class GrpcLogAppender extends LogAppenderBase {
     final RaftProperties properties = server.getRaftServer().getProperties();
     this.maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties);
     this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties);
+    this.maxOutstandingInstallSnapshots = GrpcConfigKeys.Server.installSnapshotRequestElementLimit(properties);
+    this.installSnapshotStreamTimeout = GrpcConfigKeys.Server.installSnapshotRequestTimeout(properties)
+        .multiply(maxOutstandingInstallSnapshots);
     this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
     this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
 
@@ -597,8 +602,9 @@ public class GrpcLogAppender extends LogAppenderBase {
     StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
     final String requestId = UUID.randomUUID().toString();
     try {
-      snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot",
-          requestTimeoutDuration, 8, responseHandler); //FIXME: RATIS-1809
+      snapshotRequestObserver = getClient().installSnapshot(
+          getFollower().getName() + "-installSnapshot-" + requestId,
+          installSnapshotStreamTimeout, maxOutstandingInstallSnapshots, responseHandler);
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
         if (isRunning()) {
           snapshotRequestObserver.onNext(request);


[ratis] 08/18: RATIS-1814: The group info command of the Ratis shell does not show the listener (#852)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 2fc1c83a6bb27f09a315906c212da3c1221245de
Author: qian0817 <qi...@gmail.com>
AuthorDate: Tue Mar 14 16:31:11 2023 +0800

    RATIS-1814: The group info command of the Ratis shell does not show the listener (#852)
    
    (cherry picked from commit 34a0740901e02b26298aac06295d4552169be788)
---
 .../src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6f761d1f3..c64aaebba 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -580,7 +580,10 @@ class RaftServerImpl implements RaftServer.Division,
       role.getLeaderState().ifPresent(
           leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos));
     } else {
-      getRaftConf().getAllPeers().stream()
+      RaftConfigurationImpl raftConf = getRaftConf();
+      Stream.concat(
+              raftConf.getAllPeers(RaftPeerRole.FOLLOWER).stream(),
+              raftConf.getAllPeers(RaftPeerRole.LISTENER).stream())
           .map(RaftPeer::getId)
           .filter(id -> !id.equals(getId()))
           .map(commitInfoCache::get)


[ratis] 18/18: RATIS-1821. Upgrade ratis-thirdparty version to 1.0.4 (#862)

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_readIndex
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 42c3c6ba40b062e07910b67a2a022e695744963f
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Wed Mar 22 12:59:25 2023 +0800

    RATIS-1821. Upgrade ratis-thirdparty version to 1.0.4 (#862)
    
    (cherry picked from commit d495846f0b754a67594cc48f5559ca36838fd700)
---
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2d447605a..599b71f2a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -206,11 +206,11 @@
     <maven.min.version>3.3.9</maven.min.version>
 
     <!-- Contains all shaded thirdparty dependencies -->
-    <ratis.thirdparty.version>1.0.3</ratis.thirdparty.version>
+    <ratis.thirdparty.version>1.0.4</ratis.thirdparty.version>
 
     <!-- Need these for the protobuf compiler. *MUST* match what is in ratis-thirdparty -->
     <shaded.protobuf.version>3.19.6</shaded.protobuf.version>
-    <shaded.grpc.version>1.48.1</shaded.grpc.version>
+    <shaded.grpc.version>1.51.1</shaded.grpc.version>
 
     <!-- Test properties -->
     <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>