You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/10/06 01:53:56 UTC

[iotdb] branch expr_vgraft updated: enable out-of-order enqueue for NB-Raft

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_vgraft by this push:
     new 7cb4e6d549 enable out-of-order enqueue for NB-Raft
7cb4e6d549 is described below

commit 7cb4e6d549cd4e31c7f0c6756269e9fa43080ec6
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Oct 6 09:53:51 2022 +0800

    enable out-of-order enqueue for NB-Raft
---
 .../log/sequencing/SynchronousSequencer.java       | 27 +++++++++++++-----
 .../iotdb/cluster/server/member/RaftMember.java    | 33 ++++++++++++++++------
 2 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index 07ce3a1cc7..f44a10a03e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -51,9 +51,19 @@ public class SynchronousSequencer implements LogSequencer {
     this.logManager = logManager;
   }
 
+  private SendLogRequest enqueueEntry(SendLogRequest sendLogRequest) {
+    long startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+
+    if (member.getAllNodes().size() > 1) {
+      member.getLogDispatcher().offer(sendLogRequest);
+    }
+    Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+    return sendLogRequest;
+  }
+
   @Override
   public SendLogRequest sequence(Log log) {
-    SendLogRequest sendLogRequest;
+    SendLogRequest sendLogRequest = null;
 
     long startTime =
         Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
@@ -87,15 +97,13 @@ public class SynchronousSequencer implements LogSequencer {
 
           startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
           sendLogRequest = buildSendLogRequest(log);
+          log.setCreateTime(System.nanoTime());
           Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
 
-          startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
-          log.setCreateTime(System.nanoTime());
-          if (member.getAllNodes().size() > 1) {
-            member.getLogDispatcher().offer(sendLogRequest);
+          if (!(ClusterDescriptor.getInstance().getConfig().isUseFollowerSlidingWindow()
+              && ClusterDescriptor.getInstance().getConfig().isEnableWeakAcceptance())) {
+            sendLogRequest = enqueueEntry(sendLogRequest);
           }
-          Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
-
           break;
         }
         try {
@@ -111,6 +119,11 @@ public class SynchronousSequencer implements LogSequencer {
       }
     }
 
+    if (ClusterDescriptor.getInstance().getConfig().isUseFollowerSlidingWindow()
+        && ClusterDescriptor.getInstance().getConfig().isEnableWeakAcceptance()) {
+      sendLogRequest = enqueueEntry(sendLogRequest);
+    }
+
     return sendLogRequest;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 5c29f6e3f3..c89aa376ac 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1887,6 +1887,12 @@ public abstract class RaftMember implements RaftMemberMBean {
   }
 
   public AppendEntryRequest buildAppendEntryRequest(Log log, boolean serializeNow) {
+    AppendEntryRequest request = buildAppendEntryRequestBasic(log, serializeNow);
+    request = buildAppendEntryRequestExtended(request, log, serializeNow);
+    return request;
+  }
+
+  protected AppendEntryRequest buildAppendEntryRequestBasic(Log log, boolean serializeNow) {
     AppendEntryRequest request = new AppendEntryRequest();
     request.setTerm(term.get());
     if (serializeNow) {
@@ -1895,25 +1901,34 @@ public abstract class RaftMember implements RaftMemberMBean {
       log.setByteSize(byteBuffer.array().length);
       request.entry = byteBuffer;
       Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
-      request.setEntryHash(byteBuffer.hashCode());
     }
-    request.setLeader(getThisNode());
-    // don't need lock because even if it's larger than the commitIndex when appending this log to
-    // logManager, the follower can handle the larger commitIndex with no effect
-    request.setLeaderCommit(logManager.getCommitLogIndex());
-    request.setPrevLogIndex(log.getCurrLogIndex() - 1);
-    request.setIsFromLeader(true);
-    request.setLeaderSignature(KeyManager.INSTANCE.getNodeSignature());
-
     try {
       request.setPrevLogTerm(logManager.getTerm(log.getCurrLogIndex() - 1));
     } catch (Exception e) {
       logger.error("getTerm failed for newly append entries", e);
     }
+    request.setLeader(getThisNode());
+    // don't need lock because even if it's larger than the commitIndex when appending this log to
+    // logManager, the follower can handle the larger commitIndex with no effect
+    request.setLeaderCommit(logManager.getCommitLogIndex());
+    request.setPrevLogIndex(log.getCurrLogIndex() - 1);
     if (getHeader() != null) {
       // data groups use header to find a particular DataGroupMember
       request.setHeader(getHeader());
     }
+
+    return request;
+  }
+
+  protected AppendEntryRequest buildAppendEntryRequestExtended(AppendEntryRequest request, Log log,
+      boolean serializeNow) {
+    request.setIsFromLeader(true);
+    if (ClusterDescriptor.getInstance().getConfig().isUseVGRaft()) {
+      request.setLeaderSignature(KeyManager.INSTANCE.getNodeSignature());
+      if (serializeNow) {
+        request.setEntryHash(request.entry.hashCode());
+      }
+    }
     return request;
   }