You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/10/10 02:53:22 UTC

[iotdb] branch expr_vgraft updated: refactor commitTo

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_vgraft by this push:
     new ffba16a54b refactor commitTo
ffba16a54b is described below

commit ffba16a54bae1e69e68fed72c2414feb8278d821
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Oct 10 10:53:16 2022 +0800

    refactor commitTo
---
 .../org/apache/iotdb/cluster/log/VotingLog.java    |  14 +-
 .../apache/iotdb/cluster/log/VotingLogList.java    |  21 +--
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 141 ++++++++++++---------
 .../iotdb/cluster/server/member/RaftMember.java    |   7 +-
 .../apache/iotdb/cluster/server/monitor/Timer.java |  11 +-
 .../org/apache/iotdb/cluster/utils/LogUtils.java   |   5 +
 6 files changed, 107 insertions(+), 92 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index 848a61c967..7e9eee9e79 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -22,27 +22,31 @@ package org.apache.iotdb.cluster.log;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 
 public class VotingLog {
   protected Log log;
+  // for NB-Raft
   protected Set<Integer> weaklyAcceptedNodeIds;
   protected Set<Integer> failedNodeIds;
+  // for VGRaft
   protected Set<byte[]> signatures;
-  public AtomicLong acceptedTime;
   private boolean hasFailed;
 
   public VotingLog(Log log, int groupSize) {
     this.log = log;
-    weaklyAcceptedNodeIds = new HashSet<>(groupSize);
-    acceptedTime = new AtomicLong();
     failedNodeIds = new HashSet<>(groupSize);
-    signatures = new HashSet<>(groupSize);
+    if (ClusterDescriptor.getInstance().getConfig().isUseFollowerSlidingWindow()) {
+      weaklyAcceptedNodeIds = new HashSet<>(groupSize);
+    }
+    if (ClusterDescriptor.getInstance().getConfig().isUseVGRaft()) {
+      signatures = new HashSet<>(groupSize);
+    }
   }
 
   public VotingLog(VotingLog another) {
     this.log = another.log;
     this.weaklyAcceptedNodeIds = another.weaklyAcceptedNodeIds;
-    this.acceptedTime = another.acceptedTime;
     this.failedNodeIds = another.failedNodeIds;
     this.signatures = another.signatures;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 398a80eda9..d1b410f484 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -23,14 +23,12 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,8 +37,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER;
-
 public class VotingLogList {
 
   private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
@@ -74,23 +70,14 @@ public class VotingLogList {
   private boolean tryCommit() {
     RaftLogManager logManager = member.getLogManager();
 
-    List<Log> entries = Collections.emptyList();
     if (computeNewCommitIndex()
         && logManager != null
         && newCommitIndex.get() > logManager.getCommitLogIndex()) {
-      long start = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
-      synchronized (logManager) {
-        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
-            start);
-        long operationStartTime = RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
-        try {
-          logManager.commitTo(newCommitIndex.get());
-        } catch (LogExecutionException e) {
-          logger.error("Fail to commit {}", newCommitIndex, e);
-        }
-        RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(operationStartTime);
+      try {
+        logManager.commitTo(newCommitIndex.get());
+      } catch (LogExecutionException e) {
+        logger.error("Fail to commit {}", newCommitIndex, e);
       }
-
       return true;
     } else {
       return false;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index eec035dc46..46f4251ea5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.cluster.log.manage;
 
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_COMMIT_LOG_IN_MANAGER;
+
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.EntryCompactedException;
 import org.apache.iotdb.cluster.exception.EntryUnavailableException;
@@ -37,6 +39,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.IStateMachine;
 
+import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -589,44 +592,16 @@ public abstract class RaftLogManager {
     return entries;
   }
 
-  /**
-   * Used by MaybeCommit or MaybeAppend or follower to commit newly committed entries.
-   *
-   * @param newCommitIndex request commitIndex
-   */
-  public void commitTo(long newCommitIndex) throws LogExecutionException {
-    if (commitIndex >= newCommitIndex) {
-      return;
-    }
-    long startTime = Statistic.RAFT_SENDER_COMMIT_GET_LOGS.getOperationStartTime();
-    long lo = getUnCommittedEntryManager().getFirstUnCommittedIndex();
-    long hi = newCommitIndex + 1;
-    List<Log> entries = new ArrayList<>(getUnCommittedEntryManager().getEntries(lo, hi));
-    Statistic.RAFT_SENDER_COMMIT_GET_LOGS.calOperationCostTimeFromStart(startTime);
-
-    if (entries.isEmpty()) {
-      return;
-    }
-
-    long commitLogIndex = getCommitLogIndex();
-    long firstLogIndex = entries.get(0).getCurrLogIndex();
-    if (commitLogIndex >= firstLogIndex) {
-      logger.warn(
-          "Committing logs that has already been committed: {} >= {}",
-          commitLogIndex,
-          firstLogIndex);
-      entries
-          .subList(0, (int) (getCommitLogIndex() - entries.get(0).getCurrLogIndex() + 1))
-          .clear();
-    }
-
+  private void checkCompaction(List<Log> entries) {
     boolean needToCompactLog = false;
+    // calculate the number of old committed entries to be reserved by entry number
     int numToReserveForNew = minNumOfLogsInMem;
     if (committedEntryManager.getTotalSize() + entries.size() > maxNumOfLogsInMem) {
       needToCompactLog = true;
       numToReserveForNew = maxNumOfLogsInMem - entries.size();
     }
 
+    // calculate the number of old committed entries to be reserved by entry size
     long newEntryMemSize = 0;
     for (Log entry : entries) {
       if (entry.getByteSize() == 0) {
@@ -634,7 +609,7 @@ public abstract class RaftLogManager {
             "{} should not go here, must be send to the follower, "
                 + "so the log has been serialized exclude single node mode",
             entry);
-        // entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
+        entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
       }
       newEntryMemSize += entry.getByteSize();
     }
@@ -645,17 +620,33 @@ public abstract class RaftLogManager {
           committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - newEntryMemSize);
     }
 
+    // reserve old committed entries with the minimum number
     if (needToCompactLog) {
       int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
       int sizeToReserveForConfig = minNumOfLogsInMem;
-      startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
-      synchronized (this) {
-        innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
-      }
+      long startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
+      innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
       Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
     }
+  }
+
+  private void removedCommitted(List<Log> entries) {
+    long commitLogIndex = getCommitLogIndex();
+    long firstLogIndex = entries.get(0).getCurrLogIndex();
+    if (commitLogIndex >= firstLogIndex) {
+      logger.warn(
+          "Committing logs that has already been committed: {} >= {}",
+          commitLogIndex,
+          firstLogIndex);
+      entries
+          .subList(0, (int) (getCommitLogIndex() - entries.get(0).getCurrLogIndex() + 1))
+          .clear();
+    }
+  }
+
+  private void commitEntries(List<Log> entries) throws LogExecutionException {
+    long startTime = Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
 
-    startTime = Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
     for (Log entry : entries) {
       Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.calOperationCostTimeFromStart(
           entry.getCreateTime());
@@ -685,31 +676,43 @@ public abstract class RaftLogManager {
       logger.error("{}: persistent raft log error:", name, e);
       throw new LogExecutionException(e);
     } finally {
-      Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.calOperationCostTimeFromStart(startTime);
+      Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.calOperationCostTimeFromStart(
+          startTime);
     }
+  }
 
-    startTime = Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.getOperationStartTime();
-    applyEntries(entries);
-    Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.calOperationCostTimeFromStart(startTime);
+  /**
+   * Used by MaybeCommit or MaybeAppend or follower to commit newly committed entries.
+   *
+   * @param newCommitIndex request commitIndex
+   */
+  public void commitTo(long newCommitIndex) throws LogExecutionException {
+    long start = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
+    if (commitIndex >= newCommitIndex) {
+      return;
+    }
 
-    long unappliedLogSize = commitLogIndex - maxHaveAppliedCommitIndex;
-    if (unappliedLogSize > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
-      logger.info(
-          "There are too many unapplied logs [{}], wait for a while to avoid memory overflow",
-          unappliedLogSize);
-      try {
-        synchronized (changeApplyCommitIndexCond) {
-          changeApplyCommitIndexCond.wait(
-              Math.min(
-                  (unappliedLogSize
-                              - ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem())
-                          / 10
-                      + 1,
-                  1000));
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
+    synchronized (this) {
+      Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+          start);
+      long operationStartTime = RAFT_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
+
+      long startTime = Statistic.RAFT_SENDER_COMMIT_GET_LOGS.getOperationStartTime();
+      long lo = getUnCommittedEntryManager().getFirstUnCommittedIndex();
+      long hi = newCommitIndex + 1;
+      List<Log> entries = new ArrayList<>(getUnCommittedEntryManager().getEntries(lo, hi));
+      Statistic.RAFT_SENDER_COMMIT_GET_LOGS.calOperationCostTimeFromStart(startTime);
+
+      if (entries.isEmpty()) {
+        return;
       }
+
+      removedCommitted(entries);
+      checkCompaction(entries);
+      commitEntries(entries);
+      applyEntries(entries);
+
+      RAFT_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(operationStartTime);
     }
   }
 
@@ -736,9 +739,31 @@ public abstract class RaftLogManager {
    * @param entries applying entries
    */
   void applyEntries(List<Log> entries) {
+    long startTime = Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.getOperationStartTime();
     for (Log entry : entries) {
       applyEntry(entry);
     }
+    Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.calOperationCostTimeFromStart(startTime);
+
+    long unappliedLogSize = getCommitLogIndex() - maxHaveAppliedCommitIndex;
+    if (unappliedLogSize > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
+      logger.info(
+          "There are too many unapplied logs [{}], wait for a while to avoid memory overflow",
+          unappliedLogSize);
+      try {
+        synchronized (changeApplyCommitIndexCond) {
+          changeApplyCommitIndexCond.wait(
+              Math.min(
+                  (unappliedLogSize
+                      - ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem())
+                      / 10
+                      + 1,
+                  1000));
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
   }
 
   public void applyEntry(Log entry) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 267c0ea899..0713ea7a56 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1747,14 +1747,12 @@ public abstract class RaftMember implements RaftMemberMBean {
           logger.info(
               "Still not receive enough votes for {}, weakly "
                   + "accepted {}, wait {}ms, wait to sequence {}ms, wait to enqueue "
-                  + "{}ms, wait to accept "
                   + "{}ms",
               log,
               log.getWeaklyAcceptedNodeIds(),
               alreadyWait,
               (log.getLog().getSequenceStartTime() - waitStart) / 1000000,
-              (log.getLog().getEnqueueTime() - waitStart) / 1000000,
-              (log.acceptedTime.get() - waitStart) / 1000000);
+              (log.getLog().getEnqueueTime() - waitStart) / 1000000);
           nextTimeToPrint *= 2;
         }
         totalAccepted = votingLogList.totalAcceptedNodeNum(log);
@@ -1801,9 +1799,6 @@ public abstract class RaftMember implements RaftMemberMBean {
     weaklyAccepted = log.getWeaklyAcceptedNodeIds().size();
     stronglyAccepted = totalAccepted - weaklyAccepted;
 
-    if (log.acceptedTime.get() != 0) {
-      Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime.get());
-    }
     Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.calOperationCostTimeFromStart(startTime);
 
     // a node has a larger term than the local node, so this node is no longer a valid leader
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index f09bfd5956..07b35b43ba 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -192,7 +192,7 @@ public class Timer {
         TIME_SCALE,
         true,
         RAFT_SENDER_COMMIT_LOG),
-    RAFT_SENDER_COMMIT_LOG_IN_MANAGER(
+    RAFT_COMMIT_LOG_IN_MANAGER(
         RAFT_MEMBER_SENDER,
         "commit log in log manager",
         TIME_SCALE,
@@ -209,25 +209,25 @@ public class Timer {
         "get logs to be committed",
         TIME_SCALE,
         RaftMember.USE_LOG_DISPATCHER,
-        RAFT_SENDER_COMMIT_LOG_IN_MANAGER),
+        RAFT_COMMIT_LOG_IN_MANAGER),
     RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS(
         RAFT_MEMBER_SENDER,
         "delete logs exceeding capacity",
         TIME_SCALE,
         RaftMember.USE_LOG_DISPATCHER,
-        RAFT_SENDER_COMMIT_LOG_IN_MANAGER),
+        RAFT_COMMIT_LOG_IN_MANAGER),
     RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS(
         RAFT_MEMBER_SENDER,
         "append and stable committed logs",
         TIME_SCALE,
         RaftMember.USE_LOG_DISPATCHER,
-        RAFT_SENDER_COMMIT_LOG_IN_MANAGER),
+        RAFT_COMMIT_LOG_IN_MANAGER),
     RAFT_SENDER_COMMIT_APPLY_LOGS(
         RAFT_MEMBER_SENDER,
         "apply after committing logs",
         TIME_SCALE,
         RaftMember.USE_LOG_DISPATCHER,
-        RAFT_SENDER_COMMIT_LOG_IN_MANAGER),
+        RAFT_COMMIT_LOG_IN_MANAGER),
     RAFT_SENDER_COMMIT_TO_CONSUMER_LOGS(
         RAFT_MEMBER_SENDER,
         "provide log to consumer",
@@ -363,7 +363,6 @@ public class Timer {
     RAFT_SENT_ENTRY_SIZE(RAFT_MEMBER_SENDER, "sent entry size", 1, true, ROOT),
     RAFT_RELAYED_LEVEL1_NUM(RAFT_MEMBER_SENDER, "level 1 relay node number", 1, true, ROOT),
     RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
-    RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),
     RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
     RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT),
     RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true, ROOT);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/LogUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/LogUtils.java
new file mode 100644
index 0000000000..8bde2e2672
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/LogUtils.java
@@ -0,0 +1,5 @@
+package org.apache.iotdb.cluster.utils;
+
+public class LogUtils {
+
+}