You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/10/09 04:07:09 UTC
[iotdb] branch expr_vgraft updated: add more timers and timer config
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_vgraft by this push:
new 3329414c79 add more timers and timer config
3329414c79 is described below
commit 3329414c799d5cf51bfea60d8eb443a84513bb84
Author: Tian Jiang <jt...@163.com>
AuthorDate: Sun Oct 9 12:07:00 2022 +0800
add more timers and timer config
---
cluster/distribute-dc.sh | 4 +-
.../apache/iotdb/cluster/config/ClusterConfig.java | 10 +++
.../iotdb/cluster/config/ClusterDescriptor.java | 5 ++
.../iotdb/cluster/log/FragmentedLogDispatcher.java | 6 ++
.../iotdb/cluster/log/IndirectLogDispatcher.java | 7 +-
.../apache/iotdb/cluster/log/LogDispatcher.java | 71 ++++-----------
.../apache/iotdb/cluster/log/VotingLogList.java | 100 ++++++++++++++-------
.../cluster/log/applier/AsyncDataLogApplier.java | 8 +-
.../iotdb/cluster/log/manage/RaftLogManager.java | 9 +-
.../log/sequencing/AsynchronousSequencer.java | 8 +-
.../log/sequencing/SynchronousSequencer.java | 18 ++--
.../handlers/caller/AppendNodeEntryHandler.java | 46 +++++-----
.../iotdb/cluster/server/member/RaftMember.java | 94 +++++++------------
.../apache/iotdb/cluster/server/monitor/Timer.java | 39 ++++++--
.../apache/iotdb/cluster/utils/ClusterUtils.java | 2 +-
15 files changed, 215 insertions(+), 212 deletions(-)
diff --git a/cluster/distribute-dc.sh b/cluster/distribute-dc.sh
index 75757ebcb2..599b658324 100644
--- a/cluster/distribute-dc.sh
+++ b/cluster/distribute-dc.sh
@@ -1,7 +1,7 @@
src_lib_path=/d/CodeRepo/iotdb/cluster/target/iotdb-cluster-0.14.0-SNAPSHOT/lib/iotdb*
-ips=(dc11 dc12 dc13 dc14 dc15 dc16 dc17 dc18)
-#ips=(dc11 dc12)
+ips=(dc15 dc16 dc17 dc18)
+#ips=(dc11 dc12 dc13 dc14 dc11 dc12)
target_lib_path=/home/jt/iotdb_expr_vg/lib
for ip in ${ips[*]}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index a9724b4280..3232ecd5b4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -221,6 +221,8 @@ public class ClusterConfig {
private boolean useCRaft = false;
+ private boolean enableInstrumenting = true;
+
/**
* create a clusterConfig class. The internalIP will be set according to the server's hostname. If
* there is something error for getting the ip of the hostname, then set the internalIp as
@@ -692,4 +694,12 @@ public class ClusterConfig {
public void setUseCRaft(boolean useCRaft) {
this.useCRaft = useCRaft;
}
+
+ public boolean isEnableInstrumenting() {
+ return enableInstrumenting;
+ }
+
+ public void setEnableInstrumenting(boolean enableInstrumenting) {
+ this.enableInstrumenting = enableInstrumenting;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 562cff9c07..42a5b80233 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -392,6 +392,11 @@ public class ClusterDescriptor {
Boolean.parseBoolean(
properties.getProperty("use_c_raft", String.valueOf(config.isUseCRaft()))));
+ config.setEnableInstrumenting(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_instrumenting", String.valueOf(config.isEnableInstrumenting()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
index 49dcc642d5..fc31f792c6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -40,6 +40,10 @@ public class FragmentedLogDispatcher extends LogDispatcher {
}
public void offer(SendLogRequest request) {
+ if (!(request.getVotingLog().getLog() instanceof FragmentedLog)) {
+ super.offer(request);
+ return;
+ }
// do serialization here to avoid taking LogManager for too long
long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
@@ -89,6 +93,8 @@ public class FragmentedLogDispatcher extends LogDispatcher {
for (SendLogRequest request : currBatch) {
Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
request.getVotingLog().getLog().getEnqueueTime());
+ Statistic.LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE.calOperationCostTimeFromStart(
+ request.getVotingLog().getLog().getCreateTime());
long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
request.getAppendEntryRequest().entry = request.getVotingLog().getLog().serialize();
Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 56b4cf2a50..c852575275 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -80,7 +80,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
BlockingQueue<SendLogRequest> logBlockingQueue;
logBlockingQueue =
new ArrayBlockingQueue<>(
- ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+ ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem(), true);
nodesLogQueuesList.add(new Pair<>(node, logBlockingQueue));
}
}
@@ -92,10 +92,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
pair.left,
n ->
IoTDBThreadPoolFactory.newCachedThreadPool(
- "LogDispatcher-"
- + member.getName()
- + "-"
- + ClusterUtils.nodeToString(pair.left)))
+ "LogDispatcher-" + member.getName() + "-" + pair.left.nodeIdentifier))
.submit(newDispatcherThread(pair.left, pair.right));
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index a79bb8098a..c6cfbd2703 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -60,9 +60,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.LOG_DISPATCHER_LOG_ENQUEUE_SINGLE;
/**
* A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
@@ -111,10 +111,7 @@ public class LogDispatcher {
pair.left,
n ->
IoTDBThreadPoolFactory.newCachedThreadPool(
- "LogDispatcher-"
- + member.getName()
- + "-"
- + ClusterUtils.nodeToString(pair.left)))
+ "LogDispatcher-" + member.getName() + "-" + pair.left.nodeIdentifier))
.submit(newDispatcherThread(pair.left, pair.right));
}
}
@@ -135,11 +132,13 @@ public class LogDispatcher {
}
protected boolean addToQueue(BlockingQueue<SendLogRequest> nodeLogQueue, SendLogRequest request) {
+ long operationStartTime = LOG_DISPATCHER_LOG_ENQUEUE_SINGLE.getOperationStartTime();
if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
long waitStart = System.currentTimeMillis();
long waitTime = 1;
while (System.currentTimeMillis() - waitStart < clusterConfig.getConnectionTimeoutInMS()) {
if (nodeLogQueue.add(request)) {
+ LOG_DISPATCHER_LOG_ENQUEUE_SINGLE.calOperationCostTimeFromStart(operationStartTime);
return true;
} else {
try {
@@ -150,9 +149,12 @@ public class LogDispatcher {
}
}
}
+ LOG_DISPATCHER_LOG_ENQUEUE_SINGLE.calOperationCostTimeFromStart(operationStartTime);
return false;
} else {
- return nodeLogQueue.add(request);
+ boolean added = nodeLogQueue.add(request);
+ LOG_DISPATCHER_LOG_ENQUEUE_SINGLE.calOperationCostTimeFromStart(operationStartTime);
+ return added;
}
}
@@ -209,31 +211,20 @@ public class LogDispatcher {
public static class SendLogRequest {
private VotingLog votingLog;
- private AtomicBoolean leaderShipStale;
- private AtomicLong newLeaderTerm;
private AppendEntryRequest appendEntryRequest;
private long enqueueTime;
private Future<ByteBuffer> serializedLogFuture;
private int quorumSize;
private boolean isVerifier;
- public SendLogRequest(
- VotingLog log,
- AtomicBoolean leaderShipStale,
- AtomicLong newLeaderTerm,
- AppendEntryRequest appendEntryRequest,
- int quorumSize) {
+ public SendLogRequest(VotingLog log, AppendEntryRequest appendEntryRequest, int quorumSize) {
this.setVotingLog(log);
- this.setLeaderShipStale(leaderShipStale);
- this.setNewLeaderTerm(newLeaderTerm);
this.setAppendEntryRequest(appendEntryRequest);
this.setQuorumSize(quorumSize);
}
public SendLogRequest(SendLogRequest request) {
this.setVotingLog(request.votingLog);
- this.setLeaderShipStale(request.leaderShipStale);
- this.setNewLeaderTerm(request.newLeaderTerm);
this.setAppendEntryRequest(request.appendEntryRequest);
this.setQuorumSize(request.quorumSize);
this.setEnqueueTime(request.enqueueTime);
@@ -256,22 +247,6 @@ public class LogDispatcher {
this.enqueueTime = enqueueTime;
}
- public AtomicBoolean getLeaderShipStale() {
- return leaderShipStale;
- }
-
- public void setLeaderShipStale(AtomicBoolean leaderShipStale) {
- this.leaderShipStale = leaderShipStale;
- }
-
- public AtomicLong getNewLeaderTerm() {
- return newLeaderTerm;
- }
-
- void setNewLeaderTerm(AtomicLong newLeaderTerm) {
- this.newLeaderTerm = newLeaderTerm;
- }
-
public AppendEntryRequest getAppendEntryRequest() {
return appendEntryRequest;
}
@@ -330,7 +305,9 @@ public class LogDispatcher {
SendLogRequest poll = logBlockingDeque.take();
currBatch.add(poll);
if (maxBatchSize > 1 && useBatchInLogCatchUp) {
- logBlockingDeque.drainTo(currBatch, maxBatchSize - 1);
+ while (!logBlockingDeque.isEmpty() && currBatch.size() < maxBatchSize) {
+ currBatch.add(logBlockingDeque.take());
+ }
}
}
if (logger.isDebugEnabled()) {
@@ -472,8 +449,6 @@ public class LogDispatcher {
sendLogs(currBatch);
} else {
for (SendLogRequest batch : currBatch) {
- Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
- batch.getVotingLog().getLog().getCreateTime());
sendLog(batch);
}
}
@@ -485,11 +460,7 @@ public class LogDispatcher {
void sendLogSync(SendLogRequest logRequest) {
AppendNodeEntryHandler handler =
member.getAppendNodeEntryHandler(
- logRequest.getVotingLog(),
- receiver,
- logRequest.leaderShipStale,
- logRequest.newLeaderTerm,
- logRequest.quorumSize);
+ logRequest.getVotingLog(), receiver, logRequest.quorumSize);
// TODO add async interface
int retries = 5;
@@ -535,11 +506,7 @@ public class LogDispatcher {
private void sendLogAsync(SendLogRequest logRequest) {
AppendNodeEntryHandler handler =
member.getAppendNodeEntryHandler(
- logRequest.getVotingLog(),
- receiver,
- logRequest.leaderShipStale,
- logRequest.newLeaderTerm,
- logRequest.quorumSize);
+ logRequest.getVotingLog(), receiver, logRequest.quorumSize);
AsyncClient client = member.getAsyncClient(receiver);
if (client != null) {
@@ -552,6 +519,8 @@ public class LogDispatcher {
}
void sendLog(SendLogRequest logRequest) {
+ Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
+ logRequest.getVotingLog().getLog().getCreateTime());
if (logger.isDebugEnabled()) {
Thread.currentThread()
.setName(baseName + "-" + logRequest.getVotingLog().getLog().getCurrLogIndex());
@@ -586,11 +555,7 @@ public class LogDispatcher {
for (SendLogRequest sendLogRequest : batch) {
AppendNodeEntryHandler handler =
member.getAppendNodeEntryHandler(
- sendLogRequest.getVotingLog(),
- receiver,
- sendLogRequest.getLeaderShipStale(),
- sendLogRequest.getNewLeaderTerm(),
- sendLogRequest.getQuorumSize());
+ sendLogRequest.getVotingLog(), receiver, sendLogRequest.getQuorumSize());
singleEntryHandlers.add(handler);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index fde20d18bf..398a80eda9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -20,22 +20,24 @@
package org.apache.iotdb.cluster.log;
import org.apache.iotdb.cluster.exception.LogExecutionException;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER;
@@ -47,40 +49,65 @@ public class VotingLogList {
private int quorumSize;
private RaftMember member;
private Map<Integer, Long> stronglyAcceptedIndices = new ConcurrentHashMap<>();
- private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+ private final ExecutorService service = Executors.newSingleThreadExecutor();
+ private AtomicLong newCommitIndex = new AtomicLong(-1);
public VotingLogList(int quorumSize, RaftMember member) {
this.quorumSize = quorumSize;
this.member = member;
- ScheduledExecutorUtil.safelyScheduleAtFixedRate(
- service,
+ service.submit(
() -> {
- long newCommitIndex = computeNewCommitIndex();
- if (newCommitIndex > member.getLogManager().getCommitLogIndex()) {
- synchronized (member.getLogManager()) {
- long operationStartTime = RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
- try {
- member.getLogManager().commitTo(newCommitIndex);
- } catch (LogExecutionException e) {
- logger.error("Fail to commit {}", newCommitIndex, e);
+ try {
+ while (true) {
+ if (!tryCommit()) {
+ synchronized (newCommitIndex) {
+ newCommitIndex.wait(1);
+ }
}
- RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(operationStartTime);
}
+ } catch (Exception e) {
+ logger.error("Unexpected exception when updating commit index", e);
}
- },
- 0,
- 1,
- TimeUnit.MILLISECONDS);
+ });
+ }
+
+ private boolean tryCommit() {
+ RaftLogManager logManager = member.getLogManager();
+
+ List<Log> entries = Collections.emptyList();
+ if (computeNewCommitIndex()
+ && logManager != null
+ && newCommitIndex.get() > logManager.getCommitLogIndex()) {
+ long start = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
+ synchronized (logManager) {
+ Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+ start);
+ long operationStartTime = RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
+ try {
+ logManager.commitTo(newCommitIndex.get());
+ } catch (LogExecutionException e) {
+ logger.error("Fail to commit {}", newCommitIndex, e);
+ }
+ RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(operationStartTime);
+ }
+
+ return true;
+ } else {
+ return false;
+ }
}
- private long computeNewCommitIndex() {
+ public boolean computeNewCommitIndex() {
List<Entry<Integer, Long>> nodeIndices = new ArrayList<>(stronglyAcceptedIndices.entrySet());
if (nodeIndices.size() < quorumSize) {
- return -1;
+ return false;
}
nodeIndices.sort(Entry.comparingByValue());
- return nodeIndices.get(quorumSize - 1).getValue();
+ Long value = nodeIndices.get(quorumSize - 1).getValue();
+ long oldValue = newCommitIndex.getAndUpdate(oldV -> Math.max(value, oldV));
+ return value > oldValue;
}
+
/**
* When an entry of index-term is strongly accepted by a node of acceptingNodeId, record the id in
* all entries whose index <= the accepted entry. If any entry is accepted by a quorum, remove it
@@ -88,22 +115,31 @@ public class VotingLogList {
*
* @param index
* @param term
- * @param acceptingNodeId
+ * @param acceptingNode
* @param signature
* @return the lastly removed entry if any.
*/
public void onStronglyAccept(long index, long term, Node acceptingNode, ByteBuffer signature) {
logger.debug("{}-{} is strongly accepted by {}", index, term, acceptingNode);
- stronglyAcceptedIndices.compute(
- acceptingNode.nodeIdentifier,
- (nid, idx) -> {
- if (idx == null) {
- return index;
- } else {
- return Math.max(index, idx);
- }
- });
+ Long newIndex =
+ stronglyAcceptedIndices.compute(
+ acceptingNode.nodeIdentifier,
+ (nid, oldIndex) -> {
+ if (oldIndex == null) {
+ return index;
+ } else {
+ if (index > oldIndex) {
+ return index;
+ }
+ return oldIndex;
+ }
+ });
+ if (newIndex == index) {
+ synchronized (newCommitIndex) {
+ newCommitIndex.notifyAll();
+ }
+ }
}
public int totalAcceptedNodeNum(VotingLog log) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index a717bf4543..97a621de04 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.log.applier;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
@@ -34,7 +35,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.slf4j.Logger;
@@ -168,8 +168,6 @@ public class AsyncDataLogApplier implements LogApplier {
} else if (plan instanceof CreateTimeSeriesPlan) {
PartialPath path = ((CreateTimeSeriesPlan) plan).getPath();
sgPath = IoTDB.schemaProcessor.getBelongedStorageGroup(path);
- } else if (plan instanceof DummyPlan) {
- sgPath = new PartialPath("dummy", false);
}
return sgPath;
}
@@ -217,7 +215,9 @@ public class AsyncDataLogApplier implements LogApplier {
private class DataLogConsumer implements Runnable, Consumer<Log> {
- private BlockingQueue<Log> logQueue = new ArrayBlockingQueue<>(4096);
+ private BlockingQueue<Log> logQueue =
+ new ArrayBlockingQueue<>(
+ ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
private volatile long lastLogIndex;
private volatile long lastAppliedLogIndex;
private String name;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 1a22b26a5d..eec035dc46 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -656,16 +656,15 @@ public abstract class RaftLogManager {
}
startTime = Statistic.RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS.getOperationStartTime();
+ for (Log entry : entries) {
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.calOperationCostTimeFromStart(
+ entry.getCreateTime());
+ }
try {
// Operations here are so simple that the execution could be thought
// success or fail together approximately.
// TODO: make it real atomic
getCommittedEntryManager().append(entries);
- for (Log entry : entries) {
- synchronized (entry) {
- entry.notifyAll();
- }
- }
Log lastLog = entries.get(entries.size() - 1);
getUnCommittedEntryManager().stableTo(lastLog.getCurrLogIndex());
commitIndex = lastLog.getCurrLogIndex();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index a6fda19576..d2acf0fde9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -42,8 +42,6 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_SENDER_SEQUENCE_LOG;
@@ -76,12 +74,8 @@ public class AsynchronousSequencer implements LogSequencer {
public SendLogRequest enqueueSendLogRequest(Log log) {
VotingLog votingLog = member.buildVotingLog(log);
- AtomicBoolean leaderShipStale = new AtomicBoolean(false);
- AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
- SendLogRequest request =
- new SendLogRequest(
- votingLog, leaderShipStale, newLeaderTerm, null, member.getAllNodes().size() / 2);
+ SendLogRequest request = new SendLogRequest(votingLog, null, member.getAllNodes().size() / 2);
try {
if (!unsequencedLogQueue.offer(
request,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index f44a10a03e..c1e35aa4a8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -34,8 +34,6 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.LogPlan;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
/**
* SynchronizedSequencer performs sequencing by taking the monitor of a LogManager within the caller
@@ -71,6 +69,8 @@ public class SynchronousSequencer implements LogSequencer {
while (true) {
synchronized (logManager) {
+ long occupyStart =
+ Statistic.RAFT_SENDER_OCCUPY_LOG_MANAGER_IN_APPEND.getOperationStartTime();
if (!IoTDBDescriptor.getInstance().getConfig().isEnableMemControl()
|| (logManager.getLastLogIndex() - logManager.getCommitLogIndex()
<= ClusterDescriptor.getInstance()
@@ -104,6 +104,8 @@ public class SynchronousSequencer implements LogSequencer {
&& ClusterDescriptor.getInstance().getConfig().isEnableWeakAcceptance())) {
sendLogRequest = enqueueEntry(sendLogRequest);
}
+ Statistic.RAFT_SENDER_OCCUPY_LOG_MANAGER_IN_APPEND.calOperationCostTimeFromStart(
+ occupyStart);
break;
}
try {
@@ -116,6 +118,9 @@ public class SynchronousSequencer implements LogSequencer {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
+
+ Statistic.RAFT_SENDER_OCCUPY_LOG_MANAGER_IN_APPEND.calOperationCostTimeFromStart(
+ occupyStart);
}
}
@@ -134,19 +139,12 @@ public class SynchronousSequencer implements LogSequencer {
private SendLogRequest buildSendLogRequest(Log log) {
VotingLog votingLog = member.buildVotingLog(log);
- AtomicBoolean leaderShipStale = new AtomicBoolean(false);
- AtomicLong newLeaderTerm = new AtomicLong(member.getTerm().get());
long startTime = Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.getOperationStartTime();
AppendEntryRequest appendEntryRequest = member.buildAppendEntryRequest(log, false);
Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime);
- return new SendLogRequest(
- votingLog,
- leaderShipStale,
- newLeaderTerm,
- appendEntryRequest,
- member.getAllNodes().size() / 2);
+ return new SendLogRequest(votingLog, appendEntryRequest, member.getAllNodes().size() / 2);
}
public static class Factory implements LogSequencerFactory {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index c9a97388d4..cc4684481e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -32,8 +32,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_LOG_MISMATCH;
@@ -52,9 +52,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
private static final Logger logger = LoggerFactory.getLogger(AppendNodeEntryHandler.class);
protected RaftMember member;
- protected AtomicLong receiverTerm;
protected VotingLog log;
- protected AtomicBoolean leaderShipStale;
protected Node directReceiver;
protected int quorumSize;
@@ -68,6 +66,9 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
}
}
+ // TODO-remove
+ private static Map<Long, Integer> entryAcceptedTimes = new ConcurrentHashMap<>();
+
@Override
public void onComplete(AppendEntryResult response) {
if (Timer.ENABLE_INSTRUMENTING) {
@@ -81,10 +82,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
logger.debug(
"{}: Append response {} from {} for log {}", member.getName(), response, trueReceiver, log);
- if (leaderShipStale.get()) {
- // someone has rejected this log because the leadership is stale
- return;
- }
long resp = response.status;
@@ -96,22 +93,33 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
log.getLog().getCurrLogTerm(),
trueReceiver,
response.signature);
+ Integer count =
+ entryAcceptedTimes.compute(
+ log.getLog().getCurrLogIndex(),
+ (index, cnt) -> {
+ if (cnt == null) {
+ cnt = 1;
+ } else {
+ cnt = cnt + 1;
+ }
+ return cnt;
+ });
+ if (count == quorumSize) {
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
+ log.getLog().getCreateTime());
+ }
+
member.getPeer(trueReceiver).setMatchIndex(response.lastLogIndex);
} else if (resp > 0) {
// a response > 0 is the follower's term
// the leadership is stale, wait for the new leader's heartbeat
- long prevReceiverTerm = receiverTerm.get();
logger.debug(
- "{}: Received a rejection from {} because term is stale: {}/{}, log: {}",
+ "{}: Received a rejection from {} because term is stale: {}, log: {}",
member.getName(),
trueReceiver,
- prevReceiverTerm,
resp,
log);
- if (resp > prevReceiverTerm) {
- receiverTerm.set(resp);
- }
- leaderShipStale.set(true);
+ member.stepDown(resp, false);
synchronized (log) {
log.notifyAll();
}
@@ -186,18 +194,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
this.member = member;
}
- public void setLeaderShipStale(AtomicBoolean leaderShipStale) {
- this.leaderShipStale = leaderShipStale;
- }
-
public void setDirectReceiver(Node follower) {
this.directReceiver = follower;
}
- public void setReceiverTerm(AtomicLong receiverTerm) {
- this.receiverTerm = receiverTerm;
- }
-
public void setQuorumSize(int quorumSize) {
this.quorumSize = quorumSize;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 6852499ff4..7985189b81 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -132,7 +132,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -739,17 +738,10 @@ public abstract class RaftMember implements RaftMemberMBean {
}
public void sendLogAsync(
- VotingLog log,
- Node node,
- AtomicBoolean leaderShipStale,
- AtomicLong newLeaderTerm,
- AppendEntryRequest request,
- int quorumSize,
- boolean isVerifier) {
+ VotingLog log, Node node, AppendEntryRequest request, int quorumSize, boolean isVerifier) {
AsyncClient client = getSendLogAsyncClient(node);
if (client != null) {
- AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, quorumSize);
+ AppendNodeEntryHandler handler = getAppendNodeEntryHandler(log, node, quorumSize);
try {
client.appendEntry(request, isVerifier, handler);
logger.debug("{} sending a log to {}: {}", name, node, log);
@@ -1251,12 +1243,8 @@ public abstract class RaftMember implements RaftMemberMBean {
try {
AppendLogResult appendLogResult =
- waitAppendResult(
- sendLogRequest.getVotingLog(),
- sendLogRequest.getLeaderShipStale(),
- sendLogRequest.getNewLeaderTerm(),
- sendLogRequest.getQuorumSize());
- Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
+ waitAppendResult(sendLogRequest.getVotingLog(), sendLogRequest.getQuorumSize());
+ Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_END.calOperationCostTimeFromStart(
sendLogRequest.getVotingLog().getLog().getCreateTime());
long startTime;
switch (appendLogResult) {
@@ -1296,15 +1284,12 @@ public abstract class RaftMember implements RaftMemberMBean {
public SendLogRequest buildSendLogRequest(Log log) {
VotingLog votingLog = buildVotingLog(log);
- AtomicBoolean leaderShipStale = new AtomicBoolean(false);
- AtomicLong newLeaderTerm = new AtomicLong(term.get());
long startTime = Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.getOperationStartTime();
AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(log, false);
Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.calOperationCostTimeFromStart(startTime);
- return new SendLogRequest(
- votingLog, leaderShipStale, newLeaderTerm, appendEntryRequest, allNodes.size() / 2);
+ return new SendLogRequest(votingLog, appendEntryRequest, allNodes.size() / 2);
}
public VotingLog buildVotingLog(Log log) {
@@ -1711,6 +1696,9 @@ public abstract class RaftMember implements RaftMemberMBean {
}
private boolean canBeWeaklyAccepted(Log log) {
+ if (log instanceof FragmentedLog) {
+ return true;
+ }
if (!(log instanceof RequestLog)) {
return false;
}
@@ -1726,6 +1714,8 @@ public abstract class RaftMember implements RaftMemberMBean {
@SuppressWarnings({"java:S2445"}) // safe synchronized
private void waitAppendResultLoop(VotingLog log, int quorumSize) {
int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
+ int weaklyAccepted = log.getWeaklyAcceptedNodeIds().size();
+ int stronglyAccepted = totalAccepted - weaklyAccepted;
long nextTimeToPrint = 5000;
long waitStart = System.nanoTime();
@@ -1736,7 +1726,7 @@ public abstract class RaftMember implements RaftMemberMBean {
synchronized (log.getLog()) {
while (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
|| (!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
- && getCommitIndex() < log.getLog().getCurrLogIndex()
+ && stronglyAccepted < quorumSize
|| ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
&& log.getSignatures().size()
< TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
@@ -1768,6 +1758,8 @@ public abstract class RaftMember implements RaftMemberMBean {
nextTimeToPrint *= 2;
}
totalAccepted = votingLogList.totalAcceptedNodeNum(log);
+ weaklyAccepted = log.getWeaklyAcceptedNodeIds().size();
+ stronglyAccepted = totalAccepted - weaklyAccepted;
}
}
if (logger.isDebugEnabled()) {
@@ -1787,15 +1779,16 @@ public abstract class RaftMember implements RaftMemberMBean {
* wait until "voteCounter" counts down to zero, which means the quorum has received the log, or
* one follower tells the node that it is no longer a valid leader, or a timeout is triggered.
*/
- protected AppendLogResult waitAppendResult(
- VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, int quorumSize) {
+ protected AppendLogResult waitAppendResult(VotingLog log, int quorumSize) {
// wait for the followers to vote
long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
int totalAccepted = votingLogList.totalAcceptedNodeNum(log);
+ int weaklyAccepted = log.getWeaklyAcceptedNodeIds().size();
+ int stronglyAccepted = totalAccepted - weaklyAccepted;
if (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
|| ((!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
- && log.getLog().getCurrLogIndex() > getCommitIndex()
+ && stronglyAccepted < quorumSize
|| ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
&& log.getSignatures().size()
< TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
@@ -1805,6 +1798,8 @@ public abstract class RaftMember implements RaftMemberMBean {
waitAppendResultLoop(log, quorumSize);
}
totalAccepted = votingLogList.totalAcceptedNodeNum(log);
+ weaklyAccepted = log.getWeaklyAcceptedNodeIds().size();
+ stronglyAccepted = totalAccepted - weaklyAccepted;
if (log.acceptedTime.get() != 0) {
Statistic.RAFT_WAIT_AFTER_ACCEPTED.calOperationCostTimeFromStart(log.acceptedTime.get());
@@ -1812,8 +1807,7 @@ public abstract class RaftMember implements RaftMemberMBean {
Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.calOperationCostTimeFromStart(startTime);
// a node has a larger term than the local node, so this node is no longer a valid leader
- if (leaderShipStale.get()) {
- stepDown(newLeaderTerm.get(), false);
+ if (term.get() != log.getLog().getCurrLogTerm()) {
return AppendLogResult.LEADERSHIP_STALE;
}
// the node knows it is no long the leader from other requests
@@ -1821,12 +1815,12 @@ public abstract class RaftMember implements RaftMemberMBean {
return AppendLogResult.LEADERSHIP_STALE;
}
- if (totalAccepted >= quorumSize && log.getLog().getCurrLogIndex() > getCommitIndex()) {
+ if (totalAccepted >= quorumSize && stronglyAccepted < quorumSize) {
return AppendLogResult.WEAK_ACCEPT;
}
// cannot get enough agreements within a certain amount of time
- if (log.getLog().getCurrLogIndex() > getCommitIndex()) {
+ if (totalAccepted < quorumSize) {
return AppendLogResult.TIME_OUT;
}
@@ -2092,11 +2086,6 @@ public abstract class RaftMember implements RaftMemberMBean {
}
logger.debug("{} sending a log to followers: {}", name, log);
- // if a follower has larger term than this node, leaderShipStale will be set to true and
- // newLeaderTerm will store the follower's term
- AtomicBoolean leaderShipStale = new AtomicBoolean(false);
- AtomicLong newLeaderTerm = new AtomicLong(term.get());
-
AppendEntryRequest request = buildAppendEntryRequest(log.getLog(), true);
log.getFailedNodeIds().clear();
log.setHasFailed(false);
@@ -2107,9 +2096,7 @@ public abstract class RaftMember implements RaftMemberMBean {
// follower will not be blocked
for (Node node : allNodes) {
appendLogThreadPool.submit(
- () ->
- sendLogToFollower(
- log, node, leaderShipStale, newLeaderTerm, request, quorumSize, false));
+ () -> sendLogToFollower(log, node, request, quorumSize, false));
if (character != NodeCharacter.LEADER) {
return AppendLogResult.LEADERSHIP_STALE;
}
@@ -2118,7 +2105,7 @@ public abstract class RaftMember implements RaftMemberMBean {
// there is only one member, send to it within this thread to reduce thread switching
// overhead
for (Node node : allNodes) {
- sendLogToFollower(log, node, leaderShipStale, newLeaderTerm, request, quorumSize, false);
+ sendLogToFollower(log, node, request, quorumSize, false);
if (character != NodeCharacter.LEADER) {
return AppendLogResult.LEADERSHIP_STALE;
}
@@ -2130,18 +2117,12 @@ public abstract class RaftMember implements RaftMemberMBean {
return AppendLogResult.TIME_OUT;
}
- return waitAppendResult(log, leaderShipStale, newLeaderTerm, quorumSize);
+ return waitAppendResult(log, quorumSize);
}
/** Send "log" to "node". */
public void sendLogToFollower(
- VotingLog log,
- Node node,
- AtomicBoolean leaderShipStale,
- AtomicLong newLeaderTerm,
- AppendEntryRequest request,
- int quorumSize,
- boolean isVerifier) {
+ VotingLog log, Node node, AppendEntryRequest request, int quorumSize, boolean isVerifier) {
if (node.equals(thisNode)) {
return;
}
@@ -2163,9 +2144,9 @@ public abstract class RaftMember implements RaftMemberMBean {
}
if (config.isUseAsyncServer()) {
- sendLogAsync(log, node, leaderShipStale, newLeaderTerm, request, quorumSize, isVerifier);
+ sendLogAsync(log, node, request, quorumSize, isVerifier);
} else {
- sendLogSync(log, node, leaderShipStale, newLeaderTerm, request, quorumSize, isVerifier);
+ sendLogSync(log, node, request, quorumSize, isVerifier);
}
}
@@ -2198,17 +2179,10 @@ public abstract class RaftMember implements RaftMemberMBean {
}
private void sendLogSync(
- VotingLog log,
- Node node,
- AtomicBoolean leaderShipStale,
- AtomicLong newLeaderTerm,
- AppendEntryRequest request,
- int quorumSize,
- boolean isVerifier) {
+ VotingLog log, Node node, AppendEntryRequest request, int quorumSize, boolean isVerifier) {
Client client = getSyncClient(node);
if (client != null) {
- AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, quorumSize);
+ AppendNodeEntryHandler handler = getAppendNodeEntryHandler(log, node, quorumSize);
try {
logger.debug("{} sending a log to {}: {}", name, node, log);
long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
@@ -2230,17 +2204,11 @@ public abstract class RaftMember implements RaftMemberMBean {
}
public AppendNodeEntryHandler getAppendNodeEntryHandler(
- VotingLog log,
- Node node,
- AtomicBoolean leaderShipStale,
- AtomicLong newLeaderTerm,
- int quorumSize) {
+ VotingLog log, Node node, int quorumSize) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setDirectReceiver(node);
- handler.setLeaderShipStale(leaderShipStale);
handler.setLog(log);
handler.setMember(this);
- handler.setReceiverTerm(newLeaderTerm);
handler.setQuorumSize(quorumSize);
if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
registerAppendLogHandler(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 7ded0c910e..9d771b6465 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -34,7 +34,8 @@ public class Timer {
private static final Logger logger = LoggerFactory.getLogger(Timer.class);
- public static final boolean ENABLE_INSTRUMENTING = true;
+ public static final boolean ENABLE_INSTRUMENTING =
+ ClusterDescriptor.getInstance().getConfig().isEnableInstrumenting();
private static final String COORDINATOR = "Coordinator";
private static final String META_GROUP_MEMBER = "Meta group member";
@@ -101,6 +102,12 @@ public class Timer {
TIME_SCALE,
RaftMember.USE_LOG_DISPATCHER,
DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+ RAFT_SENDER_OCCUPY_LOG_MANAGER_IN_APPEND(
+ RAFT_MEMBER_SENDER,
+ "occupy log manager in append",
+ TIME_SCALE,
+ RaftMember.USE_LOG_DISPATCHER,
+ DATA_GROUP_MEMBER_LOCAL_EXECUTION),
RAFT_SENDER_APPEND_LOG_V2(
RAFT_MEMBER_SENDER,
"locally append log",
@@ -239,12 +246,6 @@ public class Timer {
RAFT_MEMBER_SENDER, "in apply queue", TIME_SCALE, true, RAFT_SENDER_COMMIT_WAIT_LOG_APPLY),
RAFT_SENDER_DATA_LOG_APPLY(
RAFT_MEMBER_SENDER, "apply data log", TIME_SCALE, true, RAFT_SENDER_COMMIT_WAIT_LOG_APPLY),
- RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT(
- RAFT_MEMBER_SENDER,
- "log from create to accept",
- TIME_SCALE,
- RaftMember.USE_LOG_DISPATCHER,
- DATA_GROUP_MEMBER_LOCAL_EXECUTION),
// raft member - receiver
RAFT_RECEIVER_LOG_PARSE(
RAFT_MEMBER_RECEIVER, "log parse", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
@@ -282,6 +283,12 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ LOG_DISPATCHER_LOG_ENQUEUE_SINGLE(
+ LOG_DISPATCHER,
+ "enqueue (single)",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
LOG_DISPATCHER_LOG_IN_QUEUE(
LOG_DISPATCHER,
"in queue",
@@ -320,6 +327,24 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT(
+ LOG_DISPATCHER,
+ "from create to accept",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT(
+ LOG_DISPATCHER,
+ "from create to commit",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_END(
+ LOG_DISPATCHER,
+ "from create to wait end",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
LOG_DISPATCHER_FROM_CREATE_TO_OK(
LOG_DISPATCHER,
"from create to OK",
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 02e8703e64..87e2d1475b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -389,6 +389,6 @@ public class ClusterUtils {
}
public static String nodeToString(Node node) {
- return node.getInternalIp() + "-" + node.getMetaPort();
+ return node.getInternalIp() + ":" + node.getMetaPort();
}
}