You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/12 03:36:09 UTC

[iotdb] branch master updated: [IOTDB-2867] Response leader redirect when the current ConfigNode is not leader (#5481)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fc6b11263 [IOTDB-2867] Response leader redirect when the current ConfigNode is not leader  (#5481)
6fc6b11263 is described below

commit 6fc6b112638a33bb33c5d4745186eb2c7a6b3e28
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue Apr 12 11:36:03 2022 +0800

    [IOTDB-2867] Response leader redirect when the current ConfigNode is not leader  (#5481)
---
 .../response/DataNodeConfigurationDataSet.java     |  21 +++-
 .../consensus/response/DataNodesInfoDataSet.java   |  21 ++++
 .../consensus/response/DataPartitionDataSet.java   | 127 +++++++++++----------
 .../response/StorageGroupSchemaDataSet.java        |  16 +++
 .../iotdb/confignode/manager/ConfigManager.java    |  85 +++++++++-----
 .../iotdb/confignode/manager/ConsensusManager.java |   4 +
 .../iotdb/confignode/manager/DataNodeManager.java  |   4 +-
 .../server/ConfigNodeRPCServerProcessor.java       |  55 +--------
 .../confignode/consensus/RatisConsensusDemo.java   |  14 +++
 .../server/ConfigNodeRPCServerProcessorTest.java   |   5 +-
 10 files changed, 206 insertions(+), 146 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
index 9e5d67f7ea..deb1a07ead 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
@@ -22,21 +22,31 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 public class DataNodeConfigurationDataSet implements DataSet {
 
   private TSStatus status;
-  private int dataNodeId;
+  private Integer dataNodeId;
   private TGlobalConfig globalConfig;
 
   public DataNodeConfigurationDataSet() {
-    // Empty constructor
+    this.dataNodeId = null;
+    this.globalConfig = null;
+  }
+
+  public TSStatus getStatus() {
+    return status;
   }
 
   public void setStatus(TSStatus status) {
     this.status = status;
   }
 
+  public Integer getDataNodeId() {
+    return dataNodeId;
+  }
+
   public void setDataNodeId(int dataNodeId) {
     this.dataNodeId = dataNodeId;
   }
@@ -47,7 +57,10 @@ public class DataNodeConfigurationDataSet implements DataSet {
 
   public void convertToRpcDataNodeRegisterResp(TDataNodeRegisterResp resp) {
     resp.setStatus(status);
-    resp.setDataNodeID(dataNodeId);
-    resp.setGlobalConfig(globalConfig);
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        || status.getCode() == TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()) {
+      resp.setDataNodeID(dataNodeId);
+      resp.setGlobalConfig(globalConfig);
+    }
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
index 11e3d2ce71..f4f38942b4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
@@ -18,11 +18,17 @@
  */
 package org.apache.iotdb.confignode.consensus.response;
 
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class DataNodesInfoDataSet implements DataSet {
 
@@ -48,4 +54,19 @@ public class DataNodesInfoDataSet implements DataSet {
   public List<DataNodeLocation> getDataNodeList() {
     return this.dataNodeList;
   }
+
+  /** Copies the status into resp and, on success, the DataNodeId -> TDataNodeMessage map. */
+  public void convertToRPCDataNodeMessageResp(TDataNodeMessageResp resp) {
+    resp.setStatus(status);
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      Map<Integer, TDataNodeMessage> msgMap = new HashMap<>();
+      for (DataNodeLocation info : dataNodeList) {
+        EndPoint endPoint = new EndPoint(info.getEndPoint().getIp(), info.getEndPoint().getPort());
+        msgMap.put(info.getDataNodeId(), new TDataNodeMessage(info.getDataNodeId(), endPoint));
+      }
+      // Set the map once, after the loop: an empty DataNode list must still yield an empty map,
+      // not an unset field (matches convertToRPCStorageGroupMessageResp).
+      resp.setDataNodeMessageMap(msgMap);
+    }
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
index 222995501c..4de9443c5e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -63,67 +64,69 @@ public class DataPartitionDataSet implements DataSet {
   public void convertToRpcDataPartitionResp(TDataPartitionResp resp) {
     Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
         dataPartitionMap = new HashMap<>();
-
-    dataPartition
-        .getDataPartitionMap()
-        .forEach(
-            ((storageGroup, seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap) -> {
-              // Extract StorageGroupName
-              dataPartitionMap.putIfAbsent(storageGroup, new HashMap<>());
-
-              seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap.forEach(
-                  ((seriesPartitionSlot, timePartitionSlotReplicaSetListMap) -> {
-                    // Extract TSeriesPartitionSlot
-                    TSeriesPartitionSlot tSeriesPartitionSlot =
-                        new TSeriesPartitionSlot(seriesPartitionSlot.getSlotId());
-                    dataPartitionMap
-                        .get(storageGroup)
-                        .putIfAbsent(tSeriesPartitionSlot, new HashMap<>());
-
-                    // Extract Map<TimePartitionSlot, List<RegionReplicaSet>>
-                    timePartitionSlotReplicaSetListMap.forEach(
-                        ((timePartitionSlot, regionReplicaSets) -> {
-                          // Extract TTimePartitionSlot
-                          TTimePartitionSlot tTimePartitionSlot =
-                              new TTimePartitionSlot(timePartitionSlot.getStartTime());
-                          dataPartitionMap
-                              .get(storageGroup)
-                              .get(tSeriesPartitionSlot)
-                              .putIfAbsent(tTimePartitionSlot, new ArrayList<>());
-
-                          // Extract TRegionReplicaSets
-                          regionReplicaSets.forEach(
-                              regionReplicaSet -> {
-                                TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
-
-                                // Set TRegionReplicaSet's RegionId
-                                tRegionReplicaSet.setRegionId(regionReplicaSet.getId().getId());
-
-                                // Set TRegionReplicaSet's GroupType
-                                tRegionReplicaSet.setGroupType("DataRegion");
-
-                                // Set TRegionReplicaSet's EndPoints
-                                List<EndPoint> endPointList = new ArrayList<>();
-                                regionReplicaSet
-                                    .getDataNodeList()
-                                    .forEach(
-                                        dataNodeLocation ->
-                                            endPointList.add(
-                                                new EndPoint(
-                                                    dataNodeLocation.getEndPoint().getIp(),
-                                                    dataNodeLocation.getEndPoint().getPort())));
-                                tRegionReplicaSet.setEndpoint(endPointList);
-
-                                dataPartitionMap
-                                    .get(storageGroup)
-                                    .get(tSeriesPartitionSlot)
-                                    .get(tTimePartitionSlot)
-                                    .add(tRegionReplicaSet);
-                              });
-                        }));
-                  }));
-            }));
-
-    resp.setDataPartitionMap(dataPartitionMap);
+    resp.setStatus(status);
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      dataPartition
+          .getDataPartitionMap()
+          .forEach(
+              ((storageGroup, seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap) -> {
+                // Extract StorageGroupName
+                dataPartitionMap.putIfAbsent(storageGroup, new HashMap<>());
+
+                seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap.forEach(
+                    ((seriesPartitionSlot, timePartitionSlotReplicaSetListMap) -> {
+                      // Extract TSeriesPartitionSlot
+                      TSeriesPartitionSlot tSeriesPartitionSlot =
+                          new TSeriesPartitionSlot(seriesPartitionSlot.getSlotId());
+                      dataPartitionMap
+                          .get(storageGroup)
+                          .putIfAbsent(tSeriesPartitionSlot, new HashMap<>());
+
+                      // Extract Map<TimePartitionSlot, List<RegionReplicaSet>>
+                      timePartitionSlotReplicaSetListMap.forEach(
+                          ((timePartitionSlot, regionReplicaSets) -> {
+                            // Extract TTimePartitionSlot
+                            TTimePartitionSlot tTimePartitionSlot =
+                                new TTimePartitionSlot(timePartitionSlot.getStartTime());
+                            dataPartitionMap
+                                .get(storageGroup)
+                                .get(tSeriesPartitionSlot)
+                                .putIfAbsent(tTimePartitionSlot, new ArrayList<>());
+
+                            // Extract TRegionReplicaSets
+                            regionReplicaSets.forEach(
+                                regionReplicaSet -> {
+                                  TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
+
+                                  // Set TRegionReplicaSet's RegionId
+                                  tRegionReplicaSet.setRegionId(regionReplicaSet.getId().getId());
+
+                                  // Set TRegionReplicaSet's GroupType
+                                  tRegionReplicaSet.setGroupType("DataRegion");
+
+                                  // Set TRegionReplicaSet's EndPoints
+                                  List<EndPoint> endPointList = new ArrayList<>();
+                                  regionReplicaSet
+                                      .getDataNodeList()
+                                      .forEach(
+                                          dataNodeLocation ->
+                                              endPointList.add(
+                                                  new EndPoint(
+                                                      dataNodeLocation.getEndPoint().getIp(),
+                                                      dataNodeLocation.getEndPoint().getPort())));
+                                  tRegionReplicaSet.setEndpoint(endPointList);
+
+                                  dataPartitionMap
+                                      .get(storageGroup)
+                                      .get(tSeriesPartitionSlot)
+                                      .get(tTimePartitionSlot)
+                                      .add(tRegionReplicaSet);
+                                });
+                          }));
+                    }));
+              }));
+
+      resp.setDataPartitionMap(dataPartitionMap);
+    }
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
index 700d4acfb5..245cd58d91 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
@@ -20,9 +20,14 @@ package org.apache.iotdb.confignode.consensus.response;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class StorageGroupSchemaDataSet implements DataSet {
 
@@ -47,4 +52,15 @@ public class StorageGroupSchemaDataSet implements DataSet {
   public void setSchemaList(List<StorageGroupSchema> schemaList) {
     this.schemaList = schemaList;
   }
+
+  public void convertToRPCStorageGroupMessageResp(TStorageGroupMessageResp resp) {
+    resp.setStatus(status);
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      Map<String, TStorageGroupMessage> storageGroupMessageMap = new HashMap<>();
+      for (StorageGroupSchema schema : schemaList) {
+        storageGroupMessageMap.put(schema.getName(), new TStorageGroupMessage(schema.getName()));
+      }
+      resp.setStorageGroupMessageMap(storageGroupMessageMap);
+    }
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 8457275a88..d84ace336e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -19,11 +19,14 @@
 
 package org.apache.iotdb.confignode.manager;
 
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
 import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
 import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
 import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
+import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
 import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
 import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
@@ -75,43 +78,48 @@ public class ConfigManager implements Manager {
 
   @Override
   public DataSet registerDataNode(PhysicalPlan physicalPlan) {
-
-    // TODO: Only leader can register DataNode
-
-    if (physicalPlan instanceof RegisterDataNodePlan) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return dataNodeManager.registerDataNode((RegisterDataNodePlan) physicalPlan);
+    } else {
+      DataNodeConfigurationDataSet dataSet = new DataNodeConfigurationDataSet();
+      dataSet.setStatus(status);
+      return dataSet;
     }
-    return new DataNodeConfigurationDataSet();
   }
 
   @Override
   public DataSet getDataNodeInfo(PhysicalPlan physicalPlan) {
-
-    // TODO: Only leader can get DataNodeInfo
-
-    if (physicalPlan instanceof QueryDataNodeInfoPlan) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return dataNodeManager.getDataNodeInfo((QueryDataNodeInfoPlan) physicalPlan);
+    } else {
+      DataNodesInfoDataSet dataSet = new DataNodesInfoDataSet();
+      dataSet.setStatus(status);
+      return dataSet;
     }
-    return new DataNodesInfoDataSet();
   }
 
   @Override
   public DataSet getStorageGroupSchema() {
-
-    // TODO: Only leader can get StorageGroupSchema
-
-    return regionManager.getStorageGroupSchema();
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return regionManager.getStorageGroupSchema();
+    } else {
+      StorageGroupSchemaDataSet dataSet = new StorageGroupSchemaDataSet();
+      dataSet.setStatus(status);
+      return dataSet;
+    }
   }
 
   @Override
   public TSStatus setStorageGroup(PhysicalPlan physicalPlan) {
-
-    // TODO: Only leader can set StorageGroup
-
-    if (physicalPlan instanceof SetStorageGroupPlan) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return regionManager.setStorageGroup((SetStorageGroupPlan) physicalPlan);
+    } else {
+      return status;
     }
-    return ERROR_TSSTATUS;
   }
 
   @Override
@@ -139,24 +147,43 @@ public class ConfigManager implements Manager {
 
   @Override
   public DataSet getDataPartition(PhysicalPlan physicalPlan) {
-
-    // TODO: Only leader can query DataPartition
-
-    if (physicalPlan instanceof GetOrCreateDataPartitionPlan) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return partitionManager.getDataPartition((GetOrCreateDataPartitionPlan) physicalPlan);
+    } else {
+      DataPartitionDataSet dataSet = new DataPartitionDataSet();
+      dataSet.setStatus(status);
+      return dataSet;
     }
-    return new DataPartitionDataSet();
   }
 
   @Override
   public DataSet getOrCreateDataPartition(PhysicalPlan physicalPlan) {
-
-    // TODO: only leader can apply DataPartition
-
-    if (physicalPlan instanceof GetOrCreateDataPartitionPlan) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return partitionManager.getOrCreateDataPartition((GetOrCreateDataPartitionPlan) physicalPlan);
+    } else {
+      DataPartitionDataSet dataSet = new DataPartitionDataSet();
+      dataSet.setStatus(status);
+      return dataSet;
+    }
+  }
+
+  private TSStatus confirmLeader() {
+    if (getConsensusManager().isLeader()) {
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } else {
+      Endpoint endpoint = getConsensusManager().getLeader();
+      if (endpoint == null) {
+        return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
+            .setMessage(
+                "The current ConfigNode is not leader. And ConfigNodeGroup is in leader election. Please redirect with a random ConfigNode.");
+      } else {
+        return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
+            .setRedirectNode(new EndPoint(endpoint.getIp(), endpoint.getPort()))
+            .setMessage("The current ConfigNode is not leader. Please redirect.");
+      }
     }
-    return new DataPartitionDataSet();
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 51d2bf7898..a7c5205caf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -133,5 +133,9 @@ public class ConsensusManager {
     return consensusImpl.isLeader(consensusGroupId);
   }
 
+  public Endpoint getLeader() {
+    return consensusImpl.getLeader(consensusGroupId).getEndpoint();
+  }
+
   // TODO: Interfaces for LoadBalancer control
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
index 9e0d5a7633..fb150b405c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
@@ -77,7 +77,9 @@ public class DataNodeManager {
     DataNodeConfigurationDataSet dataSet = new DataNodeConfigurationDataSet();
 
     if (DataNodeInfoPersistence.getInstance().containsValue(plan.getInfo())) {
-      dataSet.setStatus(new TSStatus(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()));
+      TSStatus status = new TSStatus(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
+      status.setMessage("DataNode already registered.");
+      dataSet.setStatus(status);
     } else {
       plan.getInfo().setDataNodeId(DataNodeInfoPersistence.getInstance().generateNextDataNodeId());
       ConsensusWriteResponse resp = getConsensusManager().write(plan);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index 1e17b4d76e..a4392a1716 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.confignode.service.thrift.server;
 
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.cluster.Endpoint;
@@ -36,7 +35,6 @@ import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
 import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessage;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
@@ -46,18 +44,14 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
 import org.apache.iotdb.db.auth.AuthException;
-import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 /** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
 public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
@@ -82,12 +76,9 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
                 -1, new Endpoint(req.getEndPoint().getIp(), req.getEndPoint().getPort())));
     DataNodeConfigurationDataSet dataSet =
         (DataNodeConfigurationDataSet) configManager.registerDataNode(plan);
+
     TDataNodeRegisterResp resp = new TDataNodeRegisterResp();
     dataSet.convertToRpcDataNodeRegisterResp(resp);
-    LOGGER.info(
-        "Register DataNode successful. DataNodeID: {}, {}",
-        resp.getDataNodeID(),
-        req.getEndPoint().toString());
     return resp;
   }
 
@@ -97,19 +88,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
     DataNodesInfoDataSet dataSet = (DataNodesInfoDataSet) configManager.getDataNodeInfo(plan);
 
     TDataNodeMessageResp resp = new TDataNodeMessageResp();
-    resp.setStatus(dataSet.getStatus());
-    if (dataSet.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      Map<Integer, TDataNodeMessage> msgMap = new HashMap<>();
-      for (DataNodeLocation info : dataSet.getDataNodeList()) {
-        msgMap.put(
-            info.getDataNodeId(),
-            new TDataNodeMessage(
-                info.getDataNodeId(),
-                new EndPoint(info.getEndPoint().getIp(), info.getEndPoint().getPort())));
-        resp.setDataNodeMessageMap(msgMap);
-      }
-    }
-
+    dataSet.convertToRPCDataNodeMessageResp(resp);
     return resp;
   }
 
@@ -118,13 +97,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
     SetStorageGroupPlan plan =
         new SetStorageGroupPlan(new StorageGroupSchema(req.getStorageGroup()));
 
-    TSStatus resp = configManager.setStorageGroup(plan);
-    if (resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.info("Set StorageGroup {} successful.", req.getStorageGroup());
-    } else {
-      LOGGER.error("Set StorageGroup {} failed. {}", req.getStorageGroup(), resp.getMessage());
-    }
-    return resp;
+    return configManager.setStorageGroup(plan);
   }
 
   @Override
@@ -139,15 +112,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
         (StorageGroupSchemaDataSet) configManager.getStorageGroupSchema();
 
     TStorageGroupMessageResp resp = new TStorageGroupMessageResp();
-    resp.setStatus(dataSet.getStatus());
-    if (dataSet.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      Map<String, TStorageGroupMessage> storageGroupMessageMap = new HashMap<>();
-      for (StorageGroupSchema schema : dataSet.getSchemaList()) {
-        storageGroupMessageMap.put(schema.getName(), new TStorageGroupMessage(schema.getName()));
-      }
-      resp.setStorageGroupMessageMap(storageGroupMessageMap);
-    }
-
+    dataSet.convertToRPCStorageGroupMessageResp(resp);
     return resp;
   }
 
@@ -195,11 +160,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
         (DataPartitionDataSet) configManager.getDataPartition(getDataPartitionPlan);
 
     TDataPartitionResp resp = new TDataPartitionResp();
-    resp.setStatus(dataset.getStatus());
-    if (dataset.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataset.convertToRpcDataPartitionResp(resp);
-    }
-
+    dataset.convertToRpcDataPartitionResp(resp);
     return resp;
   }
 
@@ -212,11 +173,7 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
         (DataPartitionDataSet) configManager.getOrCreateDataPartition(getOrCreateDataPartitionPlan);
 
     TDataPartitionResp resp = new TDataPartitionResp();
-    resp.setStatus(dataset.getStatus());
-    if (dataset.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataset.convertToRpcDataPartitionResp(resp);
-    }
-
+    dataset.convertToRpcDataPartitionResp(resp);
     return resp;
   }
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
index 90efb510ee..9a5bdf83e3 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
@@ -70,6 +70,11 @@ public class RatisConsensusDemo {
     queryStorageGroups();
   }
 
+  public void ratisConsensusLeaderRedirectDemo() throws TException {
+    createClients();
+    registerDataNodeOnLeader();
+  }
+
   private void createClients() throws TTransportException {
     // Create clients for these three ConfigNodes
     // to simulate DataNodes to send RPC requests
@@ -134,4 +139,13 @@ public class RatisConsensusDemo {
       System.out.println("}");
     }
   }
+
+  private void registerDataNodeOnLeader() throws TException {
+    for (int i = 0; i < 3; i++) {
+      EndPoint endPoint = new EndPoint("0.0.0.0", 6667);
+      TDataNodeRegisterReq req = new TDataNodeRegisterReq(endPoint);
+      TDataNodeRegisterResp resp = clients[i].registerDataNode(req);
+      System.out.println(resp);
+    }
+  }
 }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index d4f3608920..6db4d61a20 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -54,14 +54,17 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 public class ConfigNodeRPCServerProcessorTest {
 
   ConfigNodeRPCServerProcessor processor;
 
   @Before
-  public void before() throws IOException {
+  public void before() throws IOException, InterruptedException {
     processor = new ConfigNodeRPCServerProcessor();
+    // Sleep 1s to make sure the Consensus group has done leader election
+    TimeUnit.SECONDS.sleep(1);
   }
 
   @After