You are viewing a plain-text version of this message. (The link to the canonical version, originally attached to the word "here", was not preserved in this plain-text copy.)
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/14 01:11:54 UTC
[iotdb] 01/02: add statistics
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a1006dc39d4f5bb2a73e678a1cdb7011a2d4880a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri Apr 14 09:05:30 2023 +0800
add statistics
---
.../common/request/ByteBufferConsensusRequest.java | 5 +
.../iotdb/consensus/natraft/RaftConsensus.java | 31 ++++
.../consensus/natraft/protocol/RaftConfig.java | 55 +++++++
.../consensus/natraft/protocol/RaftMember.java | 83 +++++++---
.../protocol/heartbeat/HeartbeatThread.java | 4 +
.../log/appender/SlidingWindowLogAppender.java | 6 +-
.../log/dispatch/AppendNodeEntryHandler.java | 3 +
.../protocol/log/dispatch/LogDispatcher.java | 19 ++-
.../log/dispatch/flowcontrol/FlowBalancer.java | 60 +++++--
.../log/dispatch/flowcontrol/FlowMonitor.java | 86 ++++++----
.../dispatch/flowcontrol/FlowMonitorManager.java | 28 +++-
.../log/dispatch/flowcontrol/FlowWindow.java} | 26 ++-
.../protocol/log/manager/RaftLogManager.java | 3 +
.../serialization/SyncLogDequeSerializer.java | 1 +
.../iotdb/consensus/natraft/utils/NodeReport.java | 176 +++++++++++++++++++++
.../iotdb/consensus/natraft/utils/Timer.java | 23 ++-
distribution/distribute-ecs.sh | 31 ++++
.../plan/planner/plan/node/write/InsertNode.java | 10 ++
.../planner/plan/node/write/InsertTabletNode.java | 43 +++++
.../java/org/apache/iotdb/session/Session.java | 5 +-
20 files changed, 599 insertions(+), 99 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
index 0310a4e3d3..b05a35a561 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
@@ -39,4 +39,9 @@ public class ByteBufferConsensusRequest implements IConsensusRequest {
public ByteBuffer serializeToByteBuffer() {
return byteBuffer;
}
+
+ @Override
+ public long estimateSize() {
+ return byteBuffer.remaining();
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index ac1f88d2a1..2d4611b7be 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.consensus.natraft;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.RegisterManager;
@@ -48,6 +50,8 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
import org.apache.iotdb.consensus.natraft.service.RaftRPCService;
import org.apache.iotdb.consensus.natraft.service.RaftRPCServiceProcessor;
+import org.apache.iotdb.consensus.natraft.utils.NodeReport;
+import org.apache.iotdb.consensus.natraft.utils.NodeReport.RaftMemberReport;
import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
import org.apache.iotdb.consensus.natraft.utils.Timer;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -65,6 +69,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class RaftConsensus implements IConsensus {
@@ -79,6 +85,7 @@ public class RaftConsensus implements IConsensus {
private final RegisterManager registerManager = new RegisterManager();
private final RaftConfig config;
private final IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager;
+ private ScheduledExecutorService reportThread;
public RaftConsensus(ConsensusConfig config, Registry registry) {
this.thisNode = config.getThisNodeEndPoint();
@@ -109,6 +116,9 @@ public class RaftConsensus implements IConsensus {
() -> {
logger.info(Timer.Statistic.getReport());
}));
+ reportThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
+ ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+ reportThread, this::generateNodeReport, 5, 5, TimeUnit.SECONDS);
}
private void initAndRecover() throws IOException {
@@ -372,4 +382,25 @@ public class RaftConsensus implements IConsensus {
public TEndPoint getThisNode() {
return thisNode;
}
+
+ private void generateNodeReport() {
+ if (logger.isInfoEnabled()) {
+ try {
+ NodeReport report = new NodeReport(thisNode);
+ List<RaftMemberReport> reports = new ArrayList<>();
+ for (RaftMember value : stateMachineMap.values()) {
+ RaftMemberReport raftMemberReport = value.genMemberReport();
+ if (raftMemberReport.getPrevLastLogIndex() != raftMemberReport.getLastLogIndex()) {
+ reports.add(raftMemberReport);
+ }
+ }
+ if (!reports.isEmpty()) {
+ report.setMemberReports(reports);
+ logger.info(report.toString());
+ }
+ } catch (Exception e) {
+ logger.error("exception occurred when generating node report", e);
+ }
+ }
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 3d2991da3e..be3e56d231 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -63,6 +63,7 @@ public class RaftConfig {
private int raftLogBufferSize = 64 * 1024 * 1024;
private int maxNumberOfLogsPerFetchOnDisk = 1000;
private int maxRaftLogIndexSizeInMemory = 64 * 1024;
+ private int maxRaftLogPersistDataSizePerFile = 1024 * 1024 * 1024;
private int maxNumberOfPersistRaftLogFiles = 128;
private int maxPersistRaftLogNumberOnDisk = 10_000_000;
private int flushRaftLogThreshold = 100_000;
@@ -71,6 +72,9 @@ public class RaftConfig {
private boolean enableCompressedDispatching = true;
private boolean ignoreStateMachine = false;
private boolean onlyTestNetwork = false;
+ private boolean waitApply = true;
+ private double flowControlMinFlow = 10_000_000;
+ private double flowControlMaxFlow = 100_000_000;
private CompressionType dispatchingCompressionType = CompressionType.SNAPPY;
private ConsistencyLevel consistencyLevel = ConsistencyLevel.STRONG_CONSISTENCY;
private RPCConfig rpcConfig;
@@ -400,6 +404,38 @@ public class RaftConfig {
this.ignoreStateMachine = ignoreStateMachine;
}
+ public double getFlowControlMinFlow() {
+ return flowControlMinFlow;
+ }
+
+ public void setFlowControlMinFlow(double flowControlMinFlow) {
+ this.flowControlMinFlow = flowControlMinFlow;
+ }
+
+ public double getFlowControlMaxFlow() {
+ return flowControlMaxFlow;
+ }
+
+ public void setFlowControlMaxFlow(double flowControlMaxFlow) {
+ this.flowControlMaxFlow = flowControlMaxFlow;
+ }
+
+ public boolean isWaitApply() {
+ return waitApply;
+ }
+
+ public void setWaitApply(boolean waitApply) {
+ this.waitApply = waitApply;
+ }
+
+ public int getMaxRaftLogPersistDataSizePerFile() {
+ return maxRaftLogPersistDataSizePerFile;
+ }
+
+ public void setMaxRaftLogPersistDataSizePerFile(int maxRaftLogPersistDataSizePerFile) {
+ this.maxRaftLogPersistDataSizePerFile = maxRaftLogPersistDataSizePerFile;
+ }
+
public void loadProperties(Properties properties) {
logger.debug("Loading properties: {}", properties);
@@ -476,6 +512,11 @@ public class RaftConfig {
properties.getProperty(
"raft_log_buffer_size", String.valueOf(this.getRaftLogBufferSize()))));
+ this.setMaxRaftLogPersistDataSizePerFile(
+ Integer.parseInt(
+ properties.getProperty(
+ "raft_log_file_size", String.valueOf(this.getMaxRaftLogPersistDataSizePerFile()))));
+
this.setLogNumInBatch(
Integer.parseInt(
properties.getProperty("log_batch_num", String.valueOf(this.getLogNumInBatch()))));
@@ -592,6 +633,20 @@ public class RaftConfig {
Boolean.parseBoolean(
properties.getProperty("only_test_network", String.valueOf(this.isOnlyTestNetwork()))));
+ this.setWaitApply(
+ Boolean.parseBoolean(
+ properties.getProperty("wait_apply", String.valueOf(this.isWaitApply()))));
+
+ this.setFlowControlMinFlow(
+ Double.parseDouble(
+ properties.getProperty(
+ "flow_control_min_flow", String.valueOf(this.getFlowControlMinFlow()))));
+
+ this.setFlowControlMaxFlow(
+ Double.parseDouble(
+ properties.getProperty(
+ "flow_control_max_flow", String.valueOf(this.getFlowControlMaxFlow()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
this.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 5271e793aa..8d84036592 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.SynchronousSeq
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
import org.apache.iotdb.consensus.natraft.utils.IOUtils;
import org.apache.iotdb.consensus.natraft.utils.LogUtils;
+import org.apache.iotdb.consensus.natraft.utils.NodeReport.RaftMemberReport;
import org.apache.iotdb.consensus.natraft.utils.NodeUtils;
import org.apache.iotdb.consensus.natraft.utils.Response;
import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
@@ -176,6 +177,7 @@ public class RaftMember {
private ExecutorService commitLogPool;
private long lastCommitTaskTime;
+ private long lastReportIndex;
/**
* logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
@@ -512,7 +514,9 @@ public class RaftMember {
}
AppendEntryResult response;
+ long startTime = Statistic.RAFT_RECEIVER_PARSE_ENTRY.getOperationStartTime();
List<Entry> entries = LogUtils.parseEntries(request.entries, stateMachine);
+ Statistic.RAFT_RECEIVER_PARSE_ENTRY.calOperationCostTimeFromStart(startTime);
response = logAppender.appendEntries(request.leaderCommit, request.term, entries);
@@ -644,7 +648,7 @@ public class RaftMember {
Statistic.LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE.add(entry.createTime - entry.receiveTime);
if (config.isUseFollowerLoadBalance()) {
- FlowMonitorManager.INSTANCE.report(thisNode, entry.estimateSize());
+ FlowMonitorManager.INSTANCE.report(thisNode.getEndpoint(), entry.estimateSize());
}
if (votingEntry == null) {
@@ -677,15 +681,15 @@ public class RaftMember {
}
}
- private TSStatus includeLogNumbersInStatus(TSStatus status, Entry entry) {
+ private TSStatus includeLogNumbersInStatus(TSStatus status, long index, long term) {
return status.setMessage(
getRaftGroupId().getType().ordinal()
+ "-"
+ getRaftGroupId().getId()
+ "-"
- + entry.getCurrLogIndex()
+ + index
+ "-"
- + entry.getCurrLogTerm());
+ + term);
}
protected AppendLogResult waitAppendResult(VotingEntry votingEntry) {
@@ -748,28 +752,32 @@ public class RaftMember {
}
long waitTime = 1;
AcceptedType acceptedType = votingLogList.computeAcceptedType(log);
- synchronized (log.getEntry()) {
- while (acceptedType == AcceptedType.NOT_ACCEPTED
- && alreadyWait < config.getWriteOperationTimeoutMS()) {
+
+ while (acceptedType == AcceptedType.NOT_ACCEPTED
+ && alreadyWait < config.getWriteOperationTimeoutMS()) {
+ long startTime = Statistic.RAFT_SENDER_LOG_APPEND_WAIT.getOperationStartTime();
+ synchronized (log.getEntry()) {
try {
log.getEntry().wait(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Unexpected interruption when sending a log", e);
}
- acceptedType = votingLogList.computeAcceptedType(log);
-
- alreadyWait = (System.nanoTime() - waitStart) / 1000000;
- if (alreadyWait > nextTimeToPrint) {
- logger.info(
- "Still not receive enough votes for {}, weakly " + "accepted {}, wait {}ms ",
- log,
- log.getWeaklyAcceptedNodes(),
- alreadyWait);
- nextTimeToPrint *= 2;
- }
+ }
+ Statistic.RAFT_SENDER_LOG_APPEND_WAIT.calOperationCostTimeFromStart(startTime);
+
+ acceptedType = votingLogList.computeAcceptedType(log);
+ alreadyWait = (System.nanoTime() - waitStart) / 1000000;
+ if (alreadyWait > nextTimeToPrint) {
+ logger.info(
+ "Still not receive enough votes for {}, weakly " + "accepted {}, wait {}ms ",
+ log,
+ log.getWeaklyAcceptedNodes(),
+ alreadyWait);
+ nextTimeToPrint *= 2;
}
}
+
if (logger.isDebugEnabled()) {
Thread.currentThread().setName(threadBaseName);
}
@@ -973,7 +981,7 @@ public class RaftMember {
logger.debug("{}: plan {} has no where to be forwarded", name, plan);
return StatusUtils.NO_LEADER.deepCopy().setMessage("No leader to forward in: " + groupId);
}
- logger.debug("{}: Forward {} to node {}", name, plan, node);
+ logger.info("{}: Forward {} to node {}", name, plan, node);
TSStatus status;
status = forwardPlanAsync(plan, node, groupId);
@@ -1244,11 +1252,25 @@ public class RaftMember {
System.nanoTime() - votingEntry.getEntry().createTime);
switch (appendLogResult) {
case WEAK_ACCEPT:
+ Statistic.RAFT_LEADER_WEAK_ACCEPT.add(1);
return includeLogNumbersInStatus(
- StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED), votingEntry.getEntry());
+ StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS),
+ votingEntry.getEntry().getCurrLogIndex(),
+ votingEntry.getEntry().getCurrLogTerm());
case OK:
- waitApply(votingEntry.getEntry());
- return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), votingEntry.getEntry());
+ if (config.isWaitApply()) {
+ waitApply(votingEntry.getEntry());
+ return includeLogNumbersInStatus(
+ StatusUtils.OK.deepCopy(),
+ votingEntry.getEntry().getCurrLogIndex(),
+ votingEntry.getEntry().getCurrLogTerm());
+ } else {
+ return includeLogNumbersInStatus(
+ StatusUtils.OK.deepCopy(),
+ logManager.getAppliedIndex(),
+ logManager.getAppliedTerm());
+ }
+
case TIME_OUT:
logger.debug("{}: log {} timed out...", name, votingEntry.getEntry());
break;
@@ -1345,4 +1367,21 @@ public class RaftMember {
return StatusUtils.TIME_OUT;
}
}
+
+ public RaftMemberReport genMemberReport() {
+ long prevLastLogIndex = lastReportIndex;
+ lastReportIndex = logManager.getLastLogIndex();
+ return new RaftMemberReport(
+ status.role,
+ status.getLeader().get(),
+ status.getTerm().get(),
+ logManager.getLastLogTerm(),
+ lastReportIndex,
+ logManager.getCommitLogIndex(),
+ logManager.getCommitLogTerm(),
+ readOnly,
+ heartbeatThread.getLastHeartbeatReceivedTime(),
+ prevLastLogIndex,
+ logManager.getAppliedIndex());
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
index 660e6feff6..4f2bdc07b5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
@@ -368,4 +368,8 @@ public class HeartbeatThread implements Runnable {
public Object getElectionWaitObject() {
return electionWaitObject;
}
+
+ public long getLastHeartbeatReceivedTime() {
+ return lastHeartbeatReceivedTime;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 39b58cd424..9702b4b461 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -82,7 +82,7 @@ public class SlidingWindowLogAppender implements LogAppender {
// check the next entry
Entry entry = logWindow[pos];
boolean nextMismatch = false;
- if (logWindow[pos + 1] != null && pos < windowCapacity - 1) {
+ if (pos < windowCapacity - 1 && logWindow[pos + 1] != null) {
long nextPrevTerm = logWindow[pos + 1].getPrevTerm();
if (nextPrevTerm != entry.getCurrLogTerm()) {
nextMismatch = true;
@@ -180,7 +180,9 @@ public class SlidingWindowLogAppender implements LogAppender {
AppendEntryResult result = null;
for (Entry entry : entries) {
+ long startTime = Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY.getOperationStartTime();
result = appendEntry(leaderCommit, entry);
+ Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY.calOperationCostTimeFromStart(startTime);
if (result.status != Response.RESPONSE_AGREE
&& result.status != Response.RESPONSE_STRONG_ACCEPT
@@ -215,8 +217,10 @@ public class SlidingWindowLogAppender implements LogAppender {
checkLog(windowPos);
if (windowPos == 0) {
appended = flushWindow(result);
+ Statistic.RAFT_FOLLOWER_STRONG_ACCEPT.calOperationCostTimeFromStart(startTime);
} else {
result.status = Response.RESPONSE_WEAK_ACCEPT;
+ Statistic.RAFT_FOLLOWER_WEAK_ACCEPT.calOperationCostTimeFromStart(startTime);
}
} else {
result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index 3c98156091..7d169e6e66 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -76,6 +76,9 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
member.getVotingLogList().onStronglyAccept(votingEntry, trueReceiver);
member.getStatus().getPeerMap().get(trueReceiver).setMatchIndex(response.lastLogIndex);
+ synchronized (votingEntry.getEntry()) {
+ votingEntry.getEntry().notifyAll();
+ }
} else if (resp > 0) {
// a response > 0 is the follower's term
// the leadership is stale, wait for the new leader's heartbeat
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 463f9f0748..38ad5ff228 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -96,9 +96,10 @@ public class LogDispatcher {
}
public void updateRateLimiter() {
- logger.info("TEndPoint rates: {}", nodesRate);
+ logger.info("{}: TEndPoint rates: {}", member.getName(), nodesRate);
for (Entry<Peer, Double> nodeDoubleEntry : nodesRate.entrySet()) {
- nodesRateLimiter.get(nodeDoubleEntry.getKey()).setRate(nodeDoubleEntry.getValue());
+ nodesRateLimiter.put(
+ nodeDoubleEntry.getKey(), RateLimiter.create(nodeDoubleEntry.getValue()));
}
}
@@ -106,6 +107,7 @@ public class LogDispatcher {
BlockingQueue<VotingEntry> logBlockingQueue;
logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
nodesLogQueuesMap.put(node, logBlockingQueue);
+ nodesRate.put(node, Double.MAX_VALUE);
nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
for (int i = 0; i < bindingThreadNum; i++) {
@@ -261,6 +263,13 @@ public class LogDispatcher {
request.getAppendEntryRequest().entry = request.getEntry().serialize();
request.getEntry().setByteSize(request.getAppendEntryRequest().entry.limit());
+ logger.debug(
+ "{}/{}={}",
+ request.getEntry().estimateSize(),
+ request.getAppendEntryRequest().entry.remaining(),
+ request.getEntry().estimateSize()
+ * 1.0
+ / request.getAppendEntryRequest().entry.remaining());
}
}
@@ -346,12 +355,12 @@ public class LogDispatcher {
for (; logIndex < currBatch.size(); logIndex++) {
VotingEntry entry = currBatch.get(logIndex);
- long curSize = entry.getAppendEntryRequest().entry.array().length;
+ long curSize = entry.getAppendEntryRequest().entry.remaining();
if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
break;
}
logSize += curSize;
- logList.add(entry.getAppendEntryRequest().entry);
+ logList.add(entry.getAppendEntryRequest().entry.slice());
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
entry.getEntry().createTime);
}
@@ -365,7 +374,7 @@ public class LogDispatcher {
}
if (config.isUseFollowerLoadBalance()) {
- FlowMonitorManager.INSTANCE.report(receiver, logSize);
+ FlowMonitorManager.INSTANCE.report(receiver.getEndpoint(), logSize);
}
nodesRateLimiter.get(receiver).acquire((int) logSize);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index 3c838f81b7..bbc9fd66ee 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
@@ -34,6 +35,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -42,22 +44,26 @@ import java.util.concurrent.TimeUnit;
public class FlowBalancer {
private static final Logger logger = LoggerFactory.getLogger(FlowBalancer.class);
- private double maxFlow = 900_000_000;
- private double minFlow = 10_000_000;
+ private double maxFlow;
+ private double minFlow;
private int windowsToUse;
private double overestimateFactor;
private int flowBalanceIntervalMS = 1000;
private FlowMonitorManager flowMonitorManager = FlowMonitorManager.INSTANCE;
private LogDispatcher logDispatcher;
private RaftMember member;
-
private ScheduledExecutorService scheduledExecutorService;
+ private volatile boolean inBurst = false;
+ private RaftConfig config;
public FlowBalancer(LogDispatcher logDispatcher, RaftMember member, RaftConfig config) {
this.logDispatcher = logDispatcher;
this.member = member;
- windowsToUse = config.getFollowerLoadBalanceWindowsToUse();
- overestimateFactor = config.getFollowerLoadBalanceOverestimateFactor();
+ this.windowsToUse = config.getFollowerLoadBalanceWindowsToUse();
+ this.overestimateFactor = config.getFollowerLoadBalanceOverestimateFactor();
+ this.minFlow = config.getFlowControlMinFlow();
+ this.maxFlow = config.getFlowControlMaxFlow();
+ this.config = config;
}
public void start() {
@@ -84,25 +90,54 @@ public class FlowBalancer {
int nodeNum = member.getAllNodes().size();
int followerNum = nodeNum - 1;
+ long flowMonitorWindowInterval = config.getFlowMonitorWindowInterval();
- double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse);
- double assumedFlow = thisNodeFlow * overestimateFactor;
- logger.info("Flow of this node: {}", thisNodeFlow);
+ List<FlowWindow> latestWindows =
+ flowMonitorManager.getLatestWindows(member.getThisNode().getEndpoint(), windowsToUse);
+ if (latestWindows.size() < windowsToUse) {
+ return;
+ }
+ int burstWindowNum = 0;
+ for (FlowWindow latestWindow : latestWindows) {
+ double assumedFlow =
+ latestWindow.sum * 1.0 * flowMonitorWindowInterval / 1000 * overestimateFactor;
+ if (assumedFlow * followerNum > maxFlow) {
+ burstWindowNum++;
+ }
+ }
+ double assumedFlow =
+ latestWindows.stream().mapToLong(w -> w.sum).sum()
+ * 1.0
+ / latestWindows.size()
+ * flowMonitorWindowInterval
+ * overestimateFactor;
+
+ for (Entry<TEndPoint, FlowMonitor> entry : flowMonitorManager.getMonitorMap().entrySet()) {
+ logger.info(
+ "{}: Flow of {}: {}, {}, {}",
+ member.getName(),
+ entry.getKey(),
+ entry.getValue().getLatestWindows(windowsToUse),
+ entry.getValue().averageFlow(windowsToUse),
+ inBurst);
+ }
Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = logDispatcher.getNodesLogQueuesMap();
Map<Peer, Double> nodesRate = logDispatcher.getNodesRate();
// sort followers according to their queue length
followers.sort(Comparator.comparing(node -> nodesLogQueuesMap.get(node).size()));
- if (assumedFlow * followerNum > maxFlow) {
+ if (burstWindowNum > latestWindows.size() / 2 && !inBurst) {
enterBurst(nodesRate, nodeNum, assumedFlow, followers);
- } else {
+ logDispatcher.updateRateLimiter();
+ } else if (burstWindowNum < latestWindows.size() / 2 && inBurst) {
exitBurst(followerNum, nodesRate, followers);
+ logDispatcher.updateRateLimiter();
}
- logDispatcher.updateRateLimiter();
}
private void enterBurst(
Map<Peer, Double> nodesRate, int nodeNum, double assumedFlow, List<Peer> followers) {
+ logger.info("{}: entering burst", member.getName());
int followerNum = nodeNum - 1;
int quorumFollowerNum = nodeNum / 2;
double remainingFlow = maxFlow;
@@ -123,13 +158,16 @@ public class FlowBalancer {
Peer node = followers.get(i);
nodesRate.put(node, flowToRemaining);
}
+ inBurst = true;
}
private void exitBurst(int followerNum, Map<Peer, Double> nodesRate, List<Peer> followers) {
+ logger.info("{}: exiting burst", member.getName());
// lift flow limits
for (int i = 0; i < followerNum; i++) {
Peer node = followers.get(i);
nodesRate.put(node, maxFlow);
}
+ inBurst = false;
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
index 79ff88984b..5689a1effd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
@@ -19,9 +19,8 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
-import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,24 +32,27 @@ import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
+import java.util.List;
public class FlowMonitor {
private static final Logger logger = LoggerFactory.getLogger(FlowMonitor.class);
private static final String FILE_SUFFIX = ".flow";
- private ArrayDeque<Pair<Long, Long>> windows;
+ private ArrayDeque<FlowWindow> windows;
private long currWindowStart;
private long currWindowSum;
private long windowInterval;
- private Peer node;
+ private TEndPoint node;
private int maxWindowSize;
private BufferedWriter writer;
private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private RaftConfig config;
+ private boolean recordFlow = false;
- public FlowMonitor(Peer node, RaftConfig config) throws IOException {
+ public FlowMonitor(TEndPoint node, RaftConfig config) throws IOException {
this.maxWindowSize = config.getFlowMonitorMaxWindowNum();
this.windows = new ArrayDeque<>(maxWindowSize);
this.windowInterval = config.getFlowMonitorWindowInterval();
@@ -61,17 +63,14 @@ public class FlowMonitor {
private void initSerializer() throws IOException {
String path =
- config.getStorageDir()
- + File.separator
- + node.getEndpoint().getIp()
- + "-"
- + node.getEndpoint().getPort()
- + FILE_SUFFIX;
+ config.getStorageDir() + File.separator + node.getIp() + "-" + node.getPort() + FILE_SUFFIX;
File file = new File(path);
file.delete();
- writer = new BufferedWriter(new FileWriter(file));
- writer.write("Time,FlowSum");
- writer.newLine();
+ if (recordFlow) {
+ writer = new BufferedWriter(new FileWriter(file));
+ writer.write("Time,FlowSum");
+ writer.newLine();
+ }
}
public void close() {
@@ -79,11 +78,13 @@ public class FlowMonitor {
while (windows.size() > 0) {
serializeWindow();
}
- try {
- writer.close();
- logger.info("Flow monitor {} is closed", node);
- } catch (IOException e) {
- logger.warn("Cannot close serializer of {}", node, e);
+ if (writer != null) {
+ try {
+ writer.close();
+ logger.info("Flow monitor {} is closed", node);
+ } catch (IOException e) {
+ logger.warn("Cannot close serializer of {}", node, e);
+ }
}
}
@@ -93,15 +94,17 @@ public class FlowMonitor {
}
private void serializeWindow() {
- Pair<Long, Long> window = windows.removeFirst();
- try {
- String windowString =
- String.format("%s,%d", dateFormat.format(new Date(window.left)), window.right);
- logger.debug("New window {} serialized by {}", windowString, node);
- writer.write(windowString);
- writer.newLine();
- } catch (IOException e) {
- logger.warn("Cannot serialize window {} of {}", window, node, e);
+ FlowWindow window = windows.removeFirst();
+ if (writer != null) {
+ try {
+ String windowString =
+ String.format("%s,%d", dateFormat.format(new Date(window.start)), window.sum);
+ logger.debug("New window {} serialized by {}", windowString, node);
+ writer.write(windowString);
+ writer.newLine();
+ } catch (IOException e) {
+ logger.warn("Cannot serialize window {} of {}", window, node, e);
+ }
}
}
@@ -114,7 +117,7 @@ public class FlowMonitor {
private void saveWindow() {
if (currWindowSum != 0) {
checkSize();
- windows.add(new Pair<>(currWindowStart, currWindowSum));
+ windows.add(new FlowWindow(currWindowStart, currWindowSum));
logger.debug("New window {},{} generated by {}", currWindowStart, currWindowSum, node);
}
}
@@ -134,11 +137,15 @@ public class FlowMonitor {
public double averageFlow(int windowsToUse) {
long flowSum = currWindowSum;
long intervalSum = System.currentTimeMillis() - currWindowStart;
- Iterator<Pair<Long, Long>> windowIterator = windows.descendingIterator();
+ if (intervalSum > windowInterval) {
+ intervalSum = windowInterval;
+ }
+
+ Iterator<FlowWindow> windowIterator = windows.descendingIterator();
for (int i = 1; i < windowsToUse; i++) {
if (windowIterator.hasNext()) {
- Pair<Long, Long> window = windowIterator.next();
- flowSum += window.right;
+ FlowWindow window = windowIterator.next();
+ flowSum += window.sum;
intervalSum += windowInterval;
} else {
break;
@@ -146,4 +153,19 @@ public class FlowMonitor {
}
return flowSum * 1.0 / intervalSum * 1000;
}
+
+ public List<FlowWindow> getLatestWindows(int windowNum) {
+ List<FlowWindow> result = new ArrayList<>();
+ result.add(new FlowWindow(currWindowStart, currWindowSum));
+ Iterator<FlowWindow> windowIterator = windows.descendingIterator();
+ for (int i = 1; i < windowNum; i++) {
+ if (windowIterator.hasNext()) {
+ FlowWindow window = windowIterator.next();
+ result.add(window);
+ } else {
+ break;
+ }
+ }
+ return result;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java
index ea7c0f19bf..726ef093db 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java
@@ -19,13 +19,15 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
-import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,7 +36,7 @@ public class FlowMonitorManager {
private static final Logger logger = LoggerFactory.getLogger(FlowMonitorManager.class);
public static final FlowMonitorManager INSTANCE = new FlowMonitorManager();
- private Map<Peer, FlowMonitor> monitorMap = new ConcurrentHashMap<>();
+ private Map<TEndPoint, FlowMonitor> monitorMap = new ConcurrentHashMap<>();
private RaftConfig config;
private FlowMonitorManager() {}
@@ -50,26 +52,52 @@ public class FlowMonitorManager {
monitorMap.clear();
}
- public void report(Peer peer, long val) {
+ /**
+ * Adds {@code val} to the flow monitor of {@code endPoint}, creating the monitor on first use.
+ * If the monitor cannot be created, the value is dropped and a warning is logged.
+ */
+ public void report(TEndPoint endPoint, long val) {
FlowMonitor flowMonitor =
monitorMap.computeIfAbsent(
- peer,
+ endPoint,
p -> {
try {
return new FlowMonitor(p, config);
} catch (IOException e) {
- logger.warn("Cannot register flow monitor for {}", peer, e);
+ logger.warn("Cannot register flow monitor for {}", endPoint, e);
return null;
}
});
if (flowMonitor != null) {
flowMonitor.report(val);
} else {
- logger.warn("Flow monitor {} is not registered", peer);
+ logger.warn("Flow monitor {} is not registered", endPoint);
}
}
- public double averageFlow(Peer peer, int windowsToUse) {
- return monitorMap.get(peer).averageFlow(windowsToUse);
+ /**
+ * Returns the average flow of {@code endPoint} over its latest {@code windowsToUse} windows, or
+ * 0.0 if no monitor has been registered for it.
+ */
+ public double averageFlow(TEndPoint endPoint, int windowsToUse) {
+ FlowMonitor flowMonitor = monitorMap.get(endPoint);
+ return flowMonitor != null ? flowMonitor.averageFlow(windowsToUse) : 0.0;
+ }
+
+ /**
+ * Returns the latest windows of {@code endPoint} as given by {@link
+ * FlowMonitor#getLatestWindows(int)}, or an empty list if no monitor has been registered for it.
+ */
+ public List<FlowWindow> getLatestWindows(TEndPoint endPoint, int windowNum) {
+ FlowMonitor flowMonitor = monitorMap.get(endPoint);
+ return flowMonitor != null ? flowMonitor.getLatestWindows(windowNum) : Collections.emptyList();
+ }
+
+ /**
+ * Returns a read-only, live view of the registered monitors keyed by endpoint, so callers can
+ * inspect them but cannot register or drop monitors behind the manager's back.
+ */
+ public Map<TEndPoint, FlowMonitor> getMonitorMap() {
+ return Collections.unmodifiableMap(monitorMap);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowWindow.java
similarity index 55%
copy from consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
copy to consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowWindow.java
index 0310a4e3d3..157ecd398e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowWindow.java
@@ -16,27 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
-package org.apache.iotdb.consensus.common.request;
+/** The total flow accumulated during one monitoring window, together with the window's start. */
+public class FlowWindow {
+ /** Start timestamp of the window, as supplied by its creator. */
+ public long start;
+ /** Total flow accumulated in the window. */
+ public long sum;
-import java.nio.ByteBuffer;
-
-/*
-In general, for the requests from the leader, we can directly strong-cast the class to reduce
-the cost of deserialization during the execution of the leader state machine. For the requests
-received by the followers, the responsibility of deserialization can generally be transferred
-to the state machine layer
-*/
-public class ByteBufferConsensusRequest implements IConsensusRequest {
-
- private final ByteBuffer byteBuffer;
-
- public ByteBufferConsensusRequest(ByteBuffer byteBuffer) {
- this.byteBuffer = byteBuffer;
+ public FlowWindow(long start, long sum) {
+ this.start = start;
+ this.sum = sum;
}
@Override
- public ByteBuffer serializeToByteBuffer() {
- return byteBuffer;
+ public String toString() {
+ return "[" + start + "," + sum + ']';
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index aee2fa76cc..ebb0a0f37a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -632,6 +632,11 @@ public abstract class RaftLogManager {
if (entry.createTime != 0) {
entry.committedTime = System.nanoTime();
Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.add(entry.committedTime - entry.createTime);
+ // Wake every thread waiting on this entry: notify() wakes one arbitrary waiter and would
+ // leave any other waiter blocked until its own timeout, if it has one.
+ synchronized (entry) {
+ entry.notifyAll();
+ }
}
entry.setSerializationCache(null);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index a9aaae7d83..24b6755771 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -157,6 +157,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
maxRaftLogIndexSizeInMemory = config.getMaxRaftLogIndexSizeInMemory();
maxNumberOfPersistRaftLogFiles = config.getMaxNumberOfPersistRaftLogFiles();
maxPersistRaftLogNumberOnDisk = config.getMaxPersistRaftLogNumberOnDisk();
+ // the per-file data size limit is read from RaftConfig like the other persistence limits above
+ maxRaftLogPersistDataSizePerFile = config.getMaxRaftLogPersistDataSizePerFile();
this.logDataFileList = new ArrayList<>();
this.logIndexFileList = new ArrayList<>();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
new file mode 100644
index 0000000000..e6220a188d
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.utils;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A node report collects the current runtime information of the local node: the RaftMemberReports
+ * supplied through {@link #setMemberReports(List)}, followed once by the global timer statistics.
+ */
+@SuppressWarnings("java:S107") // reports need enough parameters
+public class NodeReport {
+
+ private final TEndPoint thisNode;
+ private List<RaftMemberReport> memberReports;
+
+ public NodeReport(TEndPoint thisNode) {
+ this.thisNode = thisNode;
+ memberReports = new ArrayList<>();
+ }
+
+ /**
+ * Replaces the member reports included in this node report.
+ *
+ * @param memberReports the reports to print; {@code null} is treated as an empty list
+ */
+ public void setMemberReports(List<RaftMemberReport> memberReports) {
+ this.memberReports = memberReports != null ? memberReports : new ArrayList<>();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("Report of ").append(thisNode).append(System.lineSeparator());
+ for (RaftMemberReport memberReport : memberReports) {
+ stringBuilder.append(memberReport).append(System.lineSeparator());
+ }
+ // Timer.Statistic is global: print it once here, not inside every member report.
+ stringBuilder.append("timer: ").append(Timer.Statistic.getReport());
+ return stringBuilder.toString();
+ }
+
+ /**
+ * A RaftMemberReport holds one raft member's role, leader, term, last log term/index, commit
+ * term/index, applied index, read-only flag and last heartbeat time, as passed to its constructor.
+ */
+ public static class RaftMemberReport {
+ RaftRole character;
+ Peer leader;
+ long term;
+ long lastLogTerm;
+ long lastLogIndex;
+ long commitIndex;
+ long commitTerm;
+ boolean isReadOnly;
+ // wall-clock millis; toString prints how long ago it was relative to currentTimeMillis()
+ long lastHeartbeatReceivedTime;
+ // presumably the lastLogIndex of an earlier report; toString prints the difference
+ long prevLastLogIndex;
+ long maxAppliedLogIndex;
+
+ public RaftMemberReport(
+ RaftRole character,
+ Peer leader,
+ long term,
+ long lastLogTerm,
+ long lastLogIndex,
+ long commitIndex,
+ long commitTerm,
+ boolean isReadOnly,
+ long lastHeartbeatReceivedTime,
+ long prevLastLogIndex,
+ long maxAppliedLogIndex) {
+ this.character = character;
+ this.leader = leader;
+ this.term = term;
+ this.lastLogTerm = lastLogTerm;
+ this.lastLogIndex = lastLogIndex;
+ this.commitIndex = commitIndex;
+ this.commitTerm = commitTerm;
+ this.isReadOnly = isReadOnly;
+ this.lastHeartbeatReceivedTime = lastHeartbeatReceivedTime;
+ this.prevLastLogIndex = prevLastLogIndex;
+ this.maxAppliedLogIndex = maxAppliedLogIndex;
+ }
+
+ @Override
+ public String toString() {
+ return "RaftReport {\n"
+ + "character="
+ + character
+ + ", Leader="
+ + leader
+ + ", term="
+ + term
+ + ", lastLogTerm="
+ + lastLogTerm
+ + ", lastLogIndex="
+ + lastLogIndex
+ + ", commitIndex="
+ + commitIndex
+ + ", commitTerm="
+ + commitTerm
+ + ", appliedLogIndex="
+ + maxAppliedLogIndex
+ + ", readOnly="
+ + isReadOnly
+ + ", lastHeartbeat="
+ + (System.currentTimeMillis() - lastHeartbeatReceivedTime)
+ + "ms ago"
+ + ", logIncrement="
+ + (lastLogIndex - prevLastLogIndex)
+ + '}';
+ }
+
+ public RaftRole getCharacter() {
+ return character;
+ }
+
+ public Peer getLeader() {
+ return leader;
+ }
+
+ public long getTerm() {
+ return term;
+ }
+
+ public long getLastLogTerm() {
+ return lastLogTerm;
+ }
+
+ public long getLastLogIndex() {
+ return lastLogIndex;
+ }
+
+ public long getCommitIndex() {
+ return commitIndex;
+ }
+
+ public long getCommitTerm() {
+ return commitTerm;
+ }
+
+ public boolean isReadOnly() {
+ return isReadOnly;
+ }
+
+ public long getLastHeartbeatReceivedTime() {
+ return lastHeartbeatReceivedTime;
+ }
+
+ public long getPrevLastLogIndex() {
+ return prevLastLogIndex;
+ }
+
+ public long getMaxAppliedLogIndex() {
+ return maxAppliedLogIndex;
+ }
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index 9631b892fd..de89657540 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -196,6 +196,12 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_RECEIVER_PARSE_ENTRY(
+ RAFT_MEMBER_RECEIVER,
+ "receiver parse entries",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_RECEIVER_WAIT_FOR_PREV_LOG(
RAFT_MEMBER_RECEIVER,
"receiver wait for prev log",
@@ -220,6 +226,12 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_RECEIVER_APPEND_ONE_ENTRY(
+ RAFT_MEMBER_RECEIVER,
+ "receiver append one entry",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_RECEIVER_APPEND_ENTRY(
RAFT_MEMBER_RECEIVER,
"append entrys",
@@ -344,6 +356,12 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_SENDER_LOG_APPEND_WAIT(
+ LOG_DISPATCHER,
+ "wait for being appended",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPEND_END(
LOG_DISPATCHER,
"from create to wait append end",
@@ -382,7 +400,11 @@ public class Timer {
RAFT_RELAYED_LEVEL1_NUM(RAFT_MEMBER_SENDER, "level 1 relay node number", 1, true, ROOT),
RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
- RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT),
+ RAFT_LEADER_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "leader weak accept", 1, true, ROOT),
+ // NOTE(review): unlike the scale-1 counters around them these use TIME_SCALE; confirm intended
+ RAFT_FOLLOWER_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "follower weak accept", TIME_SCALE, true, ROOT),
+ RAFT_FOLLOWER_STRONG_ACCEPT(
+ RAFT_MEMBER_SENDER, "follower strong accept", TIME_SCALE, true, ROOT),
RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true, ROOT),
RAFT_INDEX_BLOCKER(RAFT_MEMBER_SENDER, "index blocker", 1, true, ROOT),
RAFT_APPEND_BLOCKER(RAFT_MEMBER_SENDER, "append blocker", 1, true, ROOT),
diff --git a/distribution/distribute-ecs.sh b/distribution/distribute-ecs.sh
new file mode 100644
index 0000000000..434e485e31
--- /dev/null
+++ b/distribution/distribute-ecs.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Copies the locally built iotdb* jars into the lib directory of every host listed in ips.
+# SRC_LIB_PATH and TARGET_LIB_PATH override the default paths; SRC_LIB_PATH may be a glob,
+# which the local shell expands when scp is invoked.
+src_lib_path=${SRC_LIB_PATH:-/d/CodeRepo/iotdb2/distribution/target/apache-iotdb-1.1.0-SNAPSHOT-all-bin/apache-iotdb-1.1.0-SNAPSHOT-all-bin/lib/iotdb*}
+
+ips=(ecs1 ecs2 ecs3 ecs4)
+#ips=(dc11 dc12 dc13 dc14 dc11 dc12)
+target_lib_path=${TARGET_LIB_PATH:-/root/jt/iotdb_expr/apache-iotdb-1.1.0-SNAPSHOT-all-bin/lib/}
+
+for ip in "${ips[@]}"; do
+ # -p: succeed when the directory already exists, and create missing parents
+ ssh "root@$ip" "mkdir -p '$target_lib_path'"
+ # src_lib_path is intentionally unquoted so that its glob expands to the jar files
+ scp -r $src_lib_path "root@$ip:$target_lib_path"
+done
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index ae890e6a32..4b40393b25 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -399,4 +399,20 @@ public abstract class InsertNode extends WritePlanNode {
public PartialPath conflictKey() {
return devicePath;
}
+
+ /**
+ * Roughly estimates the size of the identifying strings of this node, counting 2 bytes per char
+ * of the device path and of every non-null measurement name.
+ */
+ @Override
+ public long estimateSize() {
+ long size = devicePath.getFullPath().length() * 2L;
+ for (String measurement : measurements) {
+ // entries may be null (presumably failed measurements); they have no name to count
+ if (measurement != null) {
+ size += measurement.length() * 2L;
+ }
+ }
+ return size;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 19e157493f..7fa6b8ba30 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -1159,4 +1159,58 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
throw new SemanticException(e);
}
}
+
+ /**
+ * Roughly estimates the size of this tablet: the superclass estimate, 8 bytes per timestamp and
+ * the raw size of every value column (the length of each non-null Binary for TEXT columns).
+ */
+ @Override
+ public long estimateSize() {
+ long size = super.estimateSize();
+
+ size += times.length * 8L;
+
+ for (int measurementIndex = 0; measurementIndex < columns.length; measurementIndex++) {
+ // entries may be null (presumably failed measurements); they carry no data to count
+ if (columns[measurementIndex] == null || dataTypes[measurementIndex] == null) {
+ continue;
+ }
+ switch (dataTypes[measurementIndex]) {
+ case INT32:
+ int[] intValues = (int[]) columns[measurementIndex];
+ size += intValues.length * 4L;
+ break;
+ case INT64:
+ long[] longValues = (long[]) columns[measurementIndex];
+ size += longValues.length * 8L;
+ break;
+ case FLOAT:
+ float[] floatValues = (float[]) columns[measurementIndex];
+ size += floatValues.length * 4L;
+ break;
+ case DOUBLE:
+ double[] doubleValues = (double[]) columns[measurementIndex];
+ size += doubleValues.length * 8L;
+ break;
+ case BOOLEAN:
+ boolean[] boolValues = (boolean[]) columns[measurementIndex];
+ size += boolValues.length;
+ break;
+ case TEXT:
+ Binary[] binaryValues = (Binary[]) columns[measurementIndex];
+ for (Binary binaryValue : binaryValues) {
+ // individual TEXT values may be null as well
+ if (binaryValue != null) {
+ size += binaryValue.getLength();
+ }
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
+ }
+ }
+
+ return size;
+ }
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index d3edee11d5..6010d8a87f 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -746,7 +746,8 @@ public class Session implements ISession {
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
- logger.debug(
+ // NOTE(review): INFO emits one line per redirected query; consider DEBUG outside experiments
+ logger.info(
"{} redirect query {} to {}",
defaultSessionConnection.getEndPoint(),
sql,
@@ -795,7 +796,8 @@ public class Session implements ISession {
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
- logger.debug("redirect query {} to {}", paths, e.getEndPoint());
+ // NOTE(review): INFO emits one line per redirected raw-data query; consider DEBUG
+ logger.info("redirect query {} to {}", paths, e.getEndPoint());
// retry
try {
return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime, timeOut);
@@ -1089,6 +1089,7 @@ public class Session implements ISession {
private void handleRedirection(String deviceId, TEndPoint endpoint) {
if (enableRedirection) {
+ logger.info("Redirect to {}", endpoint);
// no need to redirection
if (endpoint.ip.equals("0.0.0.0")) {
return;