You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:08 UTC
[iotdb] 08/09: control voting list size
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f9a5f89b4f64c1828987630c3757adfb26780d8c
Author: jt <jt...@163.com>
AuthorDate: Wed Nov 10 14:25:45 2021 +0800
control voting list size
---
.../src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java | 5 ++++-
.../main/java/org/apache/iotdb/cluster/expr/VotingLogList.java | 4 ++++
.../cluster/server/handlers/caller/AppendNodeEntryHandler.java | 8 ++++++++
.../java/org/apache/iotdb/cluster/server/member/RaftMember.java | 4 +++-
4 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 050230b..a2c62ed 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.expr;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogDispatcher;
@@ -59,7 +60,8 @@ public class ExprMember extends MetaGroupMember {
public static boolean bypassRaft = false;
public static boolean useSlidingWindow = false;
- private int windowCapacity = 10000;
+ private int windowCapacity =
+ ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem() * 2;
private int windowLength = 0;
private Log[] logWindow = new Log[windowCapacity];
private long firstPosPrevIndex = 0;
@@ -282,6 +284,7 @@ public class ExprMember extends MetaGroupMember {
Statistic.RAFT_WINDOW_LENGTH.add(windowLength);
} else {
+ Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
result.setStatus(Response.RESPONSE_LOG_MISMATCH);
result.setHeader(getHeader());
return result;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 7e13336..2723ac8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -104,4 +104,8 @@ public class VotingLogList {
public synchronized void clear() {
logList.clear();
}
+
+ public int size() {
+ return logList.size();
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 73088d2..b6c0df2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -88,6 +88,14 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
long resp = response.status;
if (resp == RESPONSE_STRONG_ACCEPT) {
+ synchronized (log) {
+ if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
+ >= quorumSize) {
+ log.acceptedTime = System.nanoTime();
+ }
+ log.getStronglyAcceptedNodeIds().add(receiver.nodeIdentifier);
+ log.notifyAll();
+ }
member
.getVotingLogList()
.onStronglyAccept(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 81d4ff3..cd54aa6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1632,7 +1632,9 @@ public abstract class RaftMember {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
while (stronglyAcceptedNodeNum < quorumSize
- && (!ENABLE_WEAK_ACCEPTANCE || (totalAccepted < quorumSize))
+ && (!ENABLE_WEAK_ACCEPTANCE
+ || (totalAccepted < allNodes.size() - 1)
+ || votingLogList.size() > config.getMaxNumOfLogsInMem())
&& alreadyWait < RaftServer.getWriteOperationTimeoutMS()
&& !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
try {