You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:08 UTC

[iotdb] 08/09: control voting list size

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f9a5f89b4f64c1828987630c3757adfb26780d8c
Author: jt <jt...@163.com>
AuthorDate: Wed Nov 10 14:25:45 2021 +0800

    control voting list size
---
 .../src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java   | 5 ++++-
 .../main/java/org/apache/iotdb/cluster/expr/VotingLogList.java    | 4 ++++
 .../cluster/server/handlers/caller/AppendNodeEntryHandler.java    | 8 ++++++++
 .../java/org/apache/iotdb/cluster/server/member/RaftMember.java   | 4 +++-
 4 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 050230b..a2c62ed 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.expr;
 
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogDispatcher;
@@ -59,7 +60,8 @@ public class ExprMember extends MetaGroupMember {
   public static boolean bypassRaft = false;
   public static boolean useSlidingWindow = false;
 
-  private int windowCapacity = 10000;
+  private int windowCapacity =
+      ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem() * 2;
   private int windowLength = 0;
   private Log[] logWindow = new Log[windowCapacity];
   private long firstPosPrevIndex = 0;
@@ -282,6 +284,7 @@ public class ExprMember extends MetaGroupMember {
 
         Statistic.RAFT_WINDOW_LENGTH.add(windowLength);
       } else {
+        Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
         result.setStatus(Response.RESPONSE_LOG_MISMATCH);
         result.setHeader(getHeader());
         return result;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 7e13336..2723ac8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -104,4 +104,8 @@ public class VotingLogList {
   public synchronized void clear() {
     logList.clear();
   }
+
+  public int size() {
+    return logList.size();
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 73088d2..b6c0df2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -88,6 +88,14 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     long resp = response.status;
 
     if (resp == RESPONSE_STRONG_ACCEPT) {
+      synchronized (log) {
+        if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
+            >= quorumSize) {
+          log.acceptedTime = System.nanoTime();
+        }
+        log.getStronglyAcceptedNodeIds().add(receiver.nodeIdentifier);
+        log.notifyAll();
+      }
       member
           .getVotingLogList()
           .onStronglyAccept(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 81d4ff3..cd54aa6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1632,7 +1632,9 @@ public abstract class RaftMember {
       long waitStart = System.currentTimeMillis();
       long alreadyWait = 0;
       while (stronglyAcceptedNodeNum < quorumSize
-          && (!ENABLE_WEAK_ACCEPTANCE || (totalAccepted < quorumSize))
+          && (!ENABLE_WEAK_ACCEPTANCE
+              || (totalAccepted < allNodes.size() - 1)
+              || votingLogList.size() > config.getMaxNumOfLogsInMem())
           && alreadyWait < RaftServer.getWriteOperationTimeoutMS()
           && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
         try {