You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:05 UTC

[iotdb] 05/09: Merge branch 'extract_sequencer' into expr

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d1dfe4b55054662253e5bf0e67032c053d7d4301
Merge: 918a705 d2f2645
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 15:19:35 2021 +0800

    Merge branch 'extract_sequencer' into expr
    
    # Conflicts:
    #	cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
    #	cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java

 .../apache/iotdb/cluster/config/ClusterConfig.java |  11 ++
 .../log/sequencing/AsynchronousSequencer.java      | 145 +++++++++++++++++
 .../iotdb/cluster/log/sequencing/LogSequencer.java |  42 +++++
 .../log/sequencing/LogSequencerFactory.java        |  29 ++++
 .../log/sequencing/SynchronousSequencer.java       | 103 ++++++++++++
 .../cluster/server/member/DataGroupMember.java     |   2 +
 .../cluster/server/member/MetaGroupMember.java     |   2 +
 .../iotdb/cluster/server/member/RaftMember.java    | 180 ++++++++++++---------
 8 files changed, 439 insertions(+), 75 deletions(-)

diff --cc cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 0d7f621,f1d424a..c9cbc59
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@@ -181,8 -181,8 +181,11 @@@ public class ClusterConfig 
  
    private boolean openServerRpcPort = false;
  
 +  private boolean useIndirectBroadcasting = false;
 +
+   private boolean useAsyncSequencing = false;
+ 
++
    /**
     * create a clusterConfig class. The internalIP will be set according to the server's hostname. If
     * there is something error for getting the ip of the hostname, then set the internalIp as
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index e30f867,cb6bba1..81abb87
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@@ -41,8 -41,11 +41,9 @@@ import org.apache.iotdb.cluster.excepti
  import org.apache.iotdb.cluster.log.LogApplier;
  import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
  import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 -import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
  import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
  import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
+ import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
  import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
  import org.apache.iotdb.cluster.partition.NodeAdditionResult;
  import org.apache.iotdb.cluster.partition.NodeRemovalResult;
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 41b5e50,d3b445f..2835cf3
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@@ -125,11 -124,11 +131,15 @@@ import static org.apache.iotdb.cluster.
  public abstract class RaftMember {
  
    private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
 -  public static final boolean USE_LOG_DISPATCHER = false;
 +  public static boolean USE_LOG_DISPATCHER = false;
 +  public static boolean USE_INDIRECT_LOG_DISPATCHER = false;
 +  public static boolean ENABLE_WEAK_ACCEPTANCE = true;
 +  public static boolean ENABLE_COMMIT_RETURN = false;
 +
+   protected static final LogSequencerFactory SEQUENCER_FACTORY =
+       ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing() ? new Factory()
+           : new SynchronousSequencer.Factory();
+ 
    private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
    private static final String MSG_FORWARD_ERROR =
        "{}: encountered an error when forwarding {} to" + " {}";
@@@ -184,9 -193,11 +204,11 @@@
     * offline.
     */
    volatile long lastHeartbeatReceivedTime;
 -  /**
 -   * the raft logs are all stored and maintained in the log manager
 -   */
 -  RaftLogManager logManager;
 -  /**
++
 +  /** the raft logs are all stored and maintained in the log manager */
 +  protected RaftLogManager logManager;
-   /**
++
++  /**s
     * the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
     * when this node is a leader, or start elections when this node is an elector.
     */
@@@ -255,14 -274,11 +285,16 @@@
     */
    protected PlanExecutor localExecutor;
  
 -  protected LogSequencer logSequencer;
 +  /** (logIndex, logTerm) -> append handler */
 +  protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers =
 +      new ConcurrentHashMap<>();
  
 -  protected RaftMember() {
 -  }
 +  protected VotingLogList votingLogList;
 +
 +  protected RaftMember() {}
 +
