You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/01 11:05:47 UTC
[iotdb] branch master updated: [IOTDB-3710] Get latest RegionRouteMap (#6554)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a38063fcd6 [IOTDB-3710] Get latest RegionRouteMap (#6554)
a38063fcd6 is described below
commit a38063fcd6d57e3abdb0a5a8dce2f57e967d5a16
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Fri Jul 1 19:05:41 2022 +0800
[IOTDB-3710] Get latest RegionRouteMap (#6554)
---
.../iotdb/confignode/manager/ConfigManager.java | 23 ++++++--
.../apache/iotdb/confignode/manager/IManager.java | 8 +++
.../thrift/ConfigNodeRPCServiceProcessor.java | 25 ++++-----
.../apache/iotdb/db/client/ConfigNodeClient.java | 65 +++++-----------------
thrift-commons/src/main/thrift/common.thrift | 16 +++---
.../src/main/thrift/confignode.thrift | 14 +++++
6 files changed, 75 insertions(+), 76 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 28fc582619..bff49a01ea 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -77,6 +77,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -204,9 +205,8 @@ public class ConfigManager implements IManager {
getLoadManager()
.getHeartbeatCacheMap()
.forEach(
- (nodeId, heartbeatCache) -> {
- nodeStatus.put(nodeId, heartbeatCache.getNodeStatus().getStatus());
- });
+ (nodeId, heartbeatCache) ->
+ nodeStatus.put(nodeId, heartbeatCache.getNodeStatus().getStatus()));
return new TClusterNodeInfos(status, configNodeLocations, dataNodeInfoLocations, nodeStatus);
} else {
return new TClusterNodeInfos(status, new ArrayList<>(), new ArrayList<>(), new HashMap<>());
@@ -758,16 +758,29 @@ public class ConfigManager implements IManager {
: status;
}
+ @Override
+ public TRegionRouteMapResp getLatestRegionRouteMap() {
+ TSStatus status = confirmLeader();
+ TRegionRouteMapResp resp = new TRegionRouteMapResp(status);
+
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ resp.setTimestamp(System.currentTimeMillis());
+ resp.setRegionRouteMap(getLoadManager().genRealTimeRoutingPolicy());
+ }
+
+ return resp;
+ }
+
@Override
public UDFManager getUDFManager() {
return udfManager;
}
@Override
- public DataSet showRegion(GetRegionInfoListPlan getRegionsinfoReq) {
+ public DataSet showRegion(GetRegionInfoListPlan getRegionInfoListPlan) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return partitionManager.getRegionInfoList(getRegionsinfoReq);
+ return partitionManager.getRegionInfoList(getRegionInfoListPlan);
} else {
RegionInfoListResp regionResp = new RegionInfoListResp();
regionResp.setStatus(status);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 85464ee875..8ea24c841e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.consensus.common.DataSet;
@@ -240,6 +241,13 @@ public interface IManager {
TSStatus flush(TFlushReq req);
+ /**
+ * Get the latest RegionRouteMap
+ *
+ * @return TRegionRouteMapResp
+ */
+ TRegionRouteMapResp getLatestRegionRouteMap();
+
void addMetrics();
/** Show (data/schema) regions */
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index bae0da1c98..a5012a695d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -75,6 +75,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
@@ -386,12 +387,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return StatusUtils.OK;
}
- /**
- * For leader to remove ConfigNode configuration in consensus layer
- *
- * @param configNodeLocation
- * @return
- */
+ /** For leader to remove ConfigNode configuration in consensus layer */
@Override
public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
RemoveConfigNodePlan removeConfigNodePlan = new RemoveConfigNodePlan(configNodeLocation);
@@ -407,12 +403,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return status;
}
- /**
- * For leader to stop ConfigNode
- *
- * @param configNodeLocation
- * @return
- */
+ /** For leader to stop ConfigNode */
@Override
public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
if (!configManager.getNodeManager().getRegisteredConfigNodes().contains(configNodeLocation)) {
@@ -470,15 +461,21 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
@Override
public TShowRegionResp showRegion(TShowRegionReq showRegionReq) throws TException {
- GetRegionInfoListPlan getRegionsinfoPlan =
+ GetRegionInfoListPlan getRegionInfoListPlan =
new GetRegionInfoListPlan(showRegionReq.getConsensusGroupType());
- RegionInfoListResp dataSet = (RegionInfoListResp) configManager.showRegion(getRegionsinfoPlan);
+ RegionInfoListResp dataSet =
+ (RegionInfoListResp) configManager.showRegion(getRegionInfoListPlan);
TShowRegionResp showRegionResp = new TShowRegionResp();
showRegionResp.setStatus(dataSet.getStatus());
showRegionResp.setRegionInfoList(dataSet.getRegionInfoList());
return showRegionResp;
}
+ @Override
+ public TRegionRouteMapResp getLatestRegionRouteMap() throws TException {
+ return configManager.getLatestRegionRouteMap();
+ }
+
@Override
public long getConfigNodeHeartBeat(long timestamp) throws TException {
return timestamp;
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 7ba740c9b7..84021bdb1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -51,6 +51,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
@@ -608,34 +609,12 @@ public class ConfigNodeClient
@Override
public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) throws TException {
- for (int i = 0; i < RETRY_NUM; i++) {
- try {
- TConfigNodeRegisterResp resp = client.registerConfigNode(req);
- if (!updateConfigNodeLeader(resp.status)) {
- return resp;
- }
- } catch (TException e) {
- configLeader = null;
- }
- reconnect();
- }
- throw new TException(MSG_RECONNECTION_FAIL);
+ throw new TException("DataNode to ConfigNode client doesn't support registerConfigNode.");
}
@Override
public TSStatus addConsensusGroup(TConfigNodeRegisterResp registerResp) throws TException {
- for (int i = 0; i < RETRY_NUM; i++) {
- try {
- TSStatus status = client.addConsensusGroup(registerResp);
- if (!updateConfigNodeLeader(status)) {
- return status;
- }
- } catch (TException e) {
- configLeader = null;
- }
- reconnect();
- }
- throw new TException(MSG_RECONNECTION_FAIL);
+ throw new TException("DataNode to ConfigNode client doesn't support addConsensusGroup.");
}
@Override
@@ -645,34 +624,12 @@ public class ConfigNodeClient
@Override
public TSStatus removeConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
- for (int i = 0; i < RETRY_NUM; i++) {
- try {
- TSStatus status = client.removeConfigNode(configNodeLocation);
- if (!updateConfigNodeLeader(status)) {
- return status;
- }
- } catch (TException e) {
- configLeader = null;
- }
- reconnect();
- }
- throw new TException(MSG_RECONNECTION_FAIL);
+ throw new TException("DataNode to ConfigNode client doesn't support removeConfigNode.");
}
@Override
public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) throws TException {
- for (int i = 0; i < RETRY_NUM; i++) {
- try {
- TSStatus status = client.stopConfigNode(configNodeLocation);
- if (!updateConfigNodeLeader(status)) {
- return status;
- }
- } catch (TException e) {
- configLeader = null;
- }
- reconnect();
- }
- throw new TException(MSG_RECONNECTION_FAIL);
+ throw new TException("DataNode to ConfigNode client doesn't support stopConfigNode.");
}
@Override
@@ -724,10 +681,13 @@ public class ConfigNodeClient
}
@Override
- public long getConfigNodeHeartBeat(long timestamp) throws TException {
+ public TRegionRouteMapResp getLatestRegionRouteMap() throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
try {
- return client.getConfigNodeHeartBeat(timestamp);
+ TRegionRouteMapResp regionRouteMapResp = client.getLatestRegionRouteMap();
+ if (!updateConfigNodeLeader(regionRouteMapResp.getStatus())) {
+ return regionRouteMapResp;
+ }
} catch (TException e) {
configLeader = null;
}
@@ -736,6 +696,11 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public long getConfigNodeHeartBeat(long timestamp) throws TException {
+ throw new TException("DataNode to ConfigNode client doesn't support getConfigNodeHeartBeat.");
+ }
+
@Override
public TSStatus dropFunction(TDropFunctionReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 5f2f0b168f..338927ff72 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -20,6 +20,7 @@
namespace java org.apache.iotdb.common.rpc.thrift
namespace py iotdb.thrift.common
+// Define a set of ip:port address
struct TEndPoint {
1: required string ip
2: required i32 port
@@ -67,7 +68,7 @@ struct TDataNodeLocation {
1: required i32 dataNodeId
// TEndPoint for DataNode's client rpc
2: required TEndPoint clientRpcEndPoint
- // TEndPoint for DataNode's internal rpc
+ // TEndPoint for DataNode's cluster internal rpc
3: required TEndPoint internalEndPoint
// TEndPoint for exchange data between DataNodes
4: required TEndPoint mPPDataExchangeEndPoint
@@ -77,6 +78,13 @@ struct TDataNodeLocation {
6: required TEndPoint schemaRegionConsensusEndPoint
}
+struct TDataNodeInfo {
+ 1: required TDataNodeLocation location
+ 2: required i32 cpuCoreNum
+ 3: required i64 maxMemory
+}
+
+// For show regions
struct TRegionInfo {
1: required TConsensusGroupId consensusGroupId
2: required string storageGroup
@@ -88,12 +96,6 @@ struct TRegionInfo {
8: optional string status
}
-struct TDataNodeInfo {
- 1: required TDataNodeLocation location
- 2: required i32 cpuCoreNum
- 3: required i64 maxMemory
-}
-
struct TFlushReq {
1: optional string isSeq
2: optional list<string> storageGroups
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 5a81e7e082..21a3b6dff9 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -236,6 +236,15 @@ struct TShowRegionResp {
2: optional list<common.TRegionInfo> regionInfoList;
}
+struct TRegionRouteMapResp {
+ 1: required common.TSStatus status
+ // For version stamp
+ 2: optional i64 timestamp
+ // The routing policy of read/write requests for each RegionGroup is based on the order in the TRegionReplicaSet.
+ // The replica with higher sorting result in TRegionReplicaSet will have higher priority.
+ 3: optional map<common.TConsensusGroupId, common.TRegionReplicaSet> regionRouteMap
+}
+
service IConfigNodeRPCService {
/* DataNode */
@@ -321,7 +330,12 @@ service IConfigNodeRPCService {
TShowRegionResp showRegion(TShowRegionReq req)
+ /* Routing */
+
+ TRegionRouteMapResp getLatestRegionRouteMap()
+
/* Get confignode heartbeat */
+
i64 getConfigNodeHeartBeat(i64 timestamp)
}