You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/07/08 13:44:41 UTC

[iotdb] branch nbraft created (now 44427a9830)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch nbraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 44427a9830 nbraft snapshot

This branch includes the following new commits:

     new 44427a9830 nbraft snapshot

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: nbraft snapshot

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch nbraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 44427a98301e6dd02e60dfc51fd359e932e97e3b
Author: jt <jt...@163.com>
AuthorDate: Fri Jul 8 21:44:26 2022 +0800

    nbraft snapshot
---
 .../org/apache/iotdb/cluster/config/ClusterConfig.java  |  9 +++++++++
 .../apache/iotdb/cluster/config/ClusterDescriptor.java  |  4 ++++
 .../iotdb/cluster/log/FragmentedLogDispatcher.java      |  6 +++---
 .../org/apache/iotdb/cluster/log/LogDispatcher.java     |  2 +-
 .../java/org/apache/iotdb/cluster/log/VotingLog.java    |  6 +++---
 .../org/apache/iotdb/cluster/log/VotingLogList.java     |  6 ++----
 .../cluster/log/appender/SlidingWindowLogAppender.java  | 16 ++++++++++++++--
 .../iotdb/cluster/log/applier/DataLogApplier.java       |  3 ++-
 .../iotdb/cluster/log/logtypes/PhysicalPlanLog.java     | 14 +++++++++++++-
 .../server/handlers/caller/AppendNodeEntryHandler.java  | 16 +++++++++++-----
 .../apache/iotdb/cluster/server/member/RaftMember.java  | 17 +++++++++--------
 .../org/apache/iotdb/cluster/server/monitor/Timer.java  | 12 ++++++++++++
 12 files changed, 83 insertions(+), 28 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 187509b2ef..fb2bbcb8e9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -185,6 +185,7 @@ public class ClusterConfig {
   private boolean useAsyncSequencing = true;
 
   private boolean useFollowerSlidingWindow = true;
+  private int slidingWindowSize = 10000;
 
   private boolean enableWeakAcceptance = true;
 
@@ -600,4 +601,12 @@ public class ClusterConfig {
   public void setOptimizeIndirectBroadcasting(boolean optimizeIndirectBroadcasting) {
     this.optimizeIndirectBroadcasting = optimizeIndirectBroadcasting;
   }
+
+  public int getSlidingWindowSize() {
+    return slidingWindowSize;
+  }
+
+  public void setSlidingWindowSize(int slidingWindowSize) {
+    this.slidingWindowSize = slidingWindowSize;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index a089e2d75a..765ddbdc1b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -323,6 +323,10 @@ public class ClusterDescriptor {
             properties.getProperty(
                 "use_follower_sliding_window",
                 String.valueOf(config.isUseFollowerSlidingWindow()))));
+    config.setSlidingWindowSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "sliding_window_size", String.valueOf(config.getSlidingWindowSize()))));
 
     config.setEnableWeakAcceptance(
         Boolean.parseBoolean(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
index f8954effdc..c6f3c8498a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -45,13 +45,13 @@ public class FragmentedLogDispatcher extends LogDispatcher {
 
     long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
     request.getVotingLog().getLog().setEnqueueTime(System.nanoTime());
-    for (int i = 0; i < nodesLogQueues.size(); i++) {
-      BlockingQueue<SendLogRequest> nodeLogQueue = nodesLogQueues.get(i);
+    int i = 0;
+    for (BlockingQueue<SendLogRequest> nodeLogQueue : nodesLogQueues.values()) {
       SendLogRequest fragmentedRequest = new SendLogRequest(request);
       fragmentedRequest.setVotingLog(new VotingLog(request.getVotingLog()));
       fragmentedRequest
           .getVotingLog()
-          .setLog(new FragmentedLog((FragmentedLog) request.getVotingLog().getLog(), i));
+          .setLog(new FragmentedLog((FragmentedLog) request.getVotingLog().getLog(), i++));
       try {
         boolean addSucceeded;
         if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 3e659a1468..23729063ff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -80,7 +80,7 @@ public class LogDispatcher {
   Map<Node, Boolean> nodesEnabled = new HashMap<>();
   ExecutorService executorService;
   private static ExecutorService serializationService =
-      IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
+      IoTDBThreadPoolFactory.newFixedThreadPool(
           Runtime.getRuntime().availableProcessors(), "DispatcherEncoder");
 
   public static int bindingThreadNum = clusterConfig.getDispatcherBindingThreadNum();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index ebfdccc999..447ff36363 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -32,10 +32,10 @@ public class VotingLog {
 
   public VotingLog(Log log, int groupSize) {
     this.log = log;
-    stronglyAcceptedNodeIds = new HashSet<>(groupSize);
-    weaklyAcceptedNodeIds = new HashSet<>(groupSize);
+    stronglyAcceptedNodeIds = new HashSet<>(groupSize * 2);
+    weaklyAcceptedNodeIds = new HashSet<>(groupSize * 2);
     acceptedTime = new AtomicLong();
-    failedNodeIds = new HashSet<>(groupSize);
+    failedNodeIds = new HashSet<>(groupSize * 2);
   }
 
   public VotingLog(VotingLog another) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 7b1dde2d3e..86721ed8dd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -78,9 +78,7 @@ public class VotingLogList {
         VotingLog votingLog = logList.get(i);
         if (votingLog.getLog().getCurrLogIndex() <= index
             && votingLog.getLog().getCurrLogTerm() == term) {
-          synchronized (votingLog) {
-            votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
-          }
+          votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
           if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) {
             lastEntryIndexToCommit = i;
           }
@@ -110,8 +108,8 @@ public class VotingLogList {
       }
 
       for (VotingLog acceptedLog : acceptedLogs) {
+        acceptedLog.acceptedTime.set(System.nanoTime());
         synchronized (acceptedLog) {
-          acceptedLog.acceptedTime.set(System.nanoTime());
           acceptedLog.notifyAll();
         }
         if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
index 1ab08449bb..c3b2339465 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
@@ -40,7 +40,7 @@ public class SlidingWindowLogAppender implements LogAppender {
 
   private static final Logger logger = LoggerFactory.getLogger(SlidingWindowLogAppender.class);
 
-  private int windowCapacity = ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
+  private int windowCapacity = ClusterDescriptor.getInstance().getConfig().getSlidingWindowSize();
   private int windowLength = 0;
   private Log[] logWindow = new Log[windowCapacity];
   private long firstPosPrevIndex;
@@ -48,6 +48,7 @@ public class SlidingWindowLogAppender implements LogAppender {
 
   private RaftMember member;
   private RaftLogManager logManager;
+  private Object oowWaitCond = new Object();
 
   public SlidingWindowLogAppender(RaftMember member) {
     this.member = member;
@@ -144,6 +145,7 @@ public class SlidingWindowLogAppender implements LogAppender {
     for (int i = 1; i <= step; i++) {
       logWindow[windowCapacity - i] = null;
     }
+    windowLength -= step;
     firstPosPrevIndex = logManager.getLastLogIndex();
   }
 
@@ -197,7 +199,9 @@ public class SlidingWindowLogAppender implements LogAppender {
       retryTime = System.currentTimeMillis() - start;
       if (result.status == Response.RESPONSE_OUT_OF_WINDOW && retryTime < maxRetry) {
         try {
-          Thread.sleep(10);
+          synchronized (oowWaitCond) {
+            oowWaitCond.wait(1);
+          }
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           break;
@@ -219,6 +223,7 @@ public class SlidingWindowLogAppender implements LogAppender {
     long appendedPos = 0;
 
     AppendEntryResult result = new AppendEntryResult();
+    boolean flushed = false;
     synchronized (logManager) {
       int windowPos = (int) (log.getCurrLogIndex() - logManager.getLastLogIndex() - 1);
       if (windowPos < 0) {
@@ -238,6 +243,7 @@ public class SlidingWindowLogAppender implements LogAppender {
         checkLog(windowPos);
         if (windowPos == 0) {
           appendedPos = flushWindow(result, leaderCommit);
+          flushed = true;
         } else {
           result.status = Response.RESPONSE_WEAK_ACCEPT;
         }
@@ -250,6 +256,12 @@ public class SlidingWindowLogAppender implements LogAppender {
       }
     }
 
+    if (flushed) {
+      synchronized (oowWaitCond) {
+        oowWaitCond.notifyAll();
+      }
+    }
+
     if (appendedPos == -1) {
       // the incoming log points to an illegal position, reject it
       result.status = Response.RESPONSE_LOG_MISMATCH;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 7e0e033483..545c39ace7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
@@ -91,7 +92,7 @@ public class DataLogApplier extends BaseApplier {
                 closeFileLog.getPartitionId(),
                 closeFileLog.isSeq(),
                 false);
-      } else {
+      } else if (!(log instanceof FragmentedLog)) {
         logger.error("Unsupported log: {}", log);
       }
     } catch (Exception e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
index 5d5793d350..2a2092fbbd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
@@ -57,9 +57,21 @@ public class PhysicalPlanLog extends Log {
     return DEFAULT_BUFFER_SIZE;
   }
 
+  private ThreadLocal<PublicBAOS> baosThreadLocal = new ThreadLocal<>();
+
+  private PublicBAOS getSerializeOutputStream() {
+    PublicBAOS publicBAOS = baosThreadLocal.get();
+    if (publicBAOS == null) {
+      publicBAOS = new PublicBAOS(getDefaultBufferSize());
+      baosThreadLocal.set(publicBAOS);
+    }
+    publicBAOS.reset();
+    return publicBAOS;
+  }
+
   @Override
   public ByteBuffer serialize() {
-    PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultBufferSize());
+    PublicBAOS byteArrayOutputStream = getSerializeOutputStream();
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) PHYSICAL_PLAN.ordinal());
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 7f8c7df4f4..cbd0682f5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -90,6 +90,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     long resp = response.status;
 
     if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
+      long operationStartTime = Statistic.RAFT_SENDER_HANDLE_STRONG_ACCEPT.getOperationStartTime();
       member
           .getVotingLogList()
           .onStronglyAccept(
@@ -97,6 +98,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
               log.getLog().getCurrLogTerm(),
               trueReceiver.nodeIdentifier);
       member.getPeer(trueReceiver).setMatchIndex(response.lastLogIndex);
+      Statistic.RAFT_SENDER_HANDLE_STRONG_ACCEPT.calOperationCostTimeFromStart(operationStartTime);
     } else if (resp > 0) {
       // a response > 0 is the follower's term
       // the leader ship is stale, wait for the new leader's heartbeat
@@ -120,14 +122,18 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
             new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()));
       }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
-      synchronized (log) {
+      long operationStartTime = Statistic.RAFT_SENDER_HANDLE_WEAK_ACCEPT.getOperationStartTime();
+      synchronized (log.getWeaklyAcceptedNodeIds()) {
         log.getWeaklyAcceptedNodeIds().add(trueReceiver.nodeIdentifier);
-        if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
-            >= quorumSize) {
-          log.acceptedTime.set(System.nanoTime());
-        }
+      }
+      if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
+          >= quorumSize) {
+        log.acceptedTime.set(System.nanoTime());
+      }
+      synchronized (log) {
         log.notifyAll();
       }
+      Statistic.RAFT_SENDER_HANDLE_WEAK_ACCEPT.calOperationCostTimeFromStart(operationStartTime);
     } else {
       // e.g., Response.RESPONSE_LOG_MISMATCH
       if (resp == RESPONSE_LOG_MISMATCH || resp == RESPONSE_OUT_OF_WINDOW) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0d605d3f46..9401d6c9cb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -598,6 +598,14 @@ public abstract class RaftMember implements RaftMemberMBean {
     long operationStartTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
     Thread.currentThread()
         .setName(getThreadBaseName() + "-appending-" + (request.prevLogIndex + 1));
+    //    if (true) {
+    //      AppendEntryResult result = new AppendEntryResult();
+    //      result.setLastLogTerm(request.prevLogTerm);
+    //      result.setLastLogIndex(request.prevLogIndex + 1);
+    //      result.setStatus(Response.RESPONSE_STRONG_ACCEPT);
+    //      result.setHeader(getHeader());
+    //      return result;
+    //    }
     AppendEntryResult result = appendEntryInternal(request);
     Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(operationStartTime);
     return result;
@@ -1761,14 +1769,7 @@ public abstract class RaftMember implements RaftMemberMBean {
           logger.warn("Unexpected interruption when sending a log", e);
         }
         Thread.currentThread()
-            .setName(
-                threadBaseName
-                    + "-waiting-"
-                    + log.getLog().getCurrLogIndex()
-                    + "-"
-                    + log.getStronglyAcceptedNodeIds()
-                    + "-"
-                    + log.getWeaklyAcceptedNodeIds());
+            .setName(threadBaseName + "-waiting-" + log.getLog().getCurrLogIndex());
         alreadyWait = (System.nanoTime() - waitStart) / 1000000;
         if (alreadyWait > nextTimeToPrint) {
           logger.info(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 72892bdb56..4abde4d974 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -152,6 +152,18 @@ public class Timer {
         TIME_SCALE,
         true,
         RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_HANDLE_STRONG_ACCEPT(
+        RAFT_MEMBER_SENDER,
+        "handle strong accept",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_HANDLE_WEAK_ACCEPT(
+        RAFT_MEMBER_SENDER,
+        "handle weak accept",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_RECEIVER_RELAY_OFFER_LOG(
         RAFT_MEMBER_RECEIVER,
         "relay offer log",