You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 01:16:05 UTC
[iotdb] 05/09: Merge branch 'extract_sequencer' into expr
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d1dfe4b55054662253e5bf0e67032c053d7d4301
Merge: 918a705 d2f2645
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 15:19:35 2021 +0800
Merge branch 'extract_sequencer' into expr
# Conflicts:
# cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
# cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
.../apache/iotdb/cluster/config/ClusterConfig.java | 11 ++
.../log/sequencing/AsynchronousSequencer.java | 145 +++++++++++++++++
.../iotdb/cluster/log/sequencing/LogSequencer.java | 42 +++++
.../log/sequencing/LogSequencerFactory.java | 29 ++++
.../log/sequencing/SynchronousSequencer.java | 103 ++++++++++++
.../cluster/server/member/DataGroupMember.java | 2 +
.../cluster/server/member/MetaGroupMember.java | 2 +
.../iotdb/cluster/server/member/RaftMember.java | 180 ++++++++++++---------
8 files changed, 439 insertions(+), 75 deletions(-)
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 0d7f621,f1d424a..c9cbc59
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@@ -181,8 -181,8 +181,11 @@@ public class ClusterConfig
private boolean openServerRpcPort = false;
+ private boolean useIndirectBroadcasting = false;
+
+ private boolean useAsyncSequencing = false;
+
++
/**
* Create a ClusterConfig instance. The internalIp will be set according to the server's hostname. If
* an error occurs while getting the IP of the hostname, then set the internalIp as
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index e30f867,cb6bba1..81abb87
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@@ -41,8 -41,11 +41,9 @@@ import org.apache.iotdb.cluster.excepti
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
-import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
+ import org.apache.iotdb.cluster.log.sequencing.SynchronousSequencer;
import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
import org.apache.iotdb.cluster.partition.NodeAdditionResult;
import org.apache.iotdb.cluster.partition.NodeRemovalResult;
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 41b5e50,d3b445f..2835cf3
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@@ -125,11 -124,11 +131,15 @@@ import static org.apache.iotdb.cluster.
public abstract class RaftMember {
private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
- public static final boolean USE_LOG_DISPATCHER = false;
+ public static boolean USE_LOG_DISPATCHER = false;
+ public static boolean USE_INDIRECT_LOG_DISPATCHER = false;
+ public static boolean ENABLE_WEAK_ACCEPTANCE = true;
+ public static boolean ENABLE_COMMIT_RETURN = false;
+
+ protected static final LogSequencerFactory SEQUENCER_FACTORY =
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing() ? new Factory()
+ : new SynchronousSequencer.Factory();
+
private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
private static final String MSG_FORWARD_ERROR =
"{}: encountered an error when forwarding {} to" + " {}";
@@@ -184,9 -193,11 +204,11 @@@
* offline.
*/
volatile long lastHeartbeatReceivedTime;
- /**
- * the raft logs are all stored and maintained in the log manager
- */
- RaftLogManager logManager;
- /**
++
+ /** the raft logs are all stored and maintained in the log manager */
+ protected RaftLogManager logManager;
- /**
++
++ /**
* the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
* when this node is a leader, or start elections when this node is an elector.
*/
@@@ -255,14 -274,11 +285,16 @@@
*/
protected PlanExecutor localExecutor;
- protected LogSequencer logSequencer;
+ /** (logIndex, logTerm) -> append handler */
+ protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers =
+ new ConcurrentHashMap<>();
- protected RaftMember() {
- }
+ protected VotingLogList votingLogList;
+
+ protected RaftMember() {}
+
++ protected LogSequencer logSequencer;
+
protected RaftMember(
String name,
AsyncClientPool asyncPool,
@@@ -1100,9 -1048,9 +1144,9 @@@
* call this method. Will commit the log locally and send it to followers
*
* @return OK if over half of the followers accept the log or null if the leadership is lost
- * during the appending
+ * during the appending
*/
- TSStatus processPlanLocally(PhysicalPlan plan) {
+ public TSStatus processPlanLocally(PhysicalPlan plan) {
if (USE_LOG_DISPATCHER) {
return processPlanLocallyV2(plan);
}
@@@ -1173,8 -1119,10 +1215,9 @@@
} else {
log = new PhysicalPlanLog();
((PhysicalPlanLog) log).setPlan(plan);
- plan.setIndex(logManager.getLastLogIndex() + 1);
}
+ // just like processPlanLocally, we need to check the size of the log
if (log.serialize().capacity() + Integer.BYTES
>= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
logger.error(
@@@ -1214,17 -1137,13 +1232,18 @@@
try {
AppendLogResult appendLogResult =
waitAppendResult(
- sendLogRequest.getVoteCounter(),
+ sendLogRequest.getVotingLog(),
sendLogRequest.getLeaderShipStale(),
- sendLogRequest.getNewLeaderTerm());
+ sendLogRequest.getNewLeaderTerm(),
+ sendLogRequest.getQuorumSize());
Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
- sendLogRequest.getLog().getCreateTime());
+ sendLogRequest.getVotingLog().getLog().getCreateTime());
+ long startTime;
switch (appendLogResult) {
+ case WEAK_ACCEPT:
+ // TODO: change to weak
+ Statistic.RAFT_WEAK_ACCEPT.add(1);
+ return StatusUtils.OK;
case OK:
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
@@@ -1599,13 -1518,9 +1623,13 @@@
return term;
}
- protected synchronized LogDispatcher getLogDispatcher() {
+ public synchronized LogDispatcher getLogDispatcher() {
if (logDispatcher == null) {
- logDispatcher = new LogDispatcher(this);
+ if (USE_INDIRECT_LOG_DISPATCHER) {
+ logDispatcher = new IndirectLogDispatcher(this);
+ } else {
+ logDispatcher = new LogDispatcher(this);
+ }
}
return logDispatcher;
}
@@@ -1965,23 -1840,15 +1991,25 @@@
return AppendLogResult.TIME_OUT;
}
- return waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm);
+ return waitAppendResult(log, leaderShipStale, newLeaderTerm, quorumSize);
+ }
+
+ public void sendLogToFollower(
+ VotingLog log,
+ Node node,
+ AtomicBoolean leaderShipStale,
+ AtomicLong newLeaderTerm,
+ AppendEntryRequest request,
+ int quorumSize) {
+ sendLogToFollower(
+ log, node, leaderShipStale, newLeaderTerm, request, quorumSize, Collections.emptyList());
}
- /** Send "log" to "node". */
+ /**
+ * Send "log" to "node".
+ */
public void sendLogToFollower(
- Log log,
- AtomicInteger voteCounter,
+ VotingLog log,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
@@@ -2112,13 -1966,12 +2140,13 @@@
* and append "log" to it. Otherwise report a log mismatch.
*
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
- protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+ protected AppendEntryResult appendEntry(
+ long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
long resp = checkPrevLogIndex(prevLogIndex);
if (resp != Response.RESPONSE_AGREE) {
- return resp;
+ return new AppendEntryResult(resp).setHeader(getHeader());
}
long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
@@@ -2134,15 -1982,17 +2162,17 @@@
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
if (success != -1) {
logger.debug("{} append a new log {}", name, log);
- resp = Response.RESPONSE_AGREE;
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
} else {
// the incoming log points to an illegal position, reject it
- resp = Response.RESPONSE_LOG_MISMATCH;
+ result.status = Response.RESPONSE_LOG_MISMATCH;
}
- return resp;
+ return result;
}
- /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
+ /**
+ * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
+ */
private boolean waitForPrevLog(long prevLogIndex) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
@@@ -2188,9 -2038,9 +2218,9 @@@
*
* @param logs append logs
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
- private long appendEntries(
+ protected AppendEntryResult appendEntries(
long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
logger.debug(
"{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",