You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/04/29 01:32:16 UTC

[iotdb] branch expr_plus updated: add weightbase in config

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_plus by this push:
     new 3b79e31588 add weightbase in config
3b79e31588 is described below

commit 3b79e3158849332ebd7bab70f31854aea1cde459
Author: jt <jt...@163.com>
AuthorDate: Fri Apr 29 09:31:30 2022 +0800

    add weightbase in config
---
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     | 28 ++++++++++++++++++++++
 .../apache/iotdb/cluster/config/ClusterConfig.java | 16 ++++++++++---
 .../iotdb/cluster/config/ClusterDescriptor.java    |  7 +++++-
 .../iotdb/cluster/log/IndirectLogDispatcher.java   | 18 ++++++++++----
 .../cluster/server/monitor/NodeStatusManager.java  |  9 ++++++-
 5 files changed, 68 insertions(+), 10 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index f399c78b2b..d7c519c21d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
@@ -80,11 +81,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
 import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
@@ -297,6 +303,28 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     public void run() {
       logger.info(Timer.getReport());
       NodeStatusManager.getINSTANCE().report();
+      List<Entry<Node, NodeStatus>> nodeStatusEntryList =
+          new ArrayList<>(NodeStatusManager.getINSTANCE().getNodeStatusMap().entrySet());
+      nodeStatusEntryList.sort(Comparator.comparing(e -> e.getKey().getInternalIp()));
+      List<Long> sendNums =
+          nodeStatusEntryList.stream()
+              .map(e -> e.getValue().getSendEntryNum().get())
+              .collect(Collectors.toList());
+      List<Long> sendLatencySums =
+          nodeStatusEntryList.stream()
+              .map(e -> e.getValue().getSendEntryLatencySum().get())
+              .collect(Collectors.toList());
+      List<Double> sendLatencyAvg =
+          nodeStatusEntryList.stream()
+              .map(
+                  e ->
+                      e.getValue().getSendEntryLatencySum().get()
+                          * 1.0
+                          / e.getValue().getSendEntryNum().get())
+              .collect(Collectors.toList());
+      logger.info("Send nums: {}", sendNums);
+      logger.info("Send latency sum: {}", sendLatencySums);
+      logger.info("Send latency avg: {}", sendLatencyAvg);
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 0f265dee76..2a2a35a554 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -192,7 +192,9 @@ public class ClusterConfig {
 
   private int relaySenderNum = 8;
 
-  private int relayFirstLevelSize = 1;
+  private double relayFirstLevelSize = 1.0;
+
+  private double relayWeightBase = 1.0;
 
   private boolean optimizeIndirectBroadcasting = false;
 
@@ -603,11 +605,19 @@ public class ClusterConfig {
     this.optimizeIndirectBroadcasting = optimizeIndirectBroadcasting;
   }
 
-  public int getRelayFirstLevelSize() {
+  public double getRelayFirstLevelSize() {
     return relayFirstLevelSize;
   }
 
-  public void setRelayFirstLevelSize(int relayFirstLevelSize) {
+  public void setRelayFirstLevelSize(double relayFirstLevelSize) {
     this.relayFirstLevelSize = relayFirstLevelSize;
   }
+
+  public double getRelayWeightBase() {
+    return relayWeightBase;
+  }
+
+  public void setRelayWeightBase(double relayWeightBase) {
+    this.relayWeightBase = relayWeightBase;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 8e9f8934bc..bd8675d742 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -357,10 +357,15 @@ public class ClusterDescriptor {
                 "relay_sender_number", String.valueOf(config.getRelaySenderNum()))));
 
     config.setRelayFirstLevelSize(
-        Integer.parseInt(
+        Double.parseDouble(
             properties.getProperty(
                 "relay_first_level_size", String.valueOf(config.getRelayFirstLevelSize()))));
 
+    config.setRelayWeightBase(
+        Double.parseDouble(
+            properties.getProperty(
+                "relay_weight_base", String.valueOf(config.getRelayWeightBase()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index ecbd1dbabe..7102fd799a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.cluster.log;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -38,6 +37,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -54,6 +54,7 @@ public class IndirectLogDispatcher extends LogDispatcher {
   private Map<Node, List<Node>> directToIndirectFollowerMap = new ConcurrentHashMap<>();
   private long dispatchedEntryNum;
   private int recalculateMapInterval = 1;
+  private Random random = new Random();
 
   public IndirectLogDispatcher(RaftMember member) {
     super(member);
@@ -100,7 +101,10 @@ public class IndirectLogDispatcher extends LogDispatcher {
     NodeStatus status = NodeStatusManager.getINSTANCE().getNodeStatus(node, false);
     //    return 1.0
     //        / (status.getStatus().fanoutRequestNum + 1);
-    double pow = Math.pow(100.0, maxLatency / status.getSendEntryLatencyStatistic().getAvg());
+    double pow =
+        Math.pow(
+            ClusterDescriptor.getInstance().getConfig().getRelayWeightBase(),
+            maxLatency / status.getSendEntryLatencyStatistic().getAvg());
     status.setRelayWeight(pow);
     return pow;
   }
@@ -114,12 +118,16 @@ public class IndirectLogDispatcher extends LogDispatcher {
     nodesEnabled.clear();
     directToIndirectFollowerMap.clear();
 
-    int firstLevelSize = ClusterDescriptor.getInstance().getConfig().getRelayFirstLevelSize();
+    double firstLevelSizeDouble =
+        ClusterDescriptor.getInstance().getConfig().getRelayFirstLevelSize();
+    int firstLevelSize = (int) firstLevelSizeDouble;
+    double firstLevelRemain = firstLevelSizeDouble - firstLevelSize;
+    if (random.nextDouble() < firstLevelRemain) {
+      firstLevelSize++;
+    }
     List<Node> firstLevelNodes;
 
     if (ClusterDescriptor.getInstance().getConfig().isOptimizeIndirectBroadcasting()) {
-      QueryCoordinator instance = QueryCoordinator.getINSTANCE();
-      orderedNodes = instance.reorderNodes(allNodes);
       long thisLoad = Statistic.getTotalFanout() + 1;
       double maxLatency =
           NodeStatusManager.getINSTANCE()
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
index 4edcbb8d57..f77e56eb2c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
@@ -115,7 +115,14 @@ public class NodeStatusManager {
    */
   public NodeStatus getNodeStatus(Node node, boolean tryUpdate) {
     // avoid duplicated computing of concurrent queries
-    NodeStatus nodeStatus = nodeStatusMap.computeIfAbsent(node, n -> new NodeStatus());
+    NodeStatus nodeStatus =
+        nodeStatusMap.computeIfAbsent(
+            node,
+            n -> {
+              NodeStatus status = new NodeStatus();
+              status.setStatus(new TNodeStatus());
+              return status;
+            });
     if (metaGroupMember == null || node.equals(metaGroupMember.getThisNode())) {
       return nodeStatus;
     }