You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/08/15 13:04:39 UTC

[iotdb] branch beyyes/remove_datanode_precedure created (now 97b38b2db8)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a change to branch beyyes/remove_datanode_precedure
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 97b38b2db8 add markDataNodeAsRemovingAndBroadCast method in RemoveDataNodeProcedure

This branch includes the following new commits:

     new 97b38b2db8 add markDataNodeAsRemovingAndBroadCast method in RemoveDataNodeProcedure

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add markDataNodeAsRemovingAndBroadCast method in RemoveDataNodeProcedure

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/remove_datanode_precedure
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 97b38b2db82951d1b800598c0bbf1ec56ebff7ac
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Mon Aug 15 21:04:20 2022 +0800

    add markDataNodeAsRemovingAndBroadCast method in RemoveDataNodeProcedure
---
 .../iotdb/confignode/conf/ConfigNodeConstant.java  |  7 +++++
 .../manager/load/LoadManagerMetrics.java           | 36 ++++++++++++----------
 .../load/heartbeat/DataNodeHeartbeatCache.java     |  8 +++--
 .../iotdb/confignode/persistence/NodeInfo.java     | 12 +++++---
 .../procedure/env/ConfigNodeProcedureEnv.java      | 17 ++++++++++
 .../procedure/impl/AddConfigNodeProcedure.java     |  4 +--
 .../procedure/impl/RemoveDataNodeProcedure.java    |  6 ++--
 .../apache/iotdb/commons/cluster/NodeStatus.java   | 14 ++++-----
 8 files changed, 70 insertions(+), 34 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
