You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/14 18:10:27 UTC
[iotdb] branch master updated: Open Optimize ConfigNode redirect and create region process (#5550)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0d83f7a894 Open Optimize ConfigNode redirect and create region process (#5550)
0d83f7a894 is described below
commit 0d83f7a894ad6c4d3359ff7892569f57eb2dd1c2
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Fri Apr 15 02:10:21 2022 +0800
Open Optimize ConfigNode redirect and create region process (#5550)
---
.../iotdb/confignode/cli/TemporaryClient.java | 5 ++-
.../iotdb/confignode/manager/ConfigManager.java | 17 ++------
.../iotdb/confignode/cli/TemporaryClientDemo.java | 47 +++++++++++++++-------
3 files changed, 40 insertions(+), 29 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java b/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
index 8e966434c5..787d07b579 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/cli/TemporaryClient.java
@@ -47,7 +47,8 @@ public class TemporaryClient {
private static final Logger LOGGER = LoggerFactory.getLogger(TemporaryClient.class);
- private static final int timeOutInMS = 2000;
+ // TODO: Add timeout parameter
+ private static final int timeOutInMS = 10000;
private static final int retryWait = 10;
private static final int retryNum = 3;
@@ -115,6 +116,7 @@ public class TemporaryClient {
status);
}
} catch (TException e) {
+ // TODO: Handle SocketTimeoutException
LOGGER.error(
"Create SchemaRegion on DataNode: {} failed, {}. Retrying...",
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint(),
@@ -167,6 +169,7 @@ public class TemporaryClient {
status);
}
} catch (TException e) {
+ // TODO: Handle SocketTimeoutException
LOGGER.error(
"Create DataRegion on DataNode: {} failed, {}. Retrying...",
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint(),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 54b0e294d5..c605d418b5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.manager;
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
@@ -36,7 +35,6 @@ import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -250,18 +248,9 @@ public class ConfigManager implements Manager {
if (getConsensusManager().isLeader()) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
- Peer peer = getConsensusManager().getLeader();
- if (peer == null) {
- return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
- .setMessage(
- "The current ConfigNode is not leader. And ConfigNodeGroup is in leader election. Please redirect with a random ConfigNode.");
- } else {
- // TODO Get rpc port of leader
- return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
- .setRedirectNode(
- new EndPoint(peer.getEndpoint().getIp(), peer.getEndpoint().getPort() - 1))
- .setMessage("The current ConfigNode is not leader. Please redirect.");
- }
+ return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
+ .setMessage(
+ "The current ConfigNode is not leader. And ConfigNodeGroup is in leader election. Please redirect with a random ConfigNode.");
}
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/cli/TemporaryClientDemo.java b/confignode/src/test/java/org/apache/iotdb/confignode/cli/TemporaryClientDemo.java
index c340a53670..245c0b0acc 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/cli/TemporaryClientDemo.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/cli/TemporaryClientDemo.java
@@ -18,38 +18,57 @@
*/
package org.apache.iotdb.confignode.cli;
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
public class TemporaryClientDemo {
private static final int timeOutInMS = 10000;
- public void setStorageGroupAndCreateRegionsDemo() throws TException, InterruptedException {
- TTransport transport = RpcTransportFactory.INSTANCE.getTransport("0.0.0.0", 22277, timeOutInMS);
- transport.open();
- ConfigIService.Client client = new ConfigIService.Client(new TBinaryProtocol(transport));
+ private final Random random = new Random();
+ private Map<Integer, ConfigIService.Client> clients;
+ private ConfigIService.Client defaultClient;
- TDataNodeRegisterReq registerReq = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
- TDataNodeRegisterResp registerResp = client.registerDataNode(registerReq);
- System.out.println(registerResp);
- TimeUnit.MILLISECONDS.sleep(200);
+ public void setStorageGroupsDemo() throws TException {
+ createClients();
+ defaultClient = clients.get(22277);
for (int i = 0; i < 5; i++) {
TSetStorageGroupReq setReq = new TSetStorageGroupReq("root.sg" + i);
- TSStatus status = client.setStorageGroup(setReq);
- System.out.println(status.toString());
+ while (true) {
+ TSStatus status = defaultClient.setStorageGroup(setReq);
+ System.out.println(status.toString());
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ break;
+ } else if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+ int port = random.nextInt(3) * 2 + 22277;
+ if (status.getRedirectNode() != null) {
+ port = status.getRedirectNode().getPort();
+ }
+ defaultClient = clients.get(port);
+ }
+ }
+ }
+ }
+
+ private void createClients() throws TTransportException {
+ clients = new HashMap<>();
+ for (int i = 22277; i <= 22281; i += 2) {
+ TTransport transport = RpcTransportFactory.INSTANCE.getTransport("0.0.0.0", i, timeOutInMS);
+ transport.open();
+ clients.put(i, new ConfigIService.Client(new TBinaryProtocol(transport)));
}
}
}