You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/12/30 07:28:18 UTC

[iotdb] branch master updated: [IOTDB-4630] Implement observer pattern in LoadBalancing framework (#8483)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6410ffad81 [IOTDB-4630] Implement observer pattern in LoadBalancing framework (#8483)
6410ffad81 is described below

commit 6410ffad81e33e8a5ca1d5ca93be03e3c0cf478f
Author: Itami Sho <42...@users.noreply.github.com>
AuthorDate: Fri Dec 30 15:28:12 2022 +0800

    [IOTDB-4630] Implement observer pattern in LoadBalancing framework (#8483)
---
 .../confignode/manager/ClusterSchemaManager.java   | 15 +++++++++
 .../iotdb/confignode/manager/ConfigManager.java    |  2 +-
 .../iotdb/confignode/manager/SyncManager.java      | 15 +++++++++
 .../iotdb/confignode/manager/load/LoadManager.java | 35 +++++++++++++++----
 .../manager/node/heartbeat/BaseNodeCache.java      |  4 +++
 .../iotdb/confignode/manager/observer/IEvent.java  | 21 ++++++++++++
 .../manager/observer/NodeStatisticsEvent.java      | 39 ++++++++++++++++++++++
 7 files changed, 124 insertions(+), 7 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 49f1254cd0..48fc65ae91 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -61,6 +61,7 @@ import org.apache.iotdb.confignode.consensus.response.TemplateInfoResp;
 import org.apache.iotdb.confignode.consensus.response.TemplateSetInfoResp;
 import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
@@ -78,6 +79,8 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -713,6 +716,18 @@ public class ClusterSchemaManager {
     return getConsensusManager().write(new DropSchemaTemplatePlan(templateName)).getStatus();
   }
 
+  /**
+   * When some Nodes' states changed during a heartbeat loop, the eventbus in LoadManager will post
+   * the different NodeStatstics event to SyncManager and ClusterSchemaManager.
+   *
+   * @param nodeStatisticsEvent nodeStatistics that changed in a heartbeat loop
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void handleNodeStatistics(NodeStatisticsEvent nodeStatisticsEvent) {
+    // TODO
+  }
+
   private NodeManager getNodeManager() {
     return configManager.getNodeManager();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 379c0f32bd..168a35af12 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -250,9 +250,9 @@ public class ConfigManager implements IManager {
     this.procedureManager = new ProcedureManager(this, procedureInfo);
     this.udfManager = new UDFManager(this, udfInfo);
     this.triggerManager = new TriggerManager(this, triggerInfo);
-    this.loadManager = new LoadManager(this);
     this.syncManager = new SyncManager(this, syncInfo);
     this.cqManager = new CQManager(this);
+    this.loadManager = new LoadManager(this);
 
     this.retryFailedTasksThread = new RetryFailedTasksThread(this);
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
index e0a642c357..92b6fe2b7f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/SyncManager.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlan;
 import org.apache.iotdb.confignode.consensus.response.PipeResp;
 import org.apache.iotdb.confignode.consensus.response.PipeSinkResp;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
 import org.apache.iotdb.confignode.persistence.sync.ClusterSyncInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
@@ -52,6 +53,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TOperatePipeOnDataNodeReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -281,6 +284,18 @@ public class SyncManager {
     return clientHandler.getResponseMap();
   }
 
+  /**
+   * When some Nodes' states changed during a heartbeat loop, the eventbus in LoadManager will post
+   * the different NodeStatstics event to SyncManager and ClusterSchemaManager.
+   *
+   * @param nodeStatisticsEvent nodeStatistics that changed in a heartbeat loop
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void handleNodeStatistics(NodeStatisticsEvent nodeStatisticsEvent) {
+    // TODO
+  }
+
   // endregion
 
   private ConsensusManager getConsensusManager() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 59f28aa529..b23d2fa11f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -48,18 +48,23 @@ import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupStatistics;
 import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionStatistics;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.tsfile.utils.Pair;
 
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -91,12 +96,19 @@ public class LoadManager {
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LoadStatistics-Service");
   private final Object scheduleMonitor = new Object();
 
+  private final EventBus eventBus =
+      new AsyncEventBus("LoadManager-EventBus", Executors.newFixedThreadPool(5));
+
   public LoadManager(IManager configManager) {
     this.configManager = configManager;
 
     this.regionBalancer = new RegionBalancer(configManager);
     this.partitionBalancer = new PartitionBalancer(configManager);
     this.routeBalancer = new RouteBalancer(configManager);
+
+    eventBus.register(configManager.getClusterSchemaManager());
+    eventBus.register(configManager.getSyncManager());
+
     MetricService.getInstance().addMetricSet(new LoadManagerMetrics(configManager));
   }
 
@@ -185,20 +197,26 @@ public class LoadManager {
     // Broadcast the RegionRouteMap if some LoadStatistics has changed
     boolean isNeedBroadcast = false;
 
-    // Update NodeStatistics
-    Map<Integer, NodeStatistics> differentNodeStatisticsMap = new ConcurrentHashMap<>();
+    // Update NodeStatistics:
+    // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one
+    // means the previous NodeStatistics
+    Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap =
+        new ConcurrentHashMap<>();
     getNodeManager()
         .getNodeCacheMap()
         .forEach(
             (nodeId, nodeCache) -> {
+              NodeStatistics preNodeStatistics = nodeCache.getPreviousStatistics().deepCopy();
               if (nodeCache.periodicUpdate()) {
                 // Update and record the changed NodeStatistics
-                differentNodeStatisticsMap.put(nodeId, nodeCache.getStatistics());
+                differentNodeStatisticsMap.put(
+                    nodeId, new Pair<>(nodeCache.getStatistics(), preNodeStatistics));
               }
             });
     if (!differentNodeStatisticsMap.isEmpty()) {
       isNeedBroadcast = true;
       recordNodeStatistics(differentNodeStatisticsMap);
+      eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap));
     }
 
     // Update RegionGroupStatistics
@@ -230,14 +248,15 @@ public class LoadManager {
     }
   }
 
-  private void recordNodeStatistics(Map<Integer, NodeStatistics> differentNodeStatisticsMap) {
+  private void recordNodeStatistics(
+      Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) {
     LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
-    for (Map.Entry<Integer, NodeStatistics> nodeCacheEntry :
+    for (Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> nodeCacheEntry :
         differentNodeStatisticsMap.entrySet()) {
       LOGGER.info(
           "[UpdateLoadStatistics]\t {}={}",
           "nodeId{" + nodeCacheEntry.getKey() + "}",
-          nodeCacheEntry.getValue());
+          nodeCacheEntry.getValue().left);
     }
   }
 
@@ -329,4 +348,8 @@ public class LoadManager {
   private PartitionManager getPartitionManager() {
     return configManager.getPartitionManager();
   }
+
+  public EventBus getEventBus() {
+    return eventBus;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
index 3559c47898..ca185f7039 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/heartbeat/BaseNodeCache.java
@@ -137,4 +137,8 @@ public abstract class BaseNodeCache {
   public NodeStatistics getStatistics() {
     return currentStatistics;
   }
+
+  public NodeStatistics getPreviousStatistics() {
+    return previousStatistics;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java
new file mode 100644
index 0000000000..d9e8445e74
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.observer;
+
+public interface IEvent {}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
new file mode 100644
index 0000000000..a38adafd74
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.observer;
+
+import org.apache.iotdb.confignode.manager.node.heartbeat.NodeStatistics;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Map;
+
+public class NodeStatisticsEvent implements IEvent {
+
+  // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one means
+  // the previous NodeStatistics
+  private Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap;
+
+  public NodeStatisticsEvent(Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap) {
+    this.nodeStatisticsMap = nodeStatisticsMap;
+  }
+
+  public Map<Integer, Pair<NodeStatistics, NodeStatistics>> getNodeStatisticsMap() {
+    return nodeStatisticsMap;
+  }
+}