You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/01 11:05:47 UTC

[iotdb] branch master updated: [IOTDB-3710] Get latest RegionRouteMap (#6554)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a38063fcd6 [IOTDB-3710] Get latest RegionRouteMap (#6554)
a38063fcd6 is described below

commit a38063fcd6d57e3abdb0a5a8dce2f57e967d5a16
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Fri Jul 1 19:05:41 2022 +0800

    [IOTDB-3710] Get latest RegionRouteMap (#6554)
---
 .../iotdb/confignode/manager/ConfigManager.java    | 23 ++++++--
 .../apache/iotdb/confignode/manager/IManager.java  |  8 +++
 .../thrift/ConfigNodeRPCServiceProcessor.java      | 25 ++++-----
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 65 +++++-----------------
 thrift-commons/src/main/thrift/common.thrift       | 16 +++---
 .../src/main/thrift/confignode.thrift              | 14 +++++
 6 files changed, 75 insertions(+), 76 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 28fc582619..bff49a01ea 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -204,9 +205,8 @@ public class ConfigManager implements IManager {
       getLoadManager()
           .getHeartbeatCacheMap()
           .forEach(
-              (nodeId, heartbeatCache) -> {
-                nodeStatus.put(nodeId, heartbeatCache.getNodeStatus().getStatus());
-              });
+              (nodeId, heartbeatCache) ->
+                  nodeStatus.put(nodeId, heartbeatCache.getNodeStatus().getStatus()));
       return new TClusterNodeInfos(status, configNodeLocations, dataNodeInfoLocations, nodeStatus);
     } else {
       return new TClusterNodeInfos(status, new ArrayList<>(), new ArrayList<>(), new HashMap<>());
@@ -758,16 +758,29 @@ public class ConfigManager implements IManager {
         : status;
   }
 
+  @Override
+  public TRegionRouteMapResp getLatestRegionRouteMap() {
+    TSStatus status = confirmLeader();
+    TRegionRouteMapResp resp = new TRegionRouteMapResp(status);
+
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      resp.setTimestamp(System.currentTimeMillis());
+      resp.setRegionRouteMap(getLoadManager().genRealTimeRoutingPolicy());
+    }
+
+    return resp;
+  }
+
   @Override
   public UDFManager getUDFManager() {
     return udfManager;
   }
 
   @Override
-  public DataSet showRegion(GetRegionInfoListPlan getRegionsinfoReq) {
+  public DataSet showRegion(GetRegionInfoListPlan getRegionInfoListPlan) {
     TSStatus status = confirmLeader();
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return partitionManager.getRegionInfoList(getRegionsinfoReq);
+      return partitionManager.getRegionInfoList(getRegionInfoListPlan);
     } else {
       RegionInfoListResp regionResp = new RegionInfoListResp();
       regionResp.setStatus(status);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 85464ee875..8ea24c841e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -240,6 +241,13 @@ public interface IManager {
 
   TSStatus flush(TFlushReq req);
 
+  /**
+   * Get the latest RegionRouteMap
+   *
+   * @return TRegionRouteMapResp
+   */
+  TRegionRouteMapResp getLatestRegionRouteMap();
+
   void addMetrics();
 
   /** Show (data/schema) regions */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index bae0da1c98..a5012a695d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -75,6 +75,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
@@ -386,12 +387,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return StatusUtils.OK;
   }
 
-  /**
-   * For leader to remove ConfigNode configuration in consensus layer
-   *
-   * @param configNodeLocation
-   * @return
-   */
+  /** For leader to remove ConfigNode configuration in consensus layer */
   @Override
   public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
     RemoveConfigNodePlan removeConfigNodePlan = new RemoveConfigNodePlan(configNodeLocation);
@@ -407,12 +403,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return status;
   }
 
-  /**
-   * For leader to stop ConfigNode
-   *
-   * @param configNodeLocation
-   * @return
-   */
+  /** For leader to stop ConfigNode */
   @Override
   public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
     if (!configManager.getNodeManager().getRegisteredConfigNodes().contains(configNodeLocation)) {
@@ -470,15 +461,21 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
 
   @Override
   public TShowRegionResp showRegion(TShowRegionReq showRegionReq) throws TException {
-    GetRegionInfoListPlan getRegionsinfoPlan =
+    GetRegionInfoListPlan getRegionInfoListPlan =
         new GetRegionInfoListPlan(showRegionReq.getConsensusGroupType());
-    RegionInfoListResp dataSet = (RegionInfoListResp) configManager.showRegion(getRegionsinfoPlan);
+    RegionInfoListResp dataSet =
+        (RegionInfoListResp) configManager.showRegion(getRegionInfoListPlan);
     TShowRegionResp showRegionResp = new TShowRegionResp();
     showRegionResp.setStatus(dataSet.getStatus());
     showRegionResp.setRegionInfoList(dataSet.getRegionInfoList());
     return showRegionResp;
   }
 
+  @Override
+  public TRegionRouteMapResp getLatestRegionRouteMap() throws TException {
+    return configManager.getLatestRegionRouteMap();
+  }
+
   @Override
   public long getConfigNodeHeartBeat(long timestamp) throws TException {
     return timestamp;
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 7ba740c9b7..84021bdb1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
@@ -608,34 +609,12 @@ public class ConfigNodeClient
 
   @Override
   public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) throws TException {
-    for (int i = 0; i < RETRY_NUM; i++) {
-      try {
-        TConfigNodeRegisterResp resp = client.registerConfigNode(req);
-        if (!updateConfigNodeLeader(resp.status)) {
-          return resp;
-        }
-      } catch (TException e) {
-        configLeader = null;
-      }
-      reconnect();
-    }
-    throw new TException(MSG_RECONNECTION_FAIL);
+    throw new TException("DataNode to ConfigNode client doesn't support registerConfigNode.");
   }
 
   @Override
   public TSStatus addConsensusGroup(TConfigNodeRegisterResp registerResp) throws TException {
-    for (int i = 0; i < RETRY_NUM; i++) {
-      try {
-        TSStatus status = client.addConsensusGroup(registerResp);
-        if (!updateConfigNodeLeader(status)) {
-          return status;
-        }
-      } catch (TException e) {
-        configLeader = null;
-      }
-      reconnect();
-    }
-    throw new TException(MSG_RECONNECTION_FAIL);
+    throw new TException("DataNode to ConfigNode client doesn't support addConsensusGroup.");
   }
 
   @Override
@@ -645,34 +624,12 @@ public class ConfigNodeClient
 
   @Override
   public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
-    for (int i = 0; i < RETRY_NUM; i++) {
-      try {
-        TSStatus status = client.removeConfigNode(configNodeLocation);
-        if (!updateConfigNodeLeader(status)) {
-          return status;
-        }
-      } catch (TException e) {
-        configLeader = null;
-      }
-      reconnect();
-    }
-    throw new TException(MSG_RECONNECTION_FAIL);
+    throw new TException("DataNode to ConfigNode client doesn't support removeConfigNode.");
   }
 
   @Override
   public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
-    for (int i = 0; i < RETRY_NUM; i++) {
-      try {
-        TSStatus status = client.stopConfigNode(configNodeLocation);
-        if (!updateConfigNodeLeader(status)) {
-          return status;
-        }
-      } catch (TException e) {
-        configLeader = null;
-      }
-      reconnect();
-    }
-    throw new TException(MSG_RECONNECTION_FAIL);
+    throw new TException("DataNode to ConfigNode client doesn't support stopConfigNode.");
   }
 
   @Override
@@ -724,10 +681,13 @@ public class ConfigNodeClient
   }
 
   @Override