index 425becaad4..4f58c58331 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
@@ -38,6 +38,13 @@ public class ConfigNodeConstant {
 
   public static final int MIN_SUPPORTED_JDK_VERSION = 8;
 
+  /** These constants are only used for cluster gauge metrics. */
+  public static final String METRIC_TAG_TOTAL = "total";
+
+  public static final String METRIC_STATUS_REGISTER = "Registered";
+  public static final String METRIC_STATUS_ONLINE = "Online";
+  public static final String METRIC_STATUS_UNKNOWN = "Unknown";
+
   private ConfigNodeConstant() {
     // empty constructor
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java
index d1547f8e76..f359435fb2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManagerMetrics.java
@@ -36,6 +36,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_STATUS_ONLINE;
+import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_STATUS_UNKNOWN;
+import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_TAG_TOTAL;
+
 /** This class collates metrics about loadManager */
 public class LoadManagerMetrics {
 
@@ -148,9 +152,9 @@ public class LoadManagerMetrics {
             this,
             o -> getRunningConfigNodesNum(),
             Tag.NAME.toString(),
-            "total",
+            METRIC_TAG_TOTAL,
             Tag.STATUS.toString(),
-            NodeStatus.Online.toString());
+            METRIC_STATUS_ONLINE);
 
     MetricService.getInstance()
         .getOrCreateAutoGauge(
@@ -159,9 +163,9 @@ public class LoadManagerMetrics {
             this,
             o -> getRunningDataNodesNum(),
             Tag.NAME.toString(),
-            "total",
+            METRIC_TAG_TOTAL,
             Tag.STATUS.toString(),
-            NodeStatus.Online.toString());
+            METRIC_STATUS_ONLINE);
 
     MetricService.getInstance()
         .getOrCreateAutoGauge(
@@ -170,9 +174,9 @@ public class LoadManagerMetrics {
             this,
             o -> getUnknownConfigNodesNum(),
             Tag.NAME.toString(),
-            "total",
+            METRIC_TAG_TOTAL,
             Tag.STATUS.toString(),
-            NodeStatus.Unknown.toString());
+            METRIC_STATUS_UNKNOWN);
 
     MetricService.getInstance()
         .getOrCreateAutoGauge(
@@ -181,9 +185,9 @@ public class LoadManagerMetrics {
             this,
             o -> getUnknownDataNodesNum(),
             Tag.NAME.toString(),
-            "total",
+            METRIC_TAG_TOTAL,
             Tag.STATUS.toString(),
-            NodeStatus.Unknown.toString());
+            METRIC_STATUS_UNKNOWN);
   }
 
   /**
@@ -227,33 +231,33 @@ public class LoadManagerMetrics {
             MetricType.GAUGE,
             Metric.CONFIG_NODE.toString(),
             Tag.NAME.toString(),
-            "total",
+            METRIC_TAG_TOTAL,
             Tag.STATUS.toString(),
-            NodeStatus.Online.toString());
+            METRIC_STATUS_ONLINE);
     MetricService.getInstance()
         .remove(
             MetricType.GAUGE,
             Metric.DATA_NODE.toString(),
             Tag.NAME.toString(),
-            "total",
+            METRIC_TAG_TOTAL,
             Tag.STATUS.toString(),
-            NodeStatus.Online.toString());
+            METRIC_STATUS_ONLINE);
     MetricService.getInstance()
         .remove(
             MetricType.GAUGE,
             Metric.CONFIG_NODE.toString(),
             Tag.NAME.toString(),
-            "total",
+            METRIC_TAG_TOTAL,
             Tag.STATUS.toString(),
-            NodeStatus.Unknown.toString());
+            METRIC_STATUS_UNKNOWN);
     MetricService.getInstance()
         .remove(
             MetricType.GAUGE,
             Metric.DATA_NODE.toString(),
             Tag.NAME.toString(),
-            "total",
+            METRIC_TAG_TOTAL,
             Tag.STATUS.toString(),
-            NodeStatus.Unknown.toString());
+            METRIC_STATUS_UNKNOWN);
   }
 
   private NodeManager getNodeManager() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
index 6e83e7a27d..c3f299b888 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
@@ -28,7 +28,7 @@ public class DataNodeHeartbeatCache implements INodeCache {
   // TODO: This class might be split into DataNodeCache and ConfigNodeCache
 
   // Cache heartbeat samples
-  private static final int maximumWindowSize = 100;
+  private static final int MAXIMUM_WINDOW_SIZE = 100;
   private final LinkedList<NodeHeartbeatSample> slidingWindow;
 
   // For guiding queries, the higher the score the higher the load
@@ -43,6 +43,10 @@ public class DataNodeHeartbeatCache implements INodeCache {
     this.status = NodeStatus.Unknown;
   }
 
+  public void setNodeStatus(NodeStatus status) {
+    this.status = status;
+  }
+
   @Override
   public void cacheHeartbeatSample(NodeHeartbeatSample newHeartbeatSample) {
     synchronized (slidingWindow) {
@@ -53,7 +57,7 @@ public class DataNodeHeartbeatCache implements INodeCache {
         slidingWindow.add(newHeartbeatSample);
       }
 
-      if (slidingWindow.size() > maximumWindowSize) {
+      if (slidingWindow.size() > MAXIMUM_WINDOW_SIZE) {
         slidingWindow.removeFirst();
       }
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index c749f2d5b2..7a0bfa72ca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -67,6 +66,9 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_STATUS_REGISTER;
+import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.METRIC_TAG_TOTAL;
+
 /**
  * The NodeInfo stores cluster node information. The cluster node information including: 1. DataNode
  * information 2. ConfigNode information
@@ -109,9 +111,9 @@ public class NodeInfo implements SnapshotProcessor {
             registeredConfigNodes,
             o -> getRegisteredConfigNodeCount(),
             Tag.NAME.toString(),
-            "total",
+            METRIC_TAG_TOTAL,
             Tag.STATUS.toString(),
-            NodeStatus.Registered.toString());
+            METRIC_STATUS_REGISTER);
     MetricService.getInstance()
         .getOrCreateAutoGauge(
             Metric.DATA_NODE.toString(),
@@ -119,9 +121,9 @@ public class NodeInfo implements SnapshotProcessor {
             registeredDataNodes,
             Map::size,
             Tag.NAME.toString(),
-            "total",
+            METRIC_TAG_TOTAL,
             Tag.STATUS.toString(),
-            NodeStatus.Registered.toString());
+            METRIC_STATUS_REGISTER);
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 9daa2d4ea7..553f5c02c5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.confignode.procedure.env;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
@@ -34,6 +36,7 @@ import org.apache.iotdb.confignode.exception.AddConsensusGroupException;
 import org.apache.iotdb.confignode.exception.AddPeerException;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
@@ -288,6 +291,20 @@ public class ConfigNodeProcedureEnv {
             configManager.getNodeManager().getRegisteredConfigNodes());
   }
 
+  /**
+   * Marks the given DataNode as Removing and broadcasts the latest region route map, so that
+   * read and write requests are no longer routed to this node.
+   *
+   * @param dataNodeLocation the location of the DataNode to be marked as Removing
+   */
+  public void markDataNodeAsRemovingAndBroadCast(TDataNodeLocation dataNodeLocation) {
+    int dataNodeId = dataNodeLocation.getDataNodeId();
+    DataNodeHeartbeatCache c =
+        (DataNodeHeartbeatCache) configManager.getNodeManager().getNodeCacheMap().get(dataNodeId);
+    c.setNodeStatus(NodeStatus.Removing);
+    configManager.getLoadManager().broadcastLatestRegionRouteMap();
+  }
+
   public LockQueue getNodeLock() {
     return nodeLock;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
index cf244fb52d..82131683f6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
@@ -37,7 +37,7 @@ import java.nio.ByteBuffer;
 /** add config node procedure */
 public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeState> {
   private static final Logger LOG = LoggerFactory.getLogger(AddConfigNodeProcedure.class);
-  private static final int retryThreshold = 5;
+  private static final int RETRY_THRESHOLD = 5;
 
   private TConfigNodeLocation tConfigNodeLocation;
 
@@ -85,7 +85,7 @@ public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeS
             tConfigNodeLocation,
             state,
             e);
-        if (getCycles() > retryThreshold) {
+        if (getCycles() > RETRY_THRESHOLD) {
           setFailure(new ProcedureException("State stuck at " + state));
         }
       }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
index cf327ec8f8..588e5cc1ed 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
@@ -40,7 +40,7 @@ import java.util.List;
 /** remove data node procedure */
 public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNodeState> {
   private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
-  private static final int retryThreshold = 5;
+  private static final int RETRY_THRESHOLD = 5;
 
   private TDataNodeLocation disableDataNodeLocation;
 
@@ -63,6 +63,8 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
     try {
       switch (state) {
         case REMOVE_DATA_NODE_PREPARE:
+          // TODO: Mark the DataNode as being in removing status and broadcast
+          env.markDataNodeAsRemovingAndBroadCast(disableDataNodeLocation);
           execDataNodeRegionIds =
               env.getDataNodeRemoveHandler().getDataNodeRegionIds(disableDataNodeLocation);
           LOG.info("DataNode region id is {}", execDataNodeRegionIds);
@@ -90,7 +92,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
             disableDataNodeLocation,
             state,
             e);
-        if (getCycles() > retryThreshold) {
+        if (getCycles() > RETRY_THRESHOLD) {
           setFailure(new ProcedureException("State stuck at " + state));
         }
       }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java
index 8d2a3577a2..82cc285cf3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java
@@ -20,14 +20,14 @@ package org.apache.iotdb.commons.cluster;
 
 /** Node status for showing cluster */
 public enum NodeStatus {
-  // Node registered
-  Registered("Registered"),
-  // Node online ,right now Online is Running
-  Online("Online"),
-  // Node running properly
+  /** Node running properly */
   Running("Running"),
-  // Node connection failure
-  Unknown("Unknown");
+
+  /** Node connection failure */
+  Unknown("Unknown"),
+
+  /** Node is being removed */
+  Removing("Removing");
 
   private final String status;