You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/12 03:36:09 UTC
[iotdb] branch master updated: [IOTDB-2867] Response leader redirect when the current ConfigNode is not leader (#5481)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6fc6b11263 [IOTDB-2867] Response leader redirect when the current ConfigNode is not leader (#5481)
6fc6b11263 is described below
commit 6fc6b112638a33bb33c5d4745186eb2c7a6b3e28
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Apr 12 11:36:03 2022 +0800
[IOTDB-2867] Response leader redirect when the current ConfigNode is not leader (#5481)
---
.../response/DataNodeConfigurationDataSet.java | 21 +++-
.../consensus/response/DataNodesInfoDataSet.java | 21 ++++
.../consensus/response/DataPartitionDataSet.java | 127 +++++++++++----------
.../response/StorageGroupSchemaDataSet.java | 16 +++
.../iotdb/confignode/manager/ConfigManager.java | 85 +++++++++-----
.../iotdb/confignode/manager/ConsensusManager.java | 4 +
.../iotdb/confignode/manager/DataNodeManager.java | 4 +-
.../server/ConfigNodeRPCServerProcessor.java | 55 +--------
.../confignode/consensus/RatisConsensusDemo.java | 14 +++
.../server/ConfigNodeRPCServerProcessorTest.java | 5 +-
10 files changed, 206 insertions(+), 146 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
index 9e5d67f7ea..deb1a07ead 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
@@ -22,21 +22,31 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
public class DataNodeConfigurationDataSet implements DataSet {
private TSStatus status;
- private int dataNodeId;
+ private Integer dataNodeId;
private TGlobalConfig globalConfig;
public DataNodeConfigurationDataSet() {
- // Empty constructor
+ this.dataNodeId = null;
+ this.globalConfig = null;
+ }
+
+ public TSStatus getStatus() {
+ return status;
}
public void setStatus(TSStatus status) {
this.status = status;
}
+ public Integer getDataNodeId() {
+ return dataNodeId;
+ }
+
public void setDataNodeId(int dataNodeId) {
this.dataNodeId = dataNodeId;
}
@@ -47,7 +57,10 @@ public class DataNodeConfigurationDataSet implements DataSet {
public void convertToRpcDataNodeRegisterResp(TDataNodeRegisterResp resp) {
resp.setStatus(status);
- resp.setDataNodeID(dataNodeId);
- resp.setGlobalConfig(globalConfig);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || status.getCode() == TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()) {
+ resp.setDataNodeID(dataNodeId);
+ resp.setGlobalConfig(globalConfig);
+ }
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
index 11e3d2ce71..f4f38942b4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
@@ -18,11 +18,17 @@
*/
package org.apache.iotdb.confignode.consensus.response;
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class DataNodesInfoDataSet implements DataSet {
@@ -48,4 +54,19 @@ public class DataNodesInfoDataSet implements DataSet {
public List<DataNodeLocation> getDataNodeList() {
return this.dataNodeList;
}
+
+ public void convertToRPCDataNodeMessageResp(TDataNodeMessageResp resp) {
+ resp.setStatus(status);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ Map<Integer, TDataNodeMessage> msgMap = new HashMap<>();
+ for (DataNodeLocation info : dataNodeList) {
+ msgMap.put(
+ info.getDataNodeId(),
+ new TDataNodeMessage(
+ info.getDataNodeId(),
+ new EndPoint(info.getEndPoint().getIp(), info.getEndPoint().getPort())));
+ resp.setDataNodeMessageMap(msgMap);
+ }
+ }
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
index 222995501c..4de9443c5e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
import java.util.ArrayList;
import java.util.HashMap;
@@ -63,67 +64,69 @@ public class DataPartitionDataSet implements DataSet {
public void convertToRpcDataPartitionResp(TDataPartitionResp resp) {
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
dataPartitionMap = new HashMap<>();
-
- dataPartition
- .getDataPartitionMap()
- .forEach(
- ((storageGroup, seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap) -> {
- // Extract StorageGroupName
- dataPartitionMap.putIfAbsent(storageGroup, new HashMap<>());
-
- seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap.forEach(
- ((seriesPartitionSlot, timePartitionSlotReplicaSetListMap) -> {
- // Extract TSeriesPartitionSlot
- TSeriesPartitionSlot tSeriesPartitionSlot =
- new TSeriesPartitionSlot(seriesPartitionSlot.getSlotId());
- dataPartitionMap
- .get(storageGroup)
- .putIfAbsent(tSeriesPartitionSlot, new HashMap<>());
-
- // Extract Map<TimePartitionSlot, List<RegionReplicaSet>>
- timePartitionSlotReplicaSetListMap.forEach(
- ((timePartitionSlot, regionReplicaSets) -> {
- // Extract TTimePartitionSlot
- TTimePartitionSlot tTimePartitionSlot =
- new TTimePartitionSlot(timePartitionSlot.getStartTime());
- dataPartitionMap
- .get(storageGroup)
- .get(tSeriesPartitionSlot)
- .putIfAbsent(tTimePartitionSlot, new ArrayList<>());
-
- // Extract TRegionReplicaSets
- regionReplicaSets.forEach(
- regionReplicaSet -> {
- TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
-
- // Set TRegionReplicaSet's RegionId
- tRegionReplicaSet.setRegionId(regionReplicaSet.getId().getId());
-
- // Set TRegionReplicaSet's GroupType
- tRegionReplicaSet.setGroupType("DataRegion");
-
- // Set TRegionReplicaSet's EndPoints
- List<EndPoint> endPointList = new ArrayList<>();
- regionReplicaSet
- .getDataNodeList()
- .forEach(
- dataNodeLocation ->
- endPointList.add(
- new EndPoint(
- dataNodeLocation.getEndPoint().getIp(),
- dataNodeLocation.getEndPoint().getPort())));
- tRegionReplicaSet.setEndpoint(endPointList);
-
- dataPartitionMap
- .get(storageGroup)
- .get(tSeriesPartitionSlot)
- .get(tTimePartitionSlot)
- .add(tRegionReplicaSet);
- });
- }));
- }));
- }));
-
- resp.setDataPartitionMap(dataPartitionMap);
+ resp.setStatus(status);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataPartition
+ .getDataPartitionMap()
+ .forEach(
+ ((storageGroup, seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap) -> {
+ // Extract StorageGroupName
+ dataPartitionMap.putIfAbsent(storageGroup, new HashMap<>());
+
+ seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap.forEach(
+ ((seriesPartitionSlot, timePartitionSlotReplicaSetListMap) -> {
+ // Extract TSeriesPartitionSlot
+ TSeriesPartitionSlot tSeriesPartitionSlot =
+ new TSeriesPartitionSlot(seriesPartitionSlot.getSlotId());
+ dataPartitionMap
+ .get(storageGroup)
+ .putIfAbsent(tSeriesPartitionSlot, new HashMap<>());
+
+ // Extract Map<TimePartitionSlot, List<RegionReplicaSet>>
+ timePartitionSlotReplicaSetListMap.forEach(
+ ((timePartitionSlot, regionReplicaSets) -> {
+ // Extract TTimePartitionSlot
+ TTimePartitionSlot tTimePartitionSlot =
+ new TTimePartitionSlot(timePartitionSlot.getStartTime());
+ dataPartitionMap
+ .get(storageGroup)
+ .get(tSeriesPartitionSlot)
+ .putIfAbsent(tTimePartitionSlot, new ArrayList<>());
+
+ // Extract TRegionReplicaSets
+ regionReplicaSets.forEach(
+ regionReplicaSet -> {
+ TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
+
+ // Set TRegionReplicaSet's RegionId
+ tRegionReplicaSet.setRegionId(regionReplicaSet.getId().getId());
+
+ // Set TRegionReplicaSet's GroupType
+ tRegionReplicaSet.setGroupType("DataRegion");
+
+ // Set TRegionReplicaSet's EndPoints
+ List<EndPoint> endPointList = new ArrayList<>();
+ regionReplicaSet
+ .getDataNodeList()
+ .forEach(
+ dataNodeLocation ->
+ endPointList.add(
+ new EndPoint(
+ dataNodeLocation.getEndPoint().getIp(),
+ dataNodeLocation.getEndPoint().getPort())));
+ tRegionReplicaSet.setEndpoint(endPointList);
+
+ dataPartitionMap
+ .get(storageGroup)
+ .get(tSeriesPartitionSlot)
+ .get(tTimePartitionSlot)
+ .add(tRegionReplicaSet);
+ });
+ }));
+ }));
+ }));
+
+ resp.setDataPartitionMap(dataPartitionMap);
+ }
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
index 700d4acfb5..245cd58d91 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
@@ -20,9 +20,14 @@ package org.apache.iotdb.confignode.consensus.response;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class StorageGroupSchemaDataSet implements DataSet {
@@ -47,4 +52,15 @@ public class StorageGroupSchemaDataSet implements DataSet {
public void setSchemaList(List<StorageGroupSchema> schemaList) {
this.schemaList = schemaList;
}
+
+ public void convertToRPCStorageGroupMessageResp(TStorageGroupMessageResp resp) {
+ resp.setStatus(status);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ Map<String, TStorageGroupMessage> storageGroupMessageMap = new HashMap<>();
+ for (StorageGroupSchema schema : schemaList) {
+ storageGroupMessageMap.put(schema.getName(), new TStorageGroupMessage(schema.getName()));
+ }
+ resp.setStorageGroupMessageMap(storageGroupMessageMap);
+ }
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 8457275a88..d84ace336e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
+import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
@@ -75,43 +78,48 @@ public class ConfigManager implements Manager {
@Override
public DataSet registerDataNode(PhysicalPlan physicalPlan) {
-
- // TODO: Only leader can register DataNode
-
- if (physicalPlan instanceof RegisterDataNodePlan) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return dataNodeManager.registerDataNode((RegisterDataNodePlan) physicalPlan);
+ } else {
+ DataNodeConfigurationDataSet dataSet = new DataNodeConfigurationDataSet();
+ dataSet.setStatus(status);
+ return dataSet;
}
- return new DataNodeConfigurationDataSet();
}
@Override
public DataSet getDataNodeInfo(PhysicalPlan physicalPlan) {
-
- // TODO: Only leader can get DataNodeInfo
-
- if (physicalPlan instanceof QueryDataNodeInfoPlan) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return dataNodeManager.getDataNodeInfo((QueryDataNodeInfoPlan) physicalPlan);
+ } else {
+ DataNodesInfoDataSet dataSet = new DataNodesInfoDataSet();
+ dataSet.setStatus(status);
+ return dataSet;
}
- return new DataNodesInfoDataSet();
}
@Override
public DataSet getStorageGroupSchema() {
-
- // TODO: Only leader can get StorageGroupSchema
-
- return regionManager.getStorageGroupSchema();
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return regionManager.getStorageGroupSchema();
+ } else {
+ StorageGroupSchemaDataSet dataSet = new StorageGroupSchemaDataSet();
+ dataSet.setStatus(status);
+ return dataSet;
+ }
}
@Override
public TSStatus setStorageGroup(PhysicalPlan physicalPlan) {
-
- // TODO: Only leader can set StorageGroup
-
- if (physicalPlan instanceof SetStorageGroupPlan) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return regionManager.setStorageGroup((SetStorageGroupPlan) physicalPlan);
+ } else {
+ return status;
}
- return ERROR_TSSTATUS;
}
@Override
@@ -139,24 +147,43 @@ public class ConfigManager implements Manager {
@Override
public DataSet getDataPartition(PhysicalPlan physicalPlan) {
-
- // TODO: Only leader can query DataPartition
-
- if (physicalPlan instanceof GetOrCreateDataPartitionPlan) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return partitionManager.getDataPartition((GetOrCreateDataPartitionPlan) physicalPlan);
+ } else {
+ DataPartitionDataSet dataSet = new DataPartitionDataSet();
+ dataSet.setStatus(status);
+ return dataSet;
}
- return new DataPartitionDataSet();
}
@Override
public DataSet getOrCreateDataPartition(PhysicalPlan physicalPlan) {
-
- // TODO: only leader can apply DataPartition
-
- if (physicalPlan instanceof GetOrCreateDataPartitionPlan) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return partitionManager.getOrCreateDataPartition((GetOrCreateDataPartitionPlan) physicalPlan);
+ } else {
+ DataPartitionDataSet dataSet = new DataPartitionDataSet();
+ dataSet.setStatus(status);
+ return dataSet;
+ }
+ }
+
+ private TSStatus confirmLeader() {
+ if (getConsensusManager().isLeader()) {
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } else {
+ Endpoint endpoint = getConsensusManager().getLeader();
+ if (endpoint == null) {
+ return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
+ .setMessage(
+ "The current ConfigNode is not leader. And ConfigNodeGroup is in leader election. Please redirect with a random ConfigNode.");
+ } else {
+ return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
+ .setRedirectNode(new EndPoint(endpoint.getIp(), endpoint.getPort()))
+ .setMessage("The current ConfigNode is not leader. Please redirect.");
+ }
}
- return new DataPartitionDataSet();
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 51d2bf7898..a7c5205caf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -133,5 +133,9 @@ public class ConsensusManager {
return consensusImpl.isLeader(consensusGroupId);
}
+ public Endpoint getLeader() {
+ return consensusImpl.getLeader(consensusGroupId).getEndpoint();
+ }
+
// TODO: Interfaces for LoadBalancer control
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
index 9e0d5a7633..fb150b405c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
@@ -77,7 +77,9 @@ public class DataNodeManager {
DataNodeConfigurationDataSet dataSet = new DataNodeConfigurationDataSet();
if (DataNodeInfoPersistence.getInstance().containsValue(plan.getInfo())) {
- dataSet.setStatus(new TSStatus(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()));
+ TSStatus status = new TSStatus(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
+ status.setMessage("DataNode already registered.");
+ dataSet.setStatus(status);
} else {
plan.getInfo().setDataNodeId(DataNodeInfoPersistence.getInstance().generateNextDataNodeId());
ConsensusWriteResponse resp = getConsensusManager().write(plan);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index 1e17b4d76e..a4392a1716 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.confignode.service.thrift.server;
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.cluster.Endpoint;
@@ -36,7 +35,6 @@ import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessage;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -46,18 +44,14 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
import org.apache.iotdb.db.auth.AuthException;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
@@ -82,12 +76,9 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
-1, new Endpoint(req.getEndPoint().getIp(), req.getEndPoint().getPort())));
DataNodeConfigurationDataSet dataSet =
(DataNodeConfigurationDataSet) configManager.registerDataNode(plan);
+
TDataNodeRegisterResp resp = new TDataNodeRegisterResp();
dataSet.convertToRpcDataNodeRegisterResp(resp);
- LOGGER.info(
- "Register DataNode successful. DataNodeID: {}, {}",
- resp.getDataNodeID(),
- req.getEndPoint().toString());
return resp;
}
@@ -97,19 +88,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
DataNodesInfoDataSet dataSet = (DataNodesInfoDataSet) configManager.getDataNodeInfo(plan);
TDataNodeMessageResp resp = new TDataNodeMessageResp();
- resp.setStatus(dataSet.getStatus());
- if (dataSet.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- Map<Integer, TDataNodeMessage> msgMap = new HashMap<>();
- for (DataNodeLocation info : dataSet.getDataNodeList()) {
- msgMap.put(
- info.getDataNodeId(),
- new TDataNodeMessage(
- info.getDataNodeId(),
- new EndPoint(info.getEndPoint().getIp(), info.getEndPoint().getPort())));
- resp.setDataNodeMessageMap(msgMap);
- }
- }
-
+ dataSet.convertToRPCDataNodeMessageResp(resp);
return resp;
}
@@ -118,13 +97,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
SetStorageGroupPlan plan =
new SetStorageGroupPlan(new StorageGroupSchema(req.getStorageGroup()));
- TSStatus resp = configManager.setStorageGroup(plan);
- if (resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.info("Set StorageGroup {} successful.", req.getStorageGroup());
- } else {
- LOGGER.error("Set StorageGroup {} failed. {}", req.getStorageGroup(), resp.getMessage());
- }
- return resp;
+ return configManager.setStorageGroup(plan);
}
@Override
@@ -139,15 +112,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
(StorageGroupSchemaDataSet) configManager.getStorageGroupSchema();
TStorageGroupMessageResp resp = new TStorageGroupMessageResp();
- resp.setStatus(dataSet.getStatus());
- if (dataSet.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- Map<String, TStorageGroupMessage> storageGroupMessageMap = new HashMap<>();
- for (StorageGroupSchema schema : dataSet.getSchemaList()) {
- storageGroupMessageMap.put(schema.getName(), new TStorageGroupMessage(schema.getName()));
- }
- resp.setStorageGroupMessageMap(storageGroupMessageMap);
- }
-
+ dataSet.convertToRPCStorageGroupMessageResp(resp);
return resp;
}
@@ -195,11 +160,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
(DataPartitionDataSet) configManager.getDataPartition(getDataPartitionPlan);
TDataPartitionResp resp = new TDataPartitionResp();
- resp.setStatus(dataset.getStatus());
- if (dataset.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataset.convertToRpcDataPartitionResp(resp);
- }
-
+ dataset.convertToRpcDataPartitionResp(resp);
return resp;
}
@@ -212,11 +173,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
(DataPartitionDataSet) configManager.getOrCreateDataPartition(getOrCreateDataPartitionPlan);
TDataPartitionResp resp = new TDataPartitionResp();
- resp.setStatus(dataset.getStatus());
- if (dataset.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataset.convertToRpcDataPartitionResp(resp);
- }
-
+ dataset.convertToRpcDataPartitionResp(resp);
return resp;
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
index 90efb510ee..9a5bdf83e3 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
@@ -70,6 +70,11 @@ public class RatisConsensusDemo {
queryStorageGroups();
}
+ public void ratisConsensusLeaderRedirectDemo() throws TException {
+ createClients();
+ registerDataNodeOnLeader();
+ }
+
private void createClients() throws TTransportException {
// Create clients for these three ConfigNodes
// to simulate DataNodes to send RPC requests
@@ -134,4 +139,13 @@ public class RatisConsensusDemo {
System.out.println("}");
}
}
+
+ private void registerDataNodeOnLeader() throws TException {
+ for (int i = 0; i < 3; i++) {
+ EndPoint endPoint = new EndPoint("0.0.0.0", 6667);
+ TDataNodeRegisterReq req = new TDataNodeRegisterReq(endPoint);
+ TDataNodeRegisterResp resp = clients[i].registerDataNode(req);
+ System.out.println(resp);
+ }
+ }
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index d4f3608920..6db4d61a20 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -54,14 +54,17 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
public class ConfigNodeRPCServerProcessorTest {
ConfigNodeRPCServerProcessor processor;
@Before
- public void before() throws IOException {
+ public void before() throws IOException, InterruptedException {
processor = new ConfigNodeRPCServerProcessor();
+ // Sleep 1s to make sure the Consensus group has done leader election
+ TimeUnit.SECONDS.sleep(1);
}
@After