-  public long getConfigNodeHeartBeat(long timestamp) throws TException {
+  public TRegionRouteMapResp getLatestRegionRouteMap() throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
       try {
-        return client.getConfigNodeHeartBeat(timestamp);
+        TRegionRouteMapResp regionRouteMapResp = client.getLatestRegionRouteMap();
+        if (!updateConfigNodeLeader(regionRouteMapResp.getStatus())) {
+          return regionRouteMapResp;
+        }
       } catch (TException e) {
         configLeader = null;
       }
@@ -736,6 +696,11 @@ public class ConfigNodeClient
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
+  @Override
+  public long getConfigNodeHeartBeat(long timestamp) throws TException {
+    throw new TException("DataNode to ConfigNode client doesn't support getConfigNodeHeartBeat.");
+  }
+
   @Override
   public TSStatus dropFunction(TDropFunctionReq req) throws TException {
     for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 5f2f0b168f..338927ff72 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -20,6 +20,7 @@
 namespace java org.apache.iotdb.common.rpc.thrift
 namespace py iotdb.thrift.common
 
+// Define a set of ip:port address
 struct TEndPoint {
   1: required string ip
   2: required i32 port
@@ -67,7 +68,7 @@ struct TDataNodeLocation {
   1: required i32 dataNodeId
   // TEndPoint for DataNode's client rpc
   2: required TEndPoint clientRpcEndPoint
-  // TEndPoint for DataNode's internal rpc
+  // TEndPoint for DataNode's cluster internal rpc
   3: required TEndPoint internalEndPoint
   // TEndPoint for exchange data between DataNodes
   4: required TEndPoint mPPDataExchangeEndPoint
@@ -77,6 +78,13 @@ struct TDataNodeLocation {
   6: required TEndPoint schemaRegionConsensusEndPoint
 }
 
+struct TDataNodeInfo {
+  1: required TDataNodeLocation location
+  2: required i32 cpuCoreNum
+  3: required i64 maxMemory
+}
+
+// For show regions
 struct TRegionInfo {
   1: required TConsensusGroupId consensusGroupId
   2: required string storageGroup
@@ -88,12 +96,6 @@ struct TRegionInfo {
   8: optional string status
 }
 
-struct TDataNodeInfo {
-  1: required TDataNodeLocation location
-  2: required i32 cpuCoreNum
-  3: required i64 maxMemory
-}
-
 struct TFlushReq {
    1: optional string isSeq
    2: optional list<string> storageGroups
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 5a81e7e082..21a3b6dff9 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -236,6 +236,15 @@ struct TShowRegionResp {
   2: optional list<common.TRegionInfo> regionInfoList;
 }
 
+struct TRegionRouteMapResp {
+  1: required common.TSStatus status
+  // For version stamp
+  2: optional i64 timestamp
+  // The routing policy of read/write requests for each RegionGroup is based on the order in the TRegionReplicaSet.
+  // The replica with higher sorting result in TRegionReplicaSet will have higher priority.
+  3: optional map<common.TConsensusGroupId, common.TRegionReplicaSet> regionRouteMap
+}
+
 service IConfigNodeRPCService {
 
   /* DataNode */
@@ -321,7 +330,12 @@ service IConfigNodeRPCService {
 
   TShowRegionResp showRegion(TShowRegionReq req)
 
+  /* Routing */
+
+  TRegionRouteMapResp getLatestRegionRouteMap()
+
   /* Get confignode heartbeat */
+
   i64 getConfigNodeHeartBeat(i64 timestamp)
 
 }