You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/04/29 01:32:16 UTC
[iotdb] branch expr_plus updated: add weightbase in config
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_plus by this push:
new 3b79e31588 add weightbase in config
3b79e31588 is described below
commit 3b79e3158849332ebd7bab70f31854aea1cde459
Author: jt <jt...@163.com>
AuthorDate: Fri Apr 29 09:31:30 2022 +0800
add weightbase in config
---
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 28 ++++++++++++++++++++++
.../apache/iotdb/cluster/config/ClusterConfig.java | 16 ++++++++++---
.../iotdb/cluster/config/ClusterDescriptor.java | 7 +++++-
.../iotdb/cluster/log/IndirectLogDispatcher.java | 18 ++++++++++----
.../cluster/server/monitor/NodeStatusManager.java | 9 ++++++-
5 files changed, 68 insertions(+), 10 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index f399c78b2b..d7c519c21d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
@@ -80,11 +81,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
@@ -297,6 +303,28 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
public void run() {
logger.info(Timer.getReport());
NodeStatusManager.getINSTANCE().report();
+ List<Entry<Node, NodeStatus>> nodeStatusEntryList =
+ new ArrayList<>(NodeStatusManager.getINSTANCE().getNodeStatusMap().entrySet());
+ nodeStatusEntryList.sort(Comparator.comparing(e -> e.getKey().getInternalIp()));
+ List<Long> sendNums =
+ nodeStatusEntryList.stream()
+ .map(e -> e.getValue().getSendEntryNum().get())
+ .collect(Collectors.toList());
+ List<Long> sendLatencySums =
+ nodeStatusEntryList.stream()
+ .map(e -> e.getValue().getSendEntryLatencySum().get())
+ .collect(Collectors.toList());
+ List<Double> sendLatencyAvg =
+ nodeStatusEntryList.stream()
+ .map(
+ e ->
+ e.getValue().getSendEntryLatencySum().get()
+ * 1.0
+ / e.getValue().getSendEntryNum().get())
+ .collect(Collectors.toList());
+ logger.info("Send nums: {}", sendNums);
+ logger.info("Send latency sum: {}", sendLatencySums);
+ logger.info("Send latency avg: {}", sendLatencyAvg);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 0f265dee76..2a2a35a554 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -192,7 +192,9 @@ public class ClusterConfig {
private int relaySenderNum = 8;
- private int relayFirstLevelSize = 1;
+ private double relayFirstLevelSize = 1.0;
+
+ private double relayWeightBase = 1.0;
private boolean optimizeIndirectBroadcasting = false;
@@ -603,11 +605,19 @@ public class ClusterConfig {
this.optimizeIndirectBroadcasting = optimizeIndirectBroadcasting;
}
- public int getRelayFirstLevelSize() {
+ public double getRelayFirstLevelSize() {
return relayFirstLevelSize;
}
- public void setRelayFirstLevelSize(int relayFirstLevelSize) {
+ public void setRelayFirstLevelSize(double relayFirstLevelSize) {
this.relayFirstLevelSize = relayFirstLevelSize;
}
+
+ public double getRelayWeightBase() {
+ return relayWeightBase;
+ }
+
+ public void setRelayWeightBase(double relayWeightBase) {
+ this.relayWeightBase = relayWeightBase;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 8e9f8934bc..bd8675d742 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -357,10 +357,15 @@ public class ClusterDescriptor {
"relay_sender_number", String.valueOf(config.getRelaySenderNum()))));
config.setRelayFirstLevelSize(
- Integer.parseInt(
+ Double.parseDouble(
properties.getProperty(
"relay_first_level_size", String.valueOf(config.getRelayFirstLevelSize()))));
+ config.setRelayWeightBase(
+ Double.parseDouble(
+ properties.getProperty(
+ "relay_weight_base", String.valueOf(config.getRelayWeightBase()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index ecbd1dbabe..7102fd799a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.cluster.log;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -38,6 +37,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -54,6 +54,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
private Map<Node, List<Node>> directToIndirectFollowerMap = new ConcurrentHashMap<>();
private long dispatchedEntryNum;
private int recalculateMapInterval = 1;
+ private Random random = new Random();
public IndirectLogDispatcher(RaftMember member) {
super(member);
@@ -100,7 +101,10 @@ public class IndirectLogDispatcher extends LogDispatcher {
NodeStatus status = NodeStatusManager.getINSTANCE().getNodeStatus(node, false);
// return 1.0
// / (status.getStatus().fanoutRequestNum + 1);
- double pow = Math.pow(100.0, maxLatency / status.getSendEntryLatencyStatistic().getAvg());
+ double pow =
+ Math.pow(
+ ClusterDescriptor.getInstance().getConfig().getRelayWeightBase(),
+ maxLatency / status.getSendEntryLatencyStatistic().getAvg());
status.setRelayWeight(pow);
return pow;
}
@@ -114,12 +118,16 @@ public class IndirectLogDispatcher extends LogDispatcher {
nodesEnabled.clear();
directToIndirectFollowerMap.clear();
- int firstLevelSize = ClusterDescriptor.getInstance().getConfig().getRelayFirstLevelSize();
+ double firstLevelSizeDouble =
+ ClusterDescriptor.getInstance().getConfig().getRelayFirstLevelSize();
+ int firstLevelSize = (int) firstLevelSizeDouble;
+ double firstLevelRemain = firstLevelSizeDouble - firstLevelSize;
+ if (random.nextDouble() < firstLevelRemain) {
+ firstLevelSize++;
+ }
List<Node> firstLevelNodes;
if (ClusterDescriptor.getInstance().getConfig().isOptimizeIndirectBroadcasting()) {
- QueryCoordinator instance = QueryCoordinator.getINSTANCE();
- orderedNodes = instance.reorderNodes(allNodes);
long thisLoad = Statistic.getTotalFanout() + 1;
double maxLatency =
NodeStatusManager.getINSTANCE()
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
index 4edcbb8d57..f77e56eb2c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
@@ -115,7 +115,14 @@ public class NodeStatusManager {
*/
public NodeStatus getNodeStatus(Node node, boolean tryUpdate) {
// avoid duplicated computing of concurrent queries
- NodeStatus nodeStatus = nodeStatusMap.computeIfAbsent(node, n -> new NodeStatus());
+ NodeStatus nodeStatus =
+ nodeStatusMap.computeIfAbsent(
+ node,
+ n -> {
+ NodeStatus status = new NodeStatus();
+ status.setStatus(new TNodeStatus());
+ return status;
+ });
if (metaGroupMember == null || node.equals(metaGroupMember.getThisNode())) {
return nodeStatus;
}