You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/10/10 02:53:22 UTC
[iotdb] branch expr_vgraft updated: refactor commitTo
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_vgraft by this push:
new ffba16a54b refactor commitTo
ffba16a54b is described below
commit ffba16a54bae1e69e68fed72c2414feb8278d821
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Oct 10 10:53:16 2022 +0800
refactor commitTo
---
.../org/apache/iotdb/cluster/log/VotingLog.java | 14 +-
.../apache/iotdb/cluster/log/VotingLogList.java | 21 +--
.../iotdb/cluster/log/manage/RaftLogManager.java | 141 ++++++++++++---------
.../iotdb/cluster/server/member/RaftMember.java | 7 +-
.../apache/iotdb/cluster/server/monitor/Timer.java | 11 +-
.../org/apache/iotdb/cluster/utils/LogUtils.java | 5 +
6 files changed, 107 insertions(+), 92 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index 848a61c967..7e9eee9e79 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -22,27 +22,31 @@ package org.apache.iotdb.cluster.log;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
public class VotingLog {
protected Log log;
+ // for NB-Raft
protected Set<Integer> weaklyAcceptedNodeIds;
protected Set<Integer> failedNodeIds;
+ // for VGRaft
protected Set<byte[]> signatures;
- public AtomicLong acceptedTime;
private boolean hasFailed;
public VotingLog(Log log, int groupSize) {
this.log = log;
- weaklyAcceptedNodeIds = new HashSet<>(groupSize);
- acceptedTime = new AtomicLong();
failedNodeIds = new HashSet<>(groupSize);
- signatures = new HashSet<>(groupSize);
+ if (ClusterDescriptor.getInstance().getConfig().isUseFollowerSlidingWindow()) {
+ weaklyAcceptedNodeIds = new HashSet<>(groupSize);
+ }
+ if (ClusterDescriptor.getInstance().getConfig().isUseVGRaft()) {
+ signatures = new HashSet<>(groupSize);
+ }
}
public VotingLog(VotingLog another) {
this.log = another.log;
this.weaklyAcceptedNodeIds = another.weaklyAcceptedNodeIds;
- this.acceptedTime = another.acceptedTime;
this.failedNodeIds = another.failedNodeIds;
this.signatures = another.signatures;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 398a80eda9..d1b410f484 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -23,14 +23,12 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -39,8 +37,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER;
-
public class VotingLogList {
private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
@@ -74,23 +70,14 @@ public class VotingLogList {
private boolean tryCommit() {
RaftLogManager logManager = member.getLogManager();
- List<Log> entries = Collections.emptyList();
if (computeNewCommitIndex()
&& logManager != null
&& newCommitIndex.get() > logManager.getCommitLogIndex()) {
- long start = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
- synchronized (logManager) {
- Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
- start);
- long operationStartTime = RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
- try {
- logManager.commitTo(newCommitIndex.get());
- } catch (LogExecutionException e) {
- logger.error("Fail to commit {}", newCommitIndex, e);
- }
- RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(operationStartTime);
+ try {
+ logManager.commitTo(newCommitIndex.get());
+ } catch (LogExecutionException e) {
+ logger.error("Fail to commit {}", newCommitIndex, e);
}
-
return true;
} else {
return false;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index eec035dc46..46f4251ea5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.cluster.log.manage;
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_COMMIT_LOG_IN_MANAGER;
+
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.EntryCompactedException;
import org.apache.iotdb.cluster.exception.EntryUnavailableException;
@@ -37,6 +39,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -589,44 +592,16 @@ public abstract class RaftLogManager {
return entries;
}
- /**
- * Used by MaybeCommit or MaybeAppend or follower to commit newly committed entries.
- *
- * @param newCommitIndex request commitIndex
- */
- public void commitTo(long newCommitIndex) throws LogExecutionException {
- if (commitIndex >= newCommitIndex) {
- return;
- }
- long startTime = Statistic.RAFT_SENDER_COMMIT_GET_LOGS.getOperationStartTime();
- long lo = getUnCommittedEntryManager().getFirstUnCommittedIndex();
- long hi = newCommitIndex + 1;
- List<Log> entries = new ArrayList<>(getUnCommittedEntryManager().getEntries(lo, hi));
- Statistic.RAFT_SENDER_COMMIT_GET_LOGS.calOperationCostTimeFromStart(startTime);
-
- if (entries.isEmpty()) {
- return;
- }
-
- long commitLogIndex = getCommitLogIndex();
- long firstLogIndex = entries.get(0).getCurrLogIndex();
- if (commitLogIndex >= firstLogIndex) {
- logger.warn(
- "Committing logs that has already been committed: {} >= {}",
- commitLogIndex,
- firstLogIndex);
- entries
- .subList(0, (int) (getCommitLogIndex() - entries.get(0).getCurrLogIndex() + 1))
- .clear();
- }
-
+ private void checkCompaction(List<Log> entries) {
boolean needToCompactLog = false;
+ // calculate the number of old committed entries to be reserved by entry number
int numToReserveForNew = minNumOfLogsInMem;
if (committedEntryManager.getTotalSize() + entries.size() > maxNumOfLogsInMem) {
needToCompactLog = true;
numToReserveForNew = maxNumOfLogsInMem - entries.size();
}
+ // calculate the number of old committed entries to be reserved by entry size
long newEntryMemSize = 0;
for (Log entry : entries) {
if (entry.getByteSize() == 0) {
@@ -634,7 +609,7 @@ public abstract class RaftLogManager {
"{} should not go here, must be send to the follower, "
+ "so the log has been serialized exclude single node mode",
entry);
- // entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
+ entry.setByteSize((int) RamUsageEstimator.sizeOf(entry));
}
newEntryMemSize += entry.getByteSize();
}
@@ -645,17 +620,33 @@ public abstract class RaftLogManager {
committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - newEntryMemSize);
}
+ // reserve old committed entries with the minimum number
if (needToCompactLog) {
int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
int sizeToReserveForConfig = minNumOfLogsInMem;
- startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
- synchronized (this) {
- innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
- }
+ long startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
+ innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
}
+ }
+
+ private void removedCommitted(List<Log> entries) {
+ long commitLogIndex = getCommitLogIndex();
+ long firstLogIndex = entries.get(0).getCurrLogIndex();
+ if (commitLogIndex >= firstLogIndex) {
+ logger.warn(
+ "Committing logs that has already been committed: {} >= {}",
+ commitLogIndex,
+ firstLogIndex);
+ entries
+ .subList(0, (int) (getCommitLogIndex() - entries.get(0).getCurrLogIndex() + 1))
+ .clear();
+ }
+ }
+
+ private void commitEntries(List<Log> entries) throws LogExecutionException {
+ long startTime = Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
- startTime = Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
for (Log entry : entries) {
Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.calOperationCostTimeFromStart(
entry.getCreateTime());
@@ -685,31 +676,43 @@ public abstract class RaftLogManager {
logger.error("{}: persistent raft log error:", name, e);
throw new LogExecutionException(e);
} finally {
- Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.calOperationCostTimeFromStart(startTime);
+ Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.calOperationCostTimeFromStart(
+ startTime);
}
+ }
- startTime = Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.getOperationStartTime();
- applyEntries(entries);
- Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.calOperationCostTimeFromStart(startTime);
+ /**
+ * Used by MaybeCommit or MaybeAppend or follower to commit newly committed entries.
+ *
+ * @param newCommitIndex request commitIndex
+ */
+ public void commitTo(long newCommitIndex) throws LogExecutionException {
+ long start = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
+ if (commitIndex >= newCommitIndex) {
+ return;
+ }
- long unappliedLogSize = commitLogIndex - maxHaveAppliedCommitIndex;
- if (unappliedLogSize > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
- logger.info(
- "There are too many unapplied logs [{}], wait for a while to avoid memory overflow",
- unappliedLogSize);
- try {
- synchronized (changeApplyCommitIndexCond) {
- changeApplyCommitIndexCond.wait(
- Math.min(
- (unappliedLogSize
- - ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem())
- / 10
- + 1,
- 1000));
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ synchronized (this) {
+ Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+ start);
+ long operationStartTime = RAFT_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
+
+ long startTime = Statistic.RAFT_SENDER_COMMIT_GET_LOGS.getOperationStartTime();
+ long lo = getUnCommittedEntryManager().getFirstUnCommittedIndex();
+ long hi = newCommitIndex + 1;
+ List<Log> entries = new ArrayList<>(getUnCommittedEntryManager().getEntries(lo, hi));
+ Statistic.RAFT_SENDER_COMMIT_GET_LOGS.calOperationCostTimeFromStart(startTime);
+
+ if (entries.isEmpty()) {
+ return;
}
+
+ removedCommitted(entries);
+ checkCompaction(entries);
+ commitEntries(entries);
+ applyEntries(entries);
+
+ RAFT_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(operationStartTime);
}
}
@@ -736,9 +739,31 @@ public abstract class RaftLogManager {
* @param entries applying entries
*/
void applyEntries(List<Log> entries) {
+ long startTime = Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.getOperationStartTime();
for (Log entry : entries) {
applyEntry(entry);
}
+ Statistic.RAFT_SENDER_COMMIT_APPLY_LOGS.calOperationCostTimeFromStart(startTime);
+
+ long unappliedLogSize = getCommitLogIndex() - maxHaveAppliedCommitIndex;
+ if (unappliedLogSize > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
+ logger.info(
+ "There are too many unapplied logs [{}], wait for a while to avoid memory overflow",
+ unappliedLogSize);
+ try {
+ synchronized (changeApplyCommitIndexCond) {
+ changeApplyCommitIndexCond.wait(
+ Math.min(
+ (unappliedLogSize
+ - ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem())
+ / 10
+ + 1,
+ 1000));
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
public void applyEntry(Log entry) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 267c0ea899..0713ea7a56 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1747,14 +1747,12 @@ public abstract class RaftMember implements RaftMemberMBean {
logger.info(
"Still not receive enough votes for {}, weakly "
+ "accepted {}, wait {}ms, wait to sequence {}ms, wait to enqueue "
- + "{}ms, wait to accept "
+ "{}ms",
log,
log.getWeaklyAcceptedNodeIds(),
alreadyWait,
(log.getLog().getSequenceStartTime() - waitStart) / 1000000,
- (log.getLog().getEnqueueTime() - waitStart) / 1000000,
- (log.acceptedTime.get() - waitStart) / 1000000);
+ (log.getLog().getEnqueueTime() - waitStart) / 1000000);
nextTimeToPrint *= 2;
}
totalAccepted = votingLogList.totalAcceptedNodeNum(log);
@@ -1801,9 +1799,6 @@ public abstract class RaftMember implements RaftMemberMBean {
weaklyAccepted = log.getWeaklyAcceptedNodeIds().size();
stronglyAccepted = totalAccepted - weaklyAccepted;
- if (log.acceptedTime.get() != 0) {
- Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime.get());
- }
Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.calOperationCostTimeFromStart(startTime);
// a node has a larger term than the local node, so this node is no longer a valid leader
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index f09bfd5956..07b35b43ba 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -192,7 +192,7 @@ public class Timer {
TIME_SCALE,
true,
RAFT_SENDER_COMMIT_LOG),
- RAFT_SENDER_COMMIT_LOG_IN_MANAGER(
+ RAFT_COMMIT_LOG_IN_MANAGER(
RAFT_MEMBER_SENDER,
"commit log in log manager",
TIME_SCALE,
@@ -209,25 +209,25 @@ public class Timer {
"get logs to be committed",
TIME_SCALE,
RaftMember.USE_LOG_DISPATCHER,
- RAFT_SENDER_COMMIT_LOG_IN_MANAGER),
+ RAFT_COMMIT_LOG_IN_MANAGER),
RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS(
RAFT_MEMBER_SENDER,
"delete logs exceeding capacity",
TIME_SCALE,
RaftMember.USE_LOG_DISPATCHER,
- RAFT_SENDER_COMMIT_LOG_IN_MANAGER),
+ RAFT_COMMIT_LOG_IN_MANAGER),
RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS(
RAFT_MEMBER_SENDER,
"append and stable committed logs",
TIME_SCALE,
RaftMember.USE_LOG_DISPATCHER,
- RAFT_SENDER_COMMIT_LOG_IN_MANAGER),
+ RAFT_COMMIT_LOG_IN_MANAGER),
RAFT_SENDER_COMMIT_APPLY_LOGS(
RAFT_MEMBER_SENDER,
"apply after committing logs",
TIME_SCALE,
RaftMember.USE_LOG_DISPATCHER,
- RAFT_SENDER_COMMIT_LOG_IN_MANAGER),
+ RAFT_COMMIT_LOG_IN_MANAGER),
RAFT_SENDER_COMMIT_TO_CONSUMER_LOGS(
RAFT_MEMBER_SENDER,
"provide log to consumer",
@@ -363,7 +363,6 @@ public class Timer {
RAFT_SENT_ENTRY_SIZE(RAFT_MEMBER_SENDER, "sent entry size", 1, true, ROOT),
RAFT_RELAYED_LEVEL1_NUM(RAFT_MEMBER_SENDER, "level 1 relay node number", 1, true, ROOT),
RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
- RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),
RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT),
RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true, ROOT);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/LogUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/LogUtils.java
new file mode 100644
index 0000000000..8bde2e2672
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/LogUtils.java
@@ -0,0 +1,5 @@
+package org.apache.iotdb.cluster.utils;
+
+public class LogUtils {
+
+}