You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/04/07 01:16:58 UTC
[iotdb] 03/03: add relay_first_level_size config
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1da27c431b059cc1c6184075436a13334e381582
Author: jt <jt...@163.com>
AuthorDate: Thu Apr 7 09:15:49 2022 +0800
add relay_first_level_size config
---
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 14 +--
.../apache/iotdb/cluster/config/ClusterConfig.java | 10 ++
.../iotdb/cluster/config/ClusterDescriptor.java | 5 +
.../iotdb/cluster/log/IndirectLogDispatcher.java | 109 ++++++++++++---------
.../org/apache/iotdb/cluster/log/LogAckSender.java | 8 +-
.../apache/iotdb/cluster/log/LogDispatcher.java | 6 +-
.../org/apache/iotdb/cluster/log/LogRelay.java | 81 ++++++++++++++-
.../cluster/server/heartbeat/HeartbeatThread.java | 2 +-
.../cluster/server/member/DataGroupMember.java | 37 ++++---
.../iotdb/cluster/server/member/RaftMember.java | 90 +++--------------
.../iotdb/cluster/server/monitor/NodeReport.java | 79 ++++++++-------
.../iotdb/cluster/server/monitor/NodeStatus.java | 20 ++++
.../cluster/server/monitor/NodeStatusManager.java | 9 +-
.../apache/iotdb/cluster/server/monitor/Timer.java | 28 +++++-
.../cluster/server/service/MetaSyncService.java | 4 +-
.../iotdb/cluster/utils/WindowStatistic.java | 55 +++++++++++
16 files changed, 364 insertions(+), 193 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index a626b88425..f399c78b2b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -45,9 +45,8 @@ import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.monitor.NodeReport;
-import org.apache.iotdb.cluster.server.monitor.NodeStatus;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
import org.apache.iotdb.cluster.server.raft.DataRaftService;
import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
@@ -82,7 +81,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -216,6 +214,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
report.setMetaMemberReport(metaGroupMember.genMemberReport());
report.setDataMemberReportList(dataGroupEngine.genMemberReports());
logger.info(report.toString());
+ NodeStatusManager.getINSTANCE().report();
} catch (Exception e) {
logger.error("exception occurred when generating node report", e);
}
@@ -296,13 +295,8 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
@Override
public void run() {
- logger.info(
- "Total request fanout: {}",
- Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt() + Statistic.RAFT_SENDER_SEND_LOG.getCnt());
- for (Entry<Node, NodeStatus> nodeNodeStatusEntry :
- NodeStatusManager.getINSTANCE().getNodeStatusMap().entrySet()) {
- logger.info("{}: {}", nodeNodeStatusEntry.getKey(), nodeNodeStatusEntry.getValue());
- }
+ logger.info(Timer.getReport());
+ NodeStatusManager.getINSTANCE().report();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 187509b2ef..0f265dee76 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -192,6 +192,8 @@ public class ClusterConfig {
private int relaySenderNum = 8;
+ private int relayFirstLevelSize = 1;
+
private boolean optimizeIndirectBroadcasting = false;
/**
@@ -600,4 +602,12 @@ public class ClusterConfig {
public void setOptimizeIndirectBroadcasting(boolean optimizeIndirectBroadcasting) {
this.optimizeIndirectBroadcasting = optimizeIndirectBroadcasting;
}
+
+ public int getRelayFirstLevelSize() {
+ return relayFirstLevelSize;
+ }
+
+ public void setRelayFirstLevelSize(int relayFirstLevelSize) {
+ this.relayFirstLevelSize = relayFirstLevelSize;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index a089e2d75a..8e9f8934bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -356,6 +356,11 @@ public class ClusterDescriptor {
properties.getProperty(
"relay_sender_number", String.valueOf(config.getRelaySenderNum()))));
+ config.setRelayFirstLevelSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "relay_first_level_size", String.valueOf(config.getRelayFirstLevelSize()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index d32fb405e3..ecbd1dbabe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClusterUtils;
@@ -38,6 +39,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.iotdb.cluster.server.monitor.Timer.Statistic.RAFT_RELAYED_LEVEL1_NUM;
/**
* IndirectLogDispatcher sends entries only to a pre-selected subset of followers instead of all
@@ -47,7 +51,9 @@ public class IndirectLogDispatcher extends LogDispatcher {
private static final Logger logger = LoggerFactory.getLogger(IndirectLogDispatcher.class);
- private Map<Node, List<Node>> directToIndirectFollowerMap = new HashMap<>();
+ private Map<Node, List<Node>> directToIndirectFollowerMap = new ConcurrentHashMap<>();
+ private long dispatchedEntryNum;
+ private int recalculateMapInterval = 1;
public IndirectLogDispatcher(RaftMember member) {
super(member);
@@ -74,7 +80,10 @@ public class IndirectLogDispatcher extends LogDispatcher {
@Override
public void offer(SendLogRequest request) {
super.offer(request);
- recalculateDirectFollowerMap();
+ dispatchedEntryNum++;
+ if (dispatchedEntryNum % recalculateMapInterval == 0) {
+ recalculateDirectFollowerMap();
+ }
}
@Override
@@ -87,6 +96,15 @@ public class IndirectLogDispatcher extends LogDispatcher {
return newRequest;
}
+ private double getNodeWeight(Node node, double maxLatency) {
+ NodeStatus status = NodeStatusManager.getINSTANCE().getNodeStatus(node, false);
+ // return 1.0
+ // / (status.getStatus().fanoutRequestNum + 1);
+ double pow = Math.pow(100.0, maxLatency / status.getSendEntryLatencyStatistic().getAvg());
+ status.setRelayWeight(pow);
+ return pow;
+ }
+
public void recalculateDirectFollowerMap() {
List<Node> allNodes = new ArrayList<>(member.getAllNodes());
allNodes.removeIf(n -> ClusterUtils.isNodeEquals(n, member.getThisNode()));
@@ -96,28 +114,31 @@ public class IndirectLogDispatcher extends LogDispatcher {
nodesEnabled.clear();
directToIndirectFollowerMap.clear();
+ int firstLevelSize = ClusterDescriptor.getInstance().getConfig().getRelayFirstLevelSize();
+ List<Node> firstLevelNodes;
+
if (ClusterDescriptor.getInstance().getConfig().isOptimizeIndirectBroadcasting()) {
QueryCoordinator instance = QueryCoordinator.getINSTANCE();
orderedNodes = instance.reorderNodes(allNodes);
- long thisLoad =
- Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt() + 1;
- long minLoad =
+ long thisLoad = Statistic.getTotalFanout() + 1;
+ double maxLatency =
NodeStatusManager.getINSTANCE()
- .getNodeStatus(orderedNodes.get(0), false)
- .getStatus()
- .fanoutRequestNum
- + 1;
- double loadFactor = 1.05;
+ .getNodeStatus(orderedNodes.get(0), false)
+ .getSendEntryLatencyStatistic()
+ .getAvg();
+ for (int i = 1, orderedNodesSize = orderedNodes.size(); i < orderedNodesSize; i++) {
+ maxLatency =
+ Double.max(
+ maxLatency,
+ NodeStatusManager.getINSTANCE()
+ .getNodeStatus(orderedNodes.get(i), false)
+ .getSendEntryLatencyStatistic()
+ .getAvg());
+ }
+
WeightedList<Node> firstLevelCandidates = new WeightedList<>();
firstLevelCandidates.insert(
- orderedNodes.get(0),
- 1.0
- / (NodeStatusManager.getINSTANCE()
- .getNodeStatus(orderedNodes.get(0), false)
- .getStatus()
- .fanoutRequestNum
- + 1));
- int firstLevelSize = 1;
+ orderedNodes.get(0), getNodeWeight(orderedNodes.get(0), maxLatency));
for (int i = 1, orderedNodesSize = orderedNodes.size(); i < orderedNodesSize; i++) {
Node orderedNode = orderedNodes.get(i);
@@ -127,9 +148,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
.getStatus()
.fanoutRequestNum
+ 1;
- if (nodeLoad * 1.0 <= minLoad * loadFactor) {
- firstLevelCandidates.insert(orderedNode, 1.0 / nodeLoad);
- }
+ firstLevelCandidates.insert(orderedNode, getNodeWeight(orderedNode, maxLatency));
if (nodeLoad > thisLoad) {
firstLevelSize = (int) Math.max(firstLevelSize, nodeLoad / thisLoad);
}
@@ -139,36 +158,34 @@ public class IndirectLogDispatcher extends LogDispatcher {
firstLevelSize = firstLevelCandidates.size();
}
- List<Node> firstLevelNodes = firstLevelCandidates.select(firstLevelSize);
-
- Map<Node, List<Node>> secondLevelNodeMap = new HashMap<>();
- orderedNodes.removeAll(firstLevelNodes);
- for (int i = 0; i < orderedNodes.size(); i++) {
- Node firstLevelNode = firstLevelNodes.get(i % firstLevelSize);
- secondLevelNodeMap
- .computeIfAbsent(firstLevelNode, n -> new ArrayList<>())
- .add(orderedNodes.get(i));
+ firstLevelNodes = firstLevelCandidates.select(firstLevelSize);
+ } else {
+ firstLevelNodes = new ArrayList<>(orderedNodes.subList(0, firstLevelSize));
+ if (firstLevelSize > orderedNodes.size()) {
+ firstLevelSize = orderedNodes.size();
}
+ }
- for (Node firstLevelNode : firstLevelNodes) {
- directToIndirectFollowerMap.put(
- firstLevelNode,
- secondLevelNodeMap.getOrDefault(firstLevelNode, Collections.emptyList()));
- nodesEnabled.put(firstLevelNode, true);
- }
+ Map<Node, List<Node>> secondLevelNodeMap = new HashMap<>();
+ orderedNodes.removeAll(firstLevelNodes);
+ for (int i = 0; i < orderedNodes.size(); i++) {
+ Node firstLevelNode = firstLevelNodes.get(i % firstLevelSize);
+ secondLevelNodeMap
+ .computeIfAbsent(firstLevelNode, n -> new ArrayList<>())
+ .add(orderedNodes.get(i));
+ }
- } else {
- for (int i = 0, j = orderedNodes.size() - 1; i <= j; i++, j--) {
- if (i != j) {
- directToIndirectFollowerMap.put(
- orderedNodes.get(i), Collections.singletonList(orderedNodes.get(j)));
- } else {
- directToIndirectFollowerMap.put(orderedNodes.get(i), Collections.emptyList());
- }
- nodesEnabled.put(orderedNodes.get(i), true);
- }
+ for (Node firstLevelNode : firstLevelNodes) {
+ directToIndirectFollowerMap.put(
+ firstLevelNode, secondLevelNodeMap.getOrDefault(firstLevelNode, Collections.emptyList()));
+ nodesEnabled.put(firstLevelNode, true);
}
+ RAFT_RELAYED_LEVEL1_NUM.add(directToIndirectFollowerMap.size());
logger.debug("New relay map: {}", directToIndirectFollowerMap);
}
+
+ public Map<Node, List<Node>> getDirectToIndirectFollowerMap() {
+ return Collections.unmodifiableMap(directToIndirectFollowerMap);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
index 3441a01432..3a15dca6d4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogAckSender.java
@@ -57,8 +57,10 @@ public class LogAckSender {
public LogAckSender(RaftMember member) {
this.member = member;
this.header = member.getHeader();
- ackSenderPool = IoTDBThreadPoolFactory.newSingleThreadExecutor(member.getName() + "-ACKSender");
- ackSenderPool.submit(this::appendAckLeaderTask);
+ ackSenderPool = IoTDBThreadPoolFactory.newFixedThreadPool(4, member.getName() + "-ACKSender");
+ for (int i = 0; i < 4; i++) {
+ ackSenderPool.submit(this::appendAckLeaderTask);
+ }
}
public static class AckRequest {
@@ -93,7 +95,7 @@ public class LogAckSender {
}
appendAckLeader(ackRequestList);
- Thread.sleep(10);
+ // Thread.sleep(10);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 3e659a1468..f1c50952bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -193,7 +193,6 @@ public class LogDispatcher {
public static class SendLogRequest {
- private AppendNodeEntryHandler handler;
private VotingLog votingLog;
private AtomicBoolean leaderShipStale;
private AtomicLong newLeaderTerm;
@@ -469,6 +468,10 @@ public class LogDispatcher {
logRequest.newLeaderTerm,
logRequest.quorumSize);
// TODO add async interface
+ if (syncClient == null) {
+ syncClient = member.getSyncClient(receiver);
+ }
+
int retries = 5;
try {
long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
@@ -486,6 +489,7 @@ public class LogDispatcher {
NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(receiver, false);
nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
nodeStatus.getSendEntryNum().incrementAndGet();
+ nodeStatus.getSendEntryLatencyStatistic().add(sendLogTime);
long handleStart = Statistic.RAFT_SENDER_HANDLE_SEND_RESULT.getOperationStartTime();
handler.onComplete(result);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
index 842f2eb3ca..350e3fed0b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
@@ -23,10 +23,16 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +42,8 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
+import static org.apache.iotdb.cluster.log.LogDispatcher.concurrentSenderNum;
+
/** LogRelay is used by followers to forward entries from the leader to other followers. */
public class LogRelay {
@@ -51,8 +59,8 @@ public class LogRelay {
this.raftMember = raftMember;
relaySenders =
IoTDBThreadPoolFactory.newFixedThreadPool(
- RELAY_NUMBER, raftMember.getName() + "-RelaySender");
- for (int i = 0; i < RELAY_NUMBER; i++) {
+ RELAY_NUMBER + 4, raftMember.getName() + "-RelaySender");
+ for (int i = 0; i < 4; i++) {
relaySenders.submit(new RelayThread());
}
}
@@ -116,7 +124,7 @@ public class LogRelay {
+ (relayEntry.singleRequest.prevLogIndex + 1)
+ "-"
+ relayEntry.receivers);
- raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
+ sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
} else if (relayEntry.batchRequest != null) {
Thread.currentThread()
.setName(
@@ -125,7 +133,7 @@ public class LogRelay {
+ (relayEntry.batchRequest.prevLogIndex + 1)
+ "-"
+ relayEntry.receivers);
- raftMember.sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
+ sendLogsToSubFollowers(relayEntry.batchRequest, relayEntry.receivers);
}
Statistic.RAFT_RELAYED_ENTRY.add(1);
@@ -133,6 +141,71 @@ public class LogRelay {
}
}
+ public void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
+ request.setIsFromLeader(false);
+ request.setSubReceiversIsSet(false);
+ for (Node subFollower : subFollowers) {
+ relaySenders.submit(
+ () -> {
+ Client syncClient = null;
+ try {
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ raftMember
+ .getAsyncClient(subFollower)
+ .appendEntry(request, new IndirectAppendHandler(subFollower, request));
+ } else {
+ long operationStartTime = Statistic.RAFT_RECEIVER_RELAY_LOG.getOperationStartTime();
+ syncClient = raftMember.getSyncClient(subFollower);
+
+ int concurrentSender = concurrentSenderNum.incrementAndGet();
+ Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
+ syncClient.appendEntry(request);
+ concurrentSenderNum.decrementAndGet();
+
+ long sendLogTime =
+ Statistic.RAFT_RECEIVER_RELAY_LOG.calOperationCostTimeFromStart(
+ operationStartTime);
+ NodeStatus nodeStatus =
+ NodeStatusManager.getINSTANCE().getNodeStatus(subFollower, false);
+ nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
+ nodeStatus.getSendEntryNum().incrementAndGet();
+ nodeStatus.getSendEntryLatencyStatistic().add(sendLogTime);
+ }
+ } catch (TException e) {
+ logger.error("Cannot send {} to {}", request, subFollower, e);
+ } finally {
+ if (syncClient != null) {
+ ClientUtils.putBackSyncClient(syncClient);
+ }
+ }
+ });
+ }
+ }
+
+ public void sendLogsToSubFollowers(AppendEntriesRequest request, List<Node> subFollowers) {
+ request.setIsFromLeader(false);
+ request.setSubReceiversIsSet(false);
+ for (Node subFollower : subFollowers) {
+ Client syncClient = null;
+ try {
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ raftMember
+ .getAsyncClient(subFollower)
+ .appendEntries(request, new IndirectAppendHandler(subFollower, request));
+ } else {
+ syncClient = raftMember.getSyncClient(subFollower);
+ syncClient.appendEntries(request);
+ }
+ } catch (TException e) {
+ logger.error("Cannot send {} to {}", request, subFollower, e);
+ } finally {
+ if (syncClient != null) {
+ ClientUtils.putBackSyncClient(syncClient);
+ }
+ }
+ }
+ }
+
public static class RelayEntry implements Comparable<RelayEntry> {
private AppendEntryRequest singleRequest;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index acc34cf6c0..266f767b96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -268,7 +268,7 @@ public class HeartbeatThread implements Runnable {
if (!ClusterUtils.isNodeEquals(
localMember.getThisNode(), localMember.getPartitionGroup().getHeader().node)) {
- long electionWait = getElectionRandomWaitMs();
+ long electionWait = getElectionRandomWaitMs() + 5000;
logger.info(
"{}: Sleep {}ms before the first election as this node is not the preferred " + "leader",
memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 3114182960..11f0cf0a4a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.log.IndirectLogDispatcher;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.LogParser;
@@ -993,21 +994,27 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
public DataMemberReport genReport() {
long prevLastLogIndex = lastReportedLogIndex;
lastReportedLogIndex = logManager.getLastLogIndex();
- return new DataMemberReport(
- character,
- leader.get(),
- term.get(),
- logManager.getLastLogTerm(),
- lastReportedLogIndex,
- logManager.getCommitLogIndex(),
- logManager.getCommitLogTerm(),
- getHeader(),
- readOnly,
- NodeStatusManager.getINSTANCE().getLastResponseLatency(getHeader().getNode()),
- lastHeartbeatReceivedTime,
- prevLastLogIndex,
- logManager.getMaxHaveAppliedCommitIndex(),
- logRelay != null ? logRelay.first() : null);
+ DataMemberReport dataMemberReport =
+ new DataMemberReport(
+ character,
+ leader.get(),
+ term.get(),
+ logManager.getLastLogTerm(),
+ lastReportedLogIndex,
+ logManager.getCommitLogIndex(),
+ logManager.getCommitLogTerm(),
+ getHeader(),
+ readOnly,
+ NodeStatusManager.getINSTANCE().getLastResponseLatency(getHeader().getNode()),
+ lastHeartbeatReceivedTime,
+ prevLastLogIndex,
+ logManager.getMaxHaveAppliedCommitIndex(),
+ logRelay != null ? logRelay.first() : null);
+ if (character == NodeCharacter.LEADER && config.isUseIndirectBroadcasting()) {
+ dataMemberReport.setDirectToIndirectFollowerMap(
+ ((IndirectLogDispatcher) getLogDispatcher()).getDirectToIndirectFollowerMap());
+ }
+ return dataMemberReport;
}
@TestOnly
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 0d605d3f46..f554a69048 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -72,8 +72,6 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
-import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
-import org.apache.iotdb.cluster.server.monitor.NodeStatus;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
@@ -133,7 +131,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
-import static org.apache.iotdb.cluster.log.LogDispatcher.concurrentSenderNum;
/**
* RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
@@ -631,63 +628,6 @@ public abstract class RaftMember implements RaftMemberMBean {
return result;
}
- public void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
- request.setIsFromLeader(false);
- request.setSubReceiversIsSet(false);
- for (Node subFollower : subFollowers) {
- Client syncClient = null;
- try {
- if (config.isUseAsyncServer()) {
- getAsyncClient(subFollower)
- .appendEntry(request, new IndirectAppendHandler(subFollower, request));
- } else {
- long operationStartTime = Statistic.RAFT_RECEIVER_RELAY_LOG.getOperationStartTime();
- syncClient = getSyncClient(subFollower);
-
- int concurrentSender = concurrentSenderNum.incrementAndGet();
- Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
- syncClient.appendEntry(request);
- concurrentSenderNum.decrementAndGet();
-
- long sendLogTime =
- Statistic.RAFT_RECEIVER_RELAY_LOG.calOperationCostTimeFromStart(operationStartTime);
- NodeStatus nodeStatus = NodeStatusManager.getINSTANCE().getNodeStatus(subFollower, false);
- nodeStatus.getSendEntryLatencySum().addAndGet(sendLogTime);
- nodeStatus.getSendEntryNum().incrementAndGet();
- }
- } catch (TException e) {
- logger.error("Cannot send {} to {}", request, subFollower, e);
- } finally {
- if (syncClient != null) {
- ClientUtils.putBackSyncClient(syncClient);
- }
- }
- }
- }
-
- public void sendLogsToSubFollowers(AppendEntriesRequest request, List<Node> subFollowers) {
- request.setIsFromLeader(false);
- request.setSubReceiversIsSet(false);
- for (Node subFollower : subFollowers) {
- Client syncClient = null;
- try {
- if (config.isUseAsyncServer()) {
- getAsyncClient(subFollower)
- .appendEntries(request, new IndirectAppendHandler(subFollower, request));
- } else {
- syncClient = getSyncClient(subFollower);
- syncClient.appendEntries(request);
- }
- } catch (TException e) {
- logger.error("Cannot send {} to {}", request, subFollower, e);
- } finally {
- if (syncClient != null) {
- ClientUtils.putBackSyncClient(syncClient);
- }
- }
- }
- }
-
/** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
public AppendEntryResult appendEntries(AppendEntriesRequest request)
throws UnknownLogTypeException {
@@ -1860,20 +1800,22 @@ public abstract class RaftMember implements RaftMemberMBean {
@SuppressWarnings("java:S2445")
protected void commitLog(Log log) throws LogExecutionException {
long startTime;
- if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
- startTime = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
- synchronized (logManager) {
- Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
- startTime);
- if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
- startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
- logManager.commitTo(log.getCurrLogIndex());
- Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
- }
- startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
- }
- Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
- }
+ // if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+ // startTime =
+ // Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
+ // synchronized (logManager) {
+ // Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+ // startTime);
+ // if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+ // startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
+ // logManager.commitTo(log.getCurrLogIndex());
+ //
+ // Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
+ // }
+ // startTime = Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.getOperationStartTime();
+ // }
+ // Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
+ // }
// when using async applier, the log here may not be applied. To return the execution
// result, we must wait until the log is applied.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index cc284f0bb4..2734b19b7b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.rpc.RpcTransportFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* A node report collects the current runtime information of the local node, which contains: 1. The
@@ -204,6 +205,7 @@ public class NodeReport {
public static class DataMemberReport extends RaftMemberReport {
RaftNode header;
long headerLatency;
+ private Map<Node, List<Node>> directToIndirectFollowerMap;
public DataMemberReport(
NodeCharacter character,
@@ -239,40 +241,49 @@ public class NodeReport {
@Override
public String toString() {
- return "DataMemberReport{"
- + "header="
- + header.getNode()
- + ", raftId="
- + header.getRaftId()
- + ", character="
- + character
- + ", Leader="
- + leader
- + ", term="
- + term
- + ", lastLogTerm="
- + lastLogTerm
- + ", lastLogIndex="
- + lastLogIndex
- + ", commitIndex="
- + commitIndex
- + ", commitTerm="
- + commitTerm
- + ", appliedLogIndex="
- + maxAppliedLogIndex
- + ", readOnly="
- + isReadOnly
- + ", nextToRelay="
- + nextToRelay
- + ", headerLatency="
- + headerLatency
- + "ns"
- + ", lastHeartbeat="
- + (System.currentTimeMillis() - lastHeartbeatReceivedTime)
- + "ms ago"
- + ", logIncrement="
- + (lastLogIndex - prevLastLogIndex)
- + '}';
+ String s =
+ "DataMemberReport{"
+ + "header="
+ + header.getNode()
+ + ", raftId="
+ + header.getRaftId()
+ + ", character="
+ + character
+ + ", Leader="
+ + leader
+ + ", term="
+ + term
+ + ", lastLogTerm="
+ + lastLogTerm
+ + ", lastLogIndex="
+ + lastLogIndex
+ + ", commitIndex="
+ + commitIndex
+ + ", commitTerm="
+ + commitTerm
+ + ", appliedLogIndex="
+ + maxAppliedLogIndex
+ + ", readOnly="
+ + isReadOnly
+ + ", nextToRelay="
+ + nextToRelay
+ + ", headerLatency="
+ + headerLatency
+ + "ns"
+ + ", lastHeartbeat="
+ + (System.currentTimeMillis() - lastHeartbeatReceivedTime)
+ + "ms ago"
+ + ", logIncrement="
+ + (lastLogIndex - prevLastLogIndex);
+ if (directToIndirectFollowerMap != null) {
+ s = s + ", relayMap=" + directToIndirectFollowerMap;
+ }
+ s = s + '}';
+ return s;
+ }
+
+ public void setDirectToIndirectFollowerMap(Map<Node, List<Node>> directToIndirectFollowerMap) {
+ this.directToIndirectFollowerMap = directToIndirectFollowerMap;
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
index 6f4039456b..e1456e307b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.cluster.server.monitor;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
+import org.apache.iotdb.cluster.utils.WindowStatistic;
import java.util.Date;
import java.util.Objects;
@@ -57,6 +58,9 @@ public class NodeStatus implements Comparable<NodeStatus> {
private AtomicLong sendEntryNum = new AtomicLong();
private AtomicLong sendEntryLatencySum = new AtomicLong();
+ private WindowStatistic sendEntryLatencyStatistic = new WindowStatistic();
+
+ private double relayWeight;
// TODO-Cluster: decide what should be contained in NodeStatus and how to compare two NodeStatus
@Override
@@ -107,6 +111,14 @@ public class NodeStatus implements Comparable<NodeStatus> {
this.lastResponseLatency = lastResponseLatency;
}
+ public double getRelayWeight() {
+ return relayWeight;
+ }
+
+ public void setRelayWeight(double relayWeight) {
+ this.relayWeight = relayWeight;
+ }
+
public void activate() {
isActivated = true;
}
@@ -129,6 +141,10 @@ public class NodeStatus implements Comparable<NodeStatus> {
return sendEntryLatencySum;
}
+ public WindowStatistic getSendEntryLatencyStatistic() {
+ return sendEntryLatencyStatistic;
+ }
+
@Override
public String toString() {
return "NodeStatus{"
@@ -148,6 +164,10 @@ public class NodeStatus implements Comparable<NodeStatus> {
+ sendEntryLatencySum
+ ", sendEntryLatencyAvg="
+ (sendEntryLatencySum.get() * 1.0 / sendEntryNum.get())
+ + ", latestSendEntryLatencyAvg="
+ + sendEntryLatencyStatistic.getAvg()
+ + ", nodeRelayWeight="
+ + relayWeight
+ '}';
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
index eb79a84d2a..4edcbb8d57 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.util.Collections;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -47,7 +48,7 @@ public class NodeStatusManager {
private static final Logger logger = LoggerFactory.getLogger(NodeStatusManager.class);
// a status is considered stale if it is older than one minute and should be updated
- private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 1 * 10L;
+ private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 1 * 1000L;
private static final NodeStatusManager INSTANCE = new NodeStatusManager();
private MetaGroupMember metaGroupMember;
@@ -185,4 +186,10 @@ public class NodeStatusManager {
public Map<Node, NodeStatus> getNodeStatusMap() {
return Collections.unmodifiableMap(nodeStatusMap);
}
+
+ public void report() {
+ for (Entry<Node, NodeStatus> nodeNodeStatusEntry : getNodeStatusMap().entrySet()) {
+ logger.info("{}: {}", nodeNodeStatusEntry.getKey(), nodeNodeStatusEntry.getValue());
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 72892bdb56..7ded0c910e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.server.monitor;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.utils.WindowStatistic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -334,6 +335,7 @@ public class Timer {
RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
RAFT_RELAYED_ENTRY(RAFT_MEMBER_RECEIVER, "number of relayed entries", 1, true, ROOT),
RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
+ RAFT_RELAYED_LEVEL1_NUM(RAFT_MEMBER_SENDER, "level 1 relay node number", 1, true, ROOT),
RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),
RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
@@ -344,6 +346,7 @@ public class Timer {
String blockName;
AtomicLong sum = new AtomicLong(0);
AtomicLong counter = new AtomicLong(0);
+ private WindowStatistic latestWindow = new WindowStatistic();
long max;
double scale;
boolean valid;
@@ -370,6 +373,7 @@ public class Timer {
sum.addAndGet(val);
counter.incrementAndGet();
max = Math.max(max, val);
+ latestWindow.add(val);
}
}
@@ -399,6 +403,7 @@ public class Timer {
sum.set(0);
counter.set(0);
max = 0;
+ latestWindow.reset();
}
/** WARN: no current safety guarantee. */
@@ -413,12 +418,28 @@ public class Timer {
double s = sum.get() / scale;
long cnt = counter.get();
double avg = s / cnt;
- return String.format("%s - %s: %.2f, %d, %.2f, %d", className, blockName, s, cnt, avg, max);
+ return String.format(
+ "%s - %s: %.2f, %d, %.2f, %d, %.2f",
+ className, blockName, s, cnt, avg, max, latestWindow.getAvg());
}
public long getCnt() {
return counter.get();
}
+
+ public long getSum() {
+ return sum.get();
+ }
+
+ public static long getTotalFanout() {
+ return Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt();
+ }
+
+    public static double getSendLatency() {
+      long total = RAFT_SENDER_SEND_LOG.getSum() + RAFT_RECEIVER_RELAY_LOG.getSum();
+      long cnt = getTotalFanout();
+      return cnt == 0 ? 0.0 : total * 1.0 / cnt; // 0.0 rather than NaN when nothing was sent
+    }
}
public static String getReport() {
@@ -427,6 +448,11 @@ public class Timer {
}
StringBuilder result = new StringBuilder();
printTo(Statistic.ROOT, result);
+ result.append(System.lineSeparator());
+ result.append(
+ String.format(
+ "Total request fanout: %d, send entry latency: %f",
+ Statistic.getTotalFanout(), Statistic.getSendLatency()));
return result.toString();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index f8df9d8d83..7887cc8084 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -161,9 +161,7 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
*/
@Override
public TNodeStatus queryNodeStatus() {
- return new TNodeStatus()
- .setFanoutRequestNum(
- Statistic.RAFT_SENDER_SEND_LOG.getCnt() + Statistic.RAFT_RECEIVER_RELAY_LOG.getCnt());
+ return new TNodeStatus().setFanoutRequestNum(Statistic.getTotalFanout());
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/WindowStatistic.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WindowStatistic.java
new file mode 100644
index 0000000000..854d974a95
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/WindowStatistic.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.utils;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/** Sliding window over the most recent values that reports their average; thread-safe. */
+public class WindowStatistic {
+
+  private static final int DEFAULT_LENGTH = 1000;
+
+  // values and sum are only accessed while holding the monitor of this object
+  private final Queue<Long> values = new ArrayDeque<>();
+  private long sum;
+  private final int windowLength = DEFAULT_LENGTH;
+
+  /** Records val; once more than windowLength values are held, the oldest one is dropped. */
+  public synchronized void add(long val) {
+    values.add(val);
+    sum += val;
+    if (values.size() > windowLength) {
+      sum -= values.remove();
+    }
+  }
+
+  /** Returns the average of the held values, or 0.0 if there are none. */
+  public synchronized double getAvg() {
+    int cnt = values.size();
+    return cnt == 0 ? 0.0 : sum * 1.0 / cnt;
+  }
+
+  /** Drops all held values so that the average starts from scratch. */
+  public synchronized void reset() {
+    values.clear();
+    sum = 0;
+  }
+}