++  protected LogSequencer logSequencer;
+ 
    protected RaftMember(
        String name,
        AsyncClientPool asyncPool,
@@@ -1100,9 -1048,9 +1144,9 @@@
     * call this method. Will commit the log locally and send it to followers
     *
     * @return OK if over half of the followers accept the log or null if the leadership is lost
-    *     during the appending
+    * during the appending
     */
 -  TSStatus processPlanLocally(PhysicalPlan plan) {
 +  public TSStatus processPlanLocally(PhysicalPlan plan) {
      if (USE_LOG_DISPATCHER) {
        return processPlanLocallyV2(plan);
      }
@@@ -1173,8 -1119,10 +1215,9 @@@
      } else {
        log = new PhysicalPlanLog();
        ((PhysicalPlanLog) log).setPlan(plan);
 -      plan.setIndex(logManager.getLastLogIndex() + 1);
      }
  
+     // just like processPlanLocally,we need to check the size of log
      if (log.serialize().capacity() + Integer.BYTES
          >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
        logger.error(
@@@ -1214,17 -1137,13 +1232,18 @@@
      try {
        AppendLogResult appendLogResult =
            waitAppendResult(
 -              sendLogRequest.getVoteCounter(),
 +              sendLogRequest.getVotingLog(),
                sendLogRequest.getLeaderShipStale(),
 -              sendLogRequest.getNewLeaderTerm());
 +              sendLogRequest.getNewLeaderTerm(),
 +              sendLogRequest.getQuorumSize());
        Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
 -          sendLogRequest.getLog().getCreateTime());
 +          sendLogRequest.getVotingLog().getLog().getCreateTime());
+       long startTime;
        switch (appendLogResult) {
 +        case WEAK_ACCEPT:
 +          // TODO: change to weak
 +          Statistic.RAFT_WEAK_ACCEPT.add(1);
 +          return StatusUtils.OK;
          case OK:
            logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
            startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
@@@ -1599,13 -1518,9 +1623,13 @@@
      return term;
    }
  
-   protected synchronized LogDispatcher getLogDispatcher() {
+   public synchronized LogDispatcher getLogDispatcher() {
      if (logDispatcher == null) {
 -      logDispatcher = new LogDispatcher(this);
 +      if (USE_INDIRECT_LOG_DISPATCHER) {
 +        logDispatcher = new IndirectLogDispatcher(this);
 +      } else {
 +        logDispatcher = new LogDispatcher(this);
 +      }
      }
      return logDispatcher;
    }
@@@ -1965,23 -1840,15 +1991,25 @@@
        return AppendLogResult.TIME_OUT;
      }
  
 -    return waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm);
 +    return waitAppendResult(log, leaderShipStale, newLeaderTerm, quorumSize);
 +  }
 +
 +  public void sendLogToFollower(
 +      VotingLog log,
 +      Node node,
 +      AtomicBoolean leaderShipStale,
 +      AtomicLong newLeaderTerm,
 +      AppendEntryRequest request,
 +      int quorumSize) {
 +    sendLogToFollower(
 +        log, node, leaderShipStale, newLeaderTerm, request, quorumSize, Collections.emptyList());
    }
  
-   /** Send "log" to "node". */
+   /**
+    * Send "log" to "node".
+    */
    public void sendLogToFollower(
 -      Log log,
 -      AtomicInteger voteCounter,
 +      VotingLog log,
        Node node,
        AtomicBoolean leaderShipStale,
        AtomicLong newLeaderTerm,
@@@ -2112,13 -1966,12 +2140,13 @@@
     * and append "log" to it. Otherwise report a log mismatch.
     *
     * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-    *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+    * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
     */
 -  protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
 +  protected AppendEntryResult appendEntry(
 +      long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
      long resp = checkPrevLogIndex(prevLogIndex);
      if (resp != Response.RESPONSE_AGREE) {
 -      return resp;
 +      return new AppendEntryResult(resp).setHeader(getHeader());
      }
  
      long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
@@@ -2134,15 -1982,17 +2162,17 @@@
      Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
      if (success != -1) {
        logger.debug("{} append a new log {}", name, log);
 -      resp = Response.RESPONSE_AGREE;
 +      result.status = Response.RESPONSE_STRONG_ACCEPT;
      } else {
        // the incoming log points to an illegal position, reject it
 -      resp = Response.RESPONSE_LOG_MISMATCH;
 +      result.status = Response.RESPONSE_LOG_MISMATCH;
      }
 -    return resp;
 +    return result;
    }
  
-   /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
+   /**
+    * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
+    */
    private boolean waitForPrevLog(long prevLogIndex) {
      long waitStart = System.currentTimeMillis();
      long alreadyWait = 0;
@@@ -2188,9 -2038,9 +2218,9 @@@
     *
     * @param logs append logs
     * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-    *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+    * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
     */
 -  private long appendEntries(
 +  protected AppendEntryResult appendEntries(
        long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
      logger.debug(
          "{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",