You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:04 UTC
[iotdb] 04/09: add sequencer factory
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d2f26457e52132f04f28ac0bb22360d5bb99c06c
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 15:15:07 2021 +0800
add sequencer factory
---
.../apache/iotdb/cluster/config/ClusterConfig.java | 10 ++
.../log/sequencing/AsynchronousSequencer.java | 8 ++
.../log/sequencing/LogSequencerFactory.java | 29 +++++
.../log/sequencing/SynchronousSequencer.java | 8 ++
.../cluster/server/member/DataGroupMember.java | 2 +-
.../cluster/server/member/MetaGroupMember.java | 2 +-
.../iotdb/cluster/server/member/RaftMember.java | 144 ++++++++++++++-------
7 files changed, 155 insertions(+), 48 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index afa202d..f1d424a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -181,6 +181,8 @@ public class ClusterConfig {
private boolean openServerRpcPort = false;
+ private boolean useAsyncSequencing = false;
+
/**
* create a clusterConfig class. The internalIP will be set according to the server's hostname. If
* there is something error for getting the ip of the hostname, then set the internalIp as
@@ -539,4 +541,12 @@ public class ClusterConfig {
public void setClusterInfoRpcPort(int clusterInfoRpcPort) {
this.clusterInfoRpcPort = clusterInfoRpcPort;
}
+
+ public boolean isUseAsyncSequencing() { // flag defaults to false; RaftMember reads it to pick its LogSequencerFactory
+ return useAsyncSequencing;
+ }
+
+ public void setUseAsyncSequencing(boolean useAsyncSequencing) { // plain setter, no validation
+ this.useAsyncSequencing = useAsyncSequencing;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index eb33bea..63cc38b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -134,4 +134,12 @@ public class AsynchronousSequencer implements LogSequencer {
public void setLogManager(RaftLogManager logManager) {
this.logManager = logManager;
}
+
+ public static class Factory implements LogSequencerFactory { // LogSequencerFactory producing AsynchronousSequencer
+
+ @Override
+ public LogSequencer create(RaftMember member, RaftLogManager logManager) {
+ return new AsynchronousSequencer(member, logManager); // new instance per call; arguments passed through unchanged
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java
new file mode 100644
index 0000000..627ef84
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/LogSequencerFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iotdb.cluster.log.sequencing;
+
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+
+public interface LogSequencerFactory { // abstracts which LogSequencer implementation gets constructed
+
+ LogSequencer create(RaftMember member, RaftLogManager logManager); // returns a LogSequencer for the given member and log manager
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index 4327065..4da61b5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -92,4 +92,12 @@ public class SynchronousSequencer implements LogSequencer {
return new SendLogRequest(log, voteCounter, leaderShipStale, newLeaderTerm, appendEntryRequest);
}
+
+ public static class Factory implements LogSequencerFactory { // LogSequencerFactory producing SynchronousSequencer
+
+ @Override
+ public LogSequencer create(RaftMember member, RaftLogManager logManager) {
+ return new SynchronousSequencer(member, logManager); // new instance per call; arguments passed through unchanged
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 6b66216..4adc4d9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -211,7 +211,7 @@ public class DataGroupMember extends RaftMember {
logManager =
new FilePartitionedSnapshotLogManager(
dataLogApplier, metaGroupMember.getPartitionTable(), allNodes.get(0), thisNode, this);
- logSequencer = new SynchronousSequencer(this, logManager);
+ logSequencer = SEQUENCER_FACTORY.create(this, logManager);
initPeerMap();
term.set(logManager.getHardState().getCurrentTerm());
voteFor = logManager.getHardState().getVoteFor();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index ef3ca8b..cb6bba1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -266,7 +266,7 @@ public class MetaGroupMember extends RaftMember {
// committed logs are applied to the state machine (the IoTDB instance) through the applier
LogApplier metaLogApplier = new MetaLogApplier(this);
logManager = new MetaSingleSnapshotLogManager(metaLogApplier, this);
- logSequencer = new SynchronousSequencer(this, logManager);
+ logSequencer = SEQUENCER_FACTORY.create(this, logManager);
term.set(logManager.getHardState().getCurrentTerm());
voteFor = logManager.getHardState().getVoteFor();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 8cb88b9..d3b445f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -38,7 +38,11 @@ import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer;
+import org.apache.iotdb.cluster.log.sequencing.AsynchronousSequencer.Factory;
import org.apache.iotdb.cluster.log.sequencing.LogSequencer;
+import org.apache.iotdb.cluster.log.sequencing.LogSequencerFactory;
+import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -113,12 +117,17 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
/**
- * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
+ * RaftMember processes the common raft logic like leader election, log appending, catch-up and so
+ * on.
*/
@SuppressWarnings("java:S3077") // reference volatile is enough
public abstract class RaftMember {
+
private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
public static final boolean USE_LOG_DISPATCHER = false;
+ protected static final LogSequencerFactory SEQUENCER_FACTORY = // evaluated once, in RaftMember's static initializer
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing() // async flag from ClusterConfig
+ ? new AsynchronousSequencer.Factory() : new SynchronousSequencer.Factory(); // both qualified: a bare "Factory" can be shadowed
private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
private static final String MSG_FORWARD_ERROR =
@@ -138,22 +147,32 @@ public abstract class RaftMember {
* on this may be woken.
*/
private final Object waitLeaderCondition = new Object();
- /** the lock is to make sure that only one thread can apply snapshot at the same time */
+ /**
+ * the lock is to make sure that only one thread can apply snapshot at the same time
+ */
private final Object snapshotApplyLock = new Object();
private final Object heartBeatWaitObject = new Object();
protected Node thisNode = ClusterConstant.EMPTY_NODE;
- /** the nodes that belong to the same raft group as thisNode. */
+ /**
+ * the nodes that belong to the same raft group as thisNode.
+ */
protected PartitionGroup allNodes;
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- /** the name of the member, to distinguish several members in the logs. */
+ /**
+ * the name of the member, to distinguish several members in the logs.
+ */
String name;
- /** to choose nodes to send request of joining cluster randomly. */
+ /**
+ * to choose nodes to send request of joining cluster randomly.
+ */
Random random = new Random();
- /** when the node is a leader, this map is used to track log progress of each follower. */
+ /**
+ * when the node is a leader, this map is used to track log progress of each follower.
+ */
Map<Node, Peer> peerMap;
/**
* the current term of the node, this object also works as lock of some transactions of the member
@@ -174,7 +193,9 @@ public abstract class RaftMember {
* offline.
*/
volatile long lastHeartbeatReceivedTime;
- /** the raft logs are all stored and maintained in the log manager */
+ /**
+ * the raft logs are all stored and maintained in the log manager
+ */
RaftLogManager logManager;
/**
* the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
@@ -192,7 +213,9 @@ public abstract class RaftMember {
* member by comparing it with the current last log index.
*/
long lastReportedLogIndex;
- /** the thread pool that runs catch-up tasks */
+ /**
+ * the thread pool that runs catch-up tasks
+ */
private ExecutorService catchUpService;
/**
* lastCatchUpResponseTime records when is the latest response of each node's catch-up. There
@@ -228,26 +251,33 @@ public abstract class RaftMember {
* one slow node.
*/
private ExecutorService serialToParallelPool;
- /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
+ /**
+ * a thread pool that is used to do commit log tasks asynchronously in the heartbeat thread
+ */
private ExecutorService commitLogPool;
/**
* logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
- * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
+ * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
+ * logs.
*/
private LogDispatcher logDispatcher;
- /** If this node can not be the leader, this parameter will be set true. */
+ /**
+ * If this node can not be the leader, this parameter will be set true.
+ */
private volatile boolean skipElection = false;
/**
- * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
+ * localExecutor is used to directly execute plans like load configuration in the underlying
+ * IoTDB
*/
protected PlanExecutor localExecutor;
protected LogSequencer logSequencer;
- protected RaftMember() {}
+ protected RaftMember() {
+ }
protected RaftMember(
String name,
@@ -560,7 +590,9 @@ public abstract class RaftMember {
return result;
}
- /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
+ /**
+ * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
+ */
public long appendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
logger.debug("{} received an AppendEntriesRequest", name);
@@ -714,16 +746,22 @@ public abstract class RaftMember {
return lastCatchUpResponseTime;
}
- /** Sub-classes will add their own process of HeartBeatResponse in this method. */
- public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {}
+ /**
+ * Sub-classes will add their own process of HeartBeatResponse in this method.
+ */
+ public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
+ }
- /** The actions performed when the node wins in an election (becoming a leader). */
- public void onElectionWins() {}
+ /**
+ * The actions performed when the node wins in an election (becoming a leader).
+ */
+ public void onElectionWins() {
+ }
/**
* Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the
- * follower. If some of the required logs are removed, also send the snapshot. <br>
- * notice that if a part of data is in the snapshot, then it is not in the logs.
+ * follower. If some of the required logs are removed, also send the snapshot. <br> notice that if
+ * a part of data is in the snapshot, then it is not in the logs.
*/
public void catchUp(Node follower, long lastLogIdx) {
// for one follower, there is at most one ongoing catch-up, so the same data will not be sent
@@ -810,7 +848,9 @@ public abstract class RaftMember {
}
}
- /** call back after syncLeader */
+ /**
+ * call back after syncLeader
+ */
public interface CheckConsistency {
/**
@@ -819,7 +859,7 @@ public abstract class RaftMember {
* @param leaderCommitId leader commit id
* @param localAppliedId local applied id
* @throws CheckConsistencyException maybe throw CheckConsistencyException, which is defined in
- * implements.
+ * implements.
*/
void postCheckConsistency(long leaderCommitId, long localAppliedId)
throws CheckConsistencyException;
@@ -828,8 +868,7 @@ public abstract class RaftMember {
public static class MidCheckConsistency implements CheckConsistency {
/**
- * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw
- * CHECK_MID_CONSISTENCY_EXCEPTION
+ * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw CHECK_MID_CONSISTENCY_EXCEPTION
*
* @param leaderCommitId leader commit id
* @param localAppliedId local applied id
@@ -841,7 +880,7 @@ public abstract class RaftMember {
if (leaderCommitId == Long.MAX_VALUE
|| leaderCommitId == Long.MIN_VALUE
|| leaderCommitId - localAppliedId
- > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
+ > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
}
}
@@ -874,7 +913,7 @@ public abstract class RaftMember {
* @param checkConsistency check after syncleader
* @return true if the node has caught up, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
- * value after timeout
+ * value after timeout
*/
public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
if (character == NodeCharacter.LEADER) {
@@ -893,7 +932,9 @@ public abstract class RaftMember {
return waitUntilCatchUp(checkConsistency);
}
- /** Wait until the leader of this node becomes known or time out. */
+ /**
+ * Wait until the leader of this node becomes known or time out.
+ */
public void waitLeader() {
long startTime = System.currentTimeMillis();
while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
@@ -920,7 +961,7 @@ public abstract class RaftMember {
*
* @return true if this node has caught up before timeout, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
- * value after timeout
+ * value after timeout
*/
protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
throws CheckConsistencyException {
@@ -953,7 +994,7 @@ public abstract class RaftMember {
* sync local applyId to leader commitId
*
* @param leaderCommitId leader commit id
- * @param fastFail if enable, when log differ too much, return false directly.
+ * @param fastFail if enabled, return false directly when the logs differ too much.
* @return true if leaderCommitId <= localAppliedId
*/
public boolean syncLocalApply(long leaderCommitId, boolean fastFail) {
@@ -1007,7 +1048,7 @@ public abstract class RaftMember {
* call this method. Will commit the log locally and send it to followers
*
* @return OK if over half of the followers accept the log or null if the leadership is lost
- * during the appending
+ * during the appending
*/
TSStatus processPlanLocally(PhysicalPlan plan) {
if (USE_LOG_DISPATCHER) {
@@ -1223,7 +1264,9 @@ public abstract class RaftMember {
return peerMap;
}
- /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
+ /**
+ * @return true if there is a log whose index is "index" and term is "term", false otherwise
+ */
public boolean matchLog(long index, long term) {
boolean matched = logManager.matchTerm(term, index);
logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -1242,15 +1285,18 @@ public abstract class RaftMember {
return syncLock;
}
- /** Sub-classes will add their own process of HeartBeatRequest in this method. */
- void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {}
+ /**
+ * Sub-classes will add their own process of HeartBeatRequest in this method.
+ */
+ void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {
+ }
/**
* Verify the validity of an ElectionRequest, and make itself a follower of the elector if the
* request is valid.
*
* @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
- * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+ * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
*/
long checkElectorLogProgress(ElectionRequest electionRequest) {
@@ -1294,7 +1340,7 @@ public abstract class RaftMember {
* lastLogIndex is smaller than the voter's Otherwise accept the election.
*
* @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
- * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+ * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
*/
long checkLogProgress(long lastLogIndex, long lastLogTerm) {
long response;
@@ -1311,10 +1357,10 @@ public abstract class RaftMember {
/**
* Forward a non-query plan to a node using the default client.
*
- * @param plan a non-query plan
- * @param node cannot be the local node
+ * @param plan a non-query plan
+ * @param node cannot be the local node
* @param header must be set for data group communication, set to null for meta group
- * communication
+ * communication
* @return a TSStatus indicating if the forwarding is successful.
*/
public TSStatus forwardPlan(PhysicalPlan plan, Node node, RaftNode header) {
@@ -1345,7 +1391,7 @@ public abstract class RaftMember {
/**
* Forward a non-query plan to "receiver" using "client".
*
- * @param plan a non-query plan
+ * @param plan a non-query plan
* @param header to determine which DataGroupMember of "receiver" will process the request.
* @return a TSStatus indicating if the forwarding is successful.
*/
@@ -1427,7 +1473,7 @@ public abstract class RaftMember {
* Get an asynchronous thrift client of the given node.
*
* @return an asynchronous thrift client or null if the caller tries to connect the local node or
- * the node cannot be reached.
+ * the node cannot be reached.
*/
public AsyncClient getAsyncClient(Node node) {
return getAsyncClient(node, asyncClientPool, true);
@@ -1614,7 +1660,7 @@ public abstract class RaftMember {
* heartbeat timer.
*
* @param fromLeader true if the request is from a leader, false if the request is from an
- * elector.
+ * elector.
*/
public void stepDown(long newTerm, boolean fromLeader) {
synchronized (term) {
@@ -1646,7 +1692,9 @@ public abstract class RaftMember {
this.thisNode = thisNode;
}
- /** @return the header of the data raft group or null if this is in a meta group. */
+ /**
+ * @return the header of the data raft group or null if this is in a meta group.
+ */
public RaftNode getHeader() {
return null;
}
@@ -1795,7 +1843,9 @@ public abstract class RaftMember {
return waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm);
}
- /** Send "log" to "node". */
+ /**
+ * Send "log" to "node".
+ */
public void sendLogToFollower(
Log log,
AtomicInteger voteCounter,
@@ -1916,7 +1966,7 @@ public abstract class RaftMember {
* and append "log" to it. Otherwise report a log mismatch.
*
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
long resp = checkPrevLogIndex(prevLogIndex);
@@ -1940,7 +1990,9 @@ public abstract class RaftMember {
return resp;
}
- /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
+ /**
+ * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
+ */
private boolean waitForPrevLog(long prevLogIndex) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
@@ -1986,7 +2038,7 @@ public abstract class RaftMember {
*
* @param logs append logs
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
private long appendEntries(
long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {