You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:04 UTC

[iotdb] 04/09: add sequencer factory

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d2f26457e52132f04f28ac0bb22360d5bb99c06c
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 15:15:07 2021 +0800

    add sequencer factory
---
 .../apache/iotdb/cluster/config/ClusterConfig.java |  10 ++
 .../log/sequencing/AsynchronousSequencer.java      |   8 ++
 .../log/sequencing/LogSequencerFactory.java        |  29 +++++
 .../log/sequencing/SynchronousSequencer.java       |   8 ++
 .../cluster/server/member/DataGroupMember.java     |   2 +-
 .../cluster/server/member/MetaGroupMember.java     |   2 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 144 ++++++++++++++-------
 7 files changed, 155 insertions(+), 48 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index afa202d..f1d424a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -181,6 +181,8 @@ public class ClusterConfig {
 
   private boolean openServerRpcPort = false;
 
+  private boolean useAsyncSequencing = false;
+
   /**
    * create a clusterConfig class. The internalIP will be set according to the server's hostname. If
    * there is something error for getting the ip of the hostname, then set the internalIp as
@@ -539,4 +541,12 @@ public class ClusterConfig {
   public void setClusterInfoRpcPort(int clusterInfoRpcPort) {
     this.clusterInfoRpcPort = clusterInfoRpcPort;
   }
+
+  public boolean isUseAsyncSequencing() {
+    return useAsyncSequencing;
+  }
+
+  public void setUseAsyncSequencing(boolean useAsyncSequencing) {
+    this.useAsyncSequencing = useAsyncSequencing;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index eb33bea..63cc38b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -134,4 +134,12 @@ public class AsynchronousSequencer implements LogSequencer {
   public void setLogManager(RaftLogManager logManager) {
     this.logManager = logManager;
   }
+
+  public static class Factory implements LogSequencerFactory {
+
+    @Override
+    public LogSequencer create(RaftMember member, RaftLogManager logManager) {
+      return new AsynchronousSequencer(member, logManager);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java
new file mode 100644
index 0000000..627ef84
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iotdb.cluster.log.sequencing;
+
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+
+public interface LogSequencerFactory {
+
+  LogSequencer create(RaftMember member, RaftLogManager logManager);
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index 4327065..4da61b5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -92,4 +92,12 @@ public class SynchronousSequencer implements LogSequencer {
 
     return new SendLogRequest(log, voteCounter, leaderShipStale, newLeaderTerm, appendEntryRequest);
   }
+
+  public static class Factory implements LogSequencerFactory {
+
+    @Override
+    public LogSequencer create(RaftMember member, RaftLogManager logManager) {
+      return new SynchronousSequencer(member, logManager);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 6b66216..4adc4d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -211,7 +211,7 @@ public class DataGroupMember extends RaftMember {
     logManager =
         new FilePartitionedSnapshotLogManager(
             dataLogApplier, metaGroupMember.getPartitionTable(), allNodes.get(0), thisNode, this);
-    logSequencer = new SynchronousSequencer(this, logManager);
+    logSequencer = SEQUENCER_FACTORY.create(this, logManager);
     initPeerMap();
     term.set(logManager.getHardState().getCurrentTerm());
     voteFor = logManager.getHardState().getVoteFor();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index ef3ca8b..cb6bba1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -266,7 +266,7 @@ public class MetaGroupMember extends RaftMember {
     // committed logs are applied to the state machine (the IoTDB instance) through the applier
     LogApplier metaLogApplier = new MetaLogApplier(this);
     logManager = new MetaSingleSnapshotLogManager(metaLogApplier, this);
-    logSequencer = new SynchronousSequencer(this, logManager);
+    logSequencer = SEQUENCER_FACTORY.create(this, logManager);
     term.set(logManager.getHardState().getCurrentTerm());
     voteFor = logManager.getHardState().getVoteFor();
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 8cb88b9..d3b445f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -38,7 +38,11 @@ import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer;
+import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer.Factory;
 import org.apache.iotdb.cluster.log.sequencing.LogSequencer;
+import org.apache.iotdb.cluster.log.sequencing.LogSequencerFactory;
+import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -113,12 +117,17 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
 
 /**
- * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
+ * RaftMember processes the common raft logic like leader election, log appending, catch-up and so
+ * on.
  */
 @SuppressWarnings("java:S3077") // reference volatile is enough
 public abstract class RaftMember {
+
   private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
   public static final boolean USE_LOG_DISPATCHER = false;
+  protected static final LogSequencerFactory SEQUENCER_FACTORY = // chosen once, at class init
+      ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing()
+          ? new AsynchronousSequencer.Factory() : new SynchronousSequencer.Factory();
 
   private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
   private static final String MSG_FORWARD_ERROR =
@@ -138,22 +147,32 @@ public abstract class RaftMember {
    * on this may be woken.
    */
   private final Object waitLeaderCondition = new Object();
-  /** the lock is to make sure that only one thread can apply snapshot at the same time */
+  /**
+   * the lock is to make sure that only one thread can apply snapshot at the same time
+   */
   private final Object snapshotApplyLock = new Object();
 
   private final Object heartBeatWaitObject = new Object();
 
   protected Node thisNode = ClusterConstant.EMPTY_NODE;
 
-  /** the nodes that belong to the same raft group as thisNode. */
+  /**
+   * the nodes that belong to the same raft group as thisNode.
+   */
   protected PartitionGroup allNodes;
 
   ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-  /** the name of the member, to distinguish several members in the logs. */
+  /**
+   * the name of the member, to distinguish several members in the logs.
+   */
   String name;
-  /** to choose nodes to send request of joining cluster randomly. */
+  /**
+   * to choose nodes to send request of joining cluster randomly.
+   */
   Random random = new Random();
-  /** when the node is a leader, this map is used to track log progress of each follower. */
+  /**
+   * when the node is a leader, this map is used to track log progress of each follower.
+   */
   Map<Node, Peer> peerMap;
   /**
    * the current term of the node, this object also works as lock of some transactions of the member
@@ -174,7 +193,9 @@ public abstract class RaftMember {
    * offline.
    */
   volatile long lastHeartbeatReceivedTime;
-  /** the raft logs are all stored and maintained in the log manager */
+  /**
+   * the raft logs are all stored and maintained in the log manager
+   */
   RaftLogManager logManager;
   /**
    * the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
@@ -192,7 +213,9 @@ public abstract class RaftMember {
    * member by comparing it with the current last log index.
    */
   long lastReportedLogIndex;
-  /** the thread pool that runs catch-up tasks */
+  /**
+   * the thread pool that runs catch-up tasks
+   */
   private ExecutorService catchUpService;
   /**
    * lastCatchUpResponseTime records when is the latest response of each node's catch-up. There
@@ -228,26 +251,33 @@ public abstract class RaftMember {
    * one slow node.
    */
   private ExecutorService serialToParallelPool;
-  /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
+  /**
+   * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
+   */
   private ExecutorService commitLogPool;
 
   /**
    * logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
-   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
+   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
+   * logs.
    */
   private LogDispatcher logDispatcher;
 
-  /** If this node can not be the leader, this parameter will be set true. */
+  /**
+   * If this node can not be the leader, this parameter will be set true.
+   */
   private volatile boolean skipElection = false;
 
   /**
-   * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
+   * localExecutor is used to directly execute plans like load configuration in the underlying
+   * IoTDB
    */
   protected PlanExecutor localExecutor;
 
   protected LogSequencer logSequencer;
 
-  protected RaftMember() {}
+  protected RaftMember() {
+  }
 
   protected RaftMember(
       String name,
@@ -560,7 +590,9 @@ public abstract class RaftMember {
     return result;
   }
 
-  /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
+  /**
+   * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
+   */
   public long appendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
     logger.debug("{} received an AppendEntriesRequest", name);
 
@@ -714,16 +746,22 @@ public abstract class RaftMember {
     return lastCatchUpResponseTime;
   }
 
-  /** Sub-classes will add their own process of HeartBeatResponse in this method. */
-  public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {}
+  /**
+   * Sub-classes will add their own process of HeartBeatResponse in this method.
+   */
+  public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
+  }
 
-  /** The actions performed when the node wins in an election (becoming a leader). */
-  public void onElectionWins() {}
+  /**
+   * The actions performed when the node wins in an election (becoming a leader).
+   */
+  public void onElectionWins() {
+  }
 
   /**
    * Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the
-   * follower. If some of the required logs are removed, also send the snapshot. <br>
-   * notice that if a part of data is in the snapshot, then it is not in the logs.
+   * follower. If some of the required logs are removed, also send the snapshot. <br> notice that if
+   * a part of data is in the snapshot, then it is not in the logs.
    */
   public void catchUp(Node follower, long lastLogIdx) {
     // for one follower, there is at most one ongoing catch-up, so the same data will not be sent
@@ -810,7 +848,9 @@ public abstract class RaftMember {
     }
   }
 
-  /** call back after syncLeader */
+  /**
+   * call back after syncLeader
+   */
   public interface CheckConsistency {
 
     /**
@@ -819,7 +859,7 @@ public abstract class RaftMember {
      * @param leaderCommitId leader commit id
      * @param localAppliedId local applied id
      * @throws CheckConsistencyException maybe throw CheckConsistencyException, which is defined in
-     *     implements.
+     *                                   implements.
      */
     void postCheckConsistency(long leaderCommitId, long localAppliedId)
         throws CheckConsistencyException;
@@ -828,8 +868,7 @@ public abstract class RaftMember {
   public static class MidCheckConsistency implements CheckConsistency {
 
     /**
-     * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw
-     * CHECK_MID_CONSISTENCY_EXCEPTION
+     * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw CHECK_MID_CONSISTENCY_EXCEPTION
      *
      * @param leaderCommitId leader commit id
      * @param localAppliedId local applied id
@@ -841,7 +880,7 @@ public abstract class RaftMember {
       if (leaderCommitId == Long.MAX_VALUE
           || leaderCommitId == Long.MIN_VALUE
           || leaderCommitId - localAppliedId
-              > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
+          > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
         throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
       }
     }
@@ -874,7 +913,7 @@ public abstract class RaftMember {
    * @param checkConsistency check after syncleader
    * @return true if the node has caught up, false otherwise
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *     value after timeout
+   *                                   value after timeout
    */
   public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
     if (character == NodeCharacter.LEADER) {
@@ -893,7 +932,9 @@ public abstract class RaftMember {
     return waitUntilCatchUp(checkConsistency);
   }
 
-  /** Wait until the leader of this node becomes known or time out. */
+  /**
+   * Wait until the leader of this node becomes known or time out.
+   */
   public void waitLeader() {
     long startTime = System.currentTimeMillis();
     while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
@@ -920,7 +961,7 @@ public abstract class RaftMember {
    *
    * @return true if this node has caught up before timeout, false otherwise
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *     value after timeout
+   *                                   value after timeout
    */
   protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
       throws CheckConsistencyException {
@@ -953,7 +994,7 @@ public abstract class RaftMember {
    * sync local applyId to leader commitId
    *
    * @param leaderCommitId leader commit id
-   * @param fastFail if enable, when log differ too much, return false directly.
+   * @param fastFail       if enable, when log differ too much, return false directly.
    * @return true if leaderCommitId <= localAppliedId
    */
   public boolean syncLocalApply(long leaderCommitId, boolean fastFail) {
@@ -1007,7 +1048,7 @@ public abstract class RaftMember {
    * call this method. Will commit the log locally and send it to followers
    *
    * @return OK if over half of the followers accept the log or null if the leadership is lost
-   *     during the appending
+   * during the appending
    */
   TSStatus processPlanLocally(PhysicalPlan plan) {
     if (USE_LOG_DISPATCHER) {
@@ -1223,7 +1264,9 @@ public abstract class RaftMember {
     return peerMap;
   }
 
-  /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
+  /**
+   * @return true if there is a log whose index is "index" and term is "term", false otherwise
+   */
   public boolean matchLog(long index, long term) {
     boolean matched = logManager.matchTerm(term, index);
     logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -1242,15 +1285,18 @@ public abstract class RaftMember {
     return syncLock;
   }
 
-  /** Sub-classes will add their own process of HeartBeatRequest in this method. */
-  void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {}
+  /**
+   * Sub-classes will add their own process of HeartBeatRequest in this method.
+   */
+  void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {
+  }
 
   /**
    * Verify the validity of an ElectionRequest, and make itself a follower of the elector if the
    * request is valid.
    *
    * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
-   *     smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+   * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
    */
   long checkElectorLogProgress(ElectionRequest electionRequest) {
 
@@ -1294,7 +1340,7 @@ public abstract class RaftMember {
    * lastLogIndex is smaller than the voter's Otherwise accept the election.
    *
    * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
-   *     smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+   * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
    */
   long checkLogProgress(long lastLogIndex, long lastLogTerm) {
     long response;
@@ -1311,10 +1357,10 @@ public abstract class RaftMember {
   /**
    * Forward a non-query plan to a node using the default client.
    *
-   * @param plan a non-query plan
-   * @param node cannot be the local node
+   * @param plan   a non-query plan
+   * @param node   cannot be the local node
    * @param header must be set for data group communication, set to null for meta group
-   *     communication
+   *               communication
    * @return a TSStatus indicating if the forwarding is successful.
    */
   public TSStatus forwardPlan(PhysicalPlan plan, Node node, RaftNode header) {
@@ -1345,7 +1391,7 @@ public abstract class RaftMember {
   /**
    * Forward a non-query plan to "receiver" using "client".
    *
-   * @param plan a non-query plan
+   * @param plan   a non-query plan
    * @param header to determine which DataGroupMember of "receiver" will process the request.
    * @return a TSStatus indicating if the forwarding is successful.
    */
@@ -1427,7 +1473,7 @@ public abstract class RaftMember {
    * Get an asynchronous thrift client of the given node.
    *
    * @return an asynchronous thrift client or null if the caller tries to connect the local node or
-   *     the node cannot be reached.
+   * the node cannot be reached.
    */
   public AsyncClient getAsyncClient(Node node) {
     return getAsyncClient(node, asyncClientPool, true);
@@ -1614,7 +1660,7 @@ public abstract class RaftMember {
    * heartbeat timer.
    *
    * @param fromLeader true if the request is from a leader, false if the request is from an
-   *     elector.
+   *                   elector.
    */
   public void stepDown(long newTerm, boolean fromLeader) {
     synchronized (term) {
@@ -1646,7 +1692,9 @@ public abstract class RaftMember {
     this.thisNode = thisNode;
   }
 
-  /** @return the header of the data raft group or null if this is in a meta group. */
+  /**
+   * @return the header of the data raft group or null if this is in a meta group.
+   */
   public RaftNode getHeader() {
     return null;
   }
@@ -1795,7 +1843,9 @@ public abstract class RaftMember {
     return waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm);
   }
 
-  /** Send "log" to "node". */
+  /**
+   * Send "log" to "node".
+   */
   public void sendLogToFollower(
       Log log,
       AtomicInteger voteCounter,
@@ -1916,7 +1966,7 @@ public abstract class RaftMember {
    * and append "log" to it. Otherwise report a log mismatch.
    *
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
   protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
     long resp = checkPrevLogIndex(prevLogIndex);
@@ -1940,7 +1990,9 @@ public abstract class RaftMember {
     return resp;
   }
 
-  /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
+  /**
+   * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
+   */
   private boolean waitForPrevLog(long prevLogIndex) {
     long waitStart = System.currentTimeMillis();
     long alreadyWait = 0;
@@ -1986,7 +2038,7 @@ public abstract class RaftMember {
    *
    * @param logs append logs
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
   private long appendEntries(
       long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {