You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/14 18:10:27 UTC

[iotdb] branch master updated: Open Optimize ConfigNode redirect and create region process (#5550)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d83f7a894  Open Optimize ConfigNode redirect and create region process (#5550)
0d83f7a894 is described below

commit 0d83f7a894ad6c4d3359ff7892569f57eb2dd1c2
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Fri Apr 15 02:10:21 2022 +0800

     Open Optimize ConfigNode redirect and create region process (#5550)
---
 .../iotdb/confignode/cli/TemporaryClient.java      |  5 ++-
 .../iotdb/confignode/manager/ConfigManager.java    | 17 ++------
 .../iotdb/confignode/cli/TemporaryClientDemo.java  | 47 +++++++++++++++-------
 3 files changed, 40 insertions(+), 29 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java b/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
index 8e966434c5..787d07b579 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
@@ -47,7 +47,8 @@ public class TemporaryClient {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TemporaryClient.class);
 
-  private static final int timeOutInMS = 2000;
+  // TODO: Add timeout parameter
+  private static final int timeOutInMS = 10000;
   private static final int retryWait = 10;
   private static final int retryNum = 3;
 
@@ -115,6 +116,7 @@ public class TemporaryClient {
                 status);
           }
         } catch (TException e) {
+          // TODO: Handle SocketTimeoutException
           LOGGER.error(
               "Create SchemaRegion on DataNode: {} failed, {}. Retrying...",
               DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint(),
@@ -167,6 +169,7 @@ public class TemporaryClient {
                 status);
           }
         } catch (TException e) {
+          // TODO: Handle SocketTimeoutException
           LOGGER.error(
               "Create DataRegion on DataNode: {} failed, {}. Retrying...",
               DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint(),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 54b0e294d5..c605d418b5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.confignode.manager;
 
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
 import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
@@ -36,7 +35,6 @@ import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
 import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
 import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -250,18 +248,9 @@ public class ConfigManager implements Manager {
     if (getConsensusManager().isLeader()) {
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } else {
-      Peer peer = getConsensusManager().getLeader();
-      if (peer == null) {
-        return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
-            .setMessage(
-                "The current ConfigNode is not leader. And ConfigNodeGroup is in leader election. Please redirect with a random ConfigNode.");
-      } else {
-        // TODO Get rpc port of leader
-        return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
-            .setRedirectNode(
-                new EndPoint(peer.getEndpoint().getIp(), peer.getEndpoint().getPort() - 1))
-            .setMessage("The current ConfigNode is not leader. Please redirect.");
-      }
+      return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
+          .setMessage(
+              "The current ConfigNode is not leader. And ConfigNodeGroup is in leader election. Please redirect with a random ConfigNode.");
     }
   }
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/cli/TemporaryClientDemo.java b/confignode/src/test/java/org/apache/iotdb/confignode/cli/TemporaryClientDemo.java
index c340a53670..245c0b0acc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/cli/TemporaryClientDemo.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/cli/TemporaryClientDemo.java
@@ -18,38 +18,57 @@
  */
 package org.apache.iotdb.confignode.cli;
 
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
 import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
 
 public class TemporaryClientDemo {
 
   private static final int timeOutInMS = 10000;
 
-  public void setStorageGroupAndCreateRegionsDemo() throws TException, InterruptedException {
-    TTransport transport = RpcTransportFactory.INSTANCE.getTransport("0.0.0.0", 22277, timeOutInMS);
-    transport.open();
-    ConfigIService.Client client = new ConfigIService.Client(new TBinaryProtocol(transport));
+  private final Random random = new Random();
+  private Map<Integer, ConfigIService.Client> clients;
+  private ConfigIService.Client defaultClient;
 
-    TDataNodeRegisterReq registerReq = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
-    TDataNodeRegisterResp registerResp = client.registerDataNode(registerReq);
-    System.out.println(registerResp);
-    TimeUnit.MILLISECONDS.sleep(200);
+  public void setStorageGroupsDemo() throws TException {
+    createClients();
+    defaultClient = clients.get(22277);
 
     for (int i = 0; i < 5; i++) {
       TSetStorageGroupReq setReq = new TSetStorageGroupReq("root.sg" + i);
-      TSStatus status = client.setStorageGroup(setReq);
-      System.out.println(status.toString());
+      while (true) {
+        TSStatus status = defaultClient.setStorageGroup(setReq);
+        System.out.println(status.toString());
+        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          break;
+        } else if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+          int port = random.nextInt(3) * 2 + 22277;
+          if (status.getRedirectNode() != null) {
+            port = status.getRedirectNode().getPort();
+          }
+          defaultClient = clients.get(port);
+        }
+      }
+    }
+  }
+
+  private void createClients() throws TTransportException {
+    clients = new HashMap<>();
+    for (int i = 22277; i <= 22281; i += 2) {
+      TTransport transport = RpcTransportFactory.INSTANCE.getTransport("0.0.0.0", i, timeOutInMS);
+      transport.open();
+      clients.put(i, new ConfigIService.Client(new TBinaryProtocol(transport)));
     }
   }
 }