You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/12/11 02:18:35 UTC
[iotdb] branch master updated: [IOTDB-5108] Added region migration sql (#8341)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c8b7948a94 [IOTDB-5108] Added region migration sql (#8341)
c8b7948a94 is described below
commit c8b7948a94e11561c441ed294bad23756fb8e762
Author: Caideyipi <87...@users.noreply.github.com>
AuthorDate: Sun Dec 11 10:18:27 2022 +0800
[IOTDB-5108] Added region migration sql (#8341)
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 7 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 8 ++
.../iotdb/confignode/manager/ConfigManager.java | 13 ++-
.../apache/iotdb/confignode/manager/IManager.java | 3 +
.../iotdb/confignode/manager/ProcedureManager.java | 93 ++++++++++++++++++++++
.../iotdb/confignode/manager/node/NodeManager.java | 5 +-
.../partition/heartbeat/RegionGroupCache.java | 2 +-
.../statemachine/CreateRegionGroupsProcedure.java | 2 +-
.../impl/statemachine/RegionMigrateProcedure.java | 15 ++--
.../thrift/ConfigNodeRPCServiceProcessor.java | 6 ++
docs/UserGuide/Cluster/Cluster-Maintenance.md | 65 ++++++++++++++-
.../UserGuide/Cluster/Deployment-Recommendation.md | 49 ++++++------
docs/UserGuide/QuickStart/QuickStart.md | 2 +-
docs/zh/UserGuide/Cluster/Cluster-Maintenance.md | 63 +++++++++++++++
.../apache/iotdb/commons/client/ClientManager.java | 6 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 26 +++++-
.../plan/execution/config/ConfigTaskVisitor.java | 8 ++
.../config/executor/ClusterConfigTaskExecutor.java | 49 ++++++++----
.../config/executor/IConfigTaskExecutor.java | 3 +
.../executor/StandaloneConfigTaskExecutor.java | 12 +++
.../config/metadata/MigrateRegionTask.java | 43 ++++++++++
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 9 +++
.../db/mpp/plan/statement/StatementVisitor.java | 5 ++
.../statement/metadata/GetRegionIdStatement.java | 7 +-
.../statement/metadata/MigrateRegionStatement.java | 80 +++++++++++++++++++
.../src/main/thrift/confignode.thrift | 10 ++-
26 files changed, 527 insertions(+), 64 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 338a478e78..24e183c51b 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -46,7 +46,7 @@ ddlStatement
| showSchemaTemplates | showNodesInSchemaTemplate
| showPathsUsingSchemaTemplate | showPathsSetSchemaTemplate
| countStorageGroup | countDevices | countTimeseries | countNodes
- | getRegionId | getTimeSlotList | getSeriesSlotList
+ | getRegionId | getTimeSlotList | getSeriesSlotList | migrateRegion
;
dmlStatement
@@ -255,6 +255,11 @@ getSeriesSlotList
: SHOW (DATA|SCHEMA)? SERIESSLOTID OF path=prefixPath
;
+// Migrate Region
+migrateRegion
+ : MIGRATE REGION regionId=INTEGER_LITERAL FROM fromId=INTEGER_LITERAL TO toId=INTEGER_LITERAL
+ ;
+
// Set TTL
setTTL
: SET TTL TO path=prefixPath time=INTEGER_LITERAL
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index c0438cf6ca..972ba814b9 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -362,6 +362,10 @@ METADATA
: M E T A D A T A
;
+MIGRATE
+ : M I G R A T E
+ ;
+
NODES
: N O D E S
;
@@ -478,6 +482,10 @@ REGEXP
: R E G E X P
;
+REGION
+ : R E G I O N
+ ;
+
REGIONID
: R E G I O N I D
;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 40df7c5d39..6ee94f3a40 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -117,6 +117,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
@@ -1318,6 +1319,14 @@ public class ConfigManager implements IManager {
: new TGetSeriesSlotListResp(status);
}
+ @Override
+ public TSStatus migrateRegion(TMigrateRegionReq req) {
+ TSStatus status = confirmLeader();
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ ? procedureManager.migrateRegion(req)
+ : status;
+ }
+
@Override
public TSStatus createCQ(TCreateCQReq req) {
TSStatus status = confirmLeader();
@@ -1432,7 +1441,7 @@ public class ConfigManager implements IManager {
dataNodeConfiguration.getLocation().getDataNodeId(),
dataNodeConfiguration.getLocation()));
if (runningDataNodeLocationMap.isEmpty()) {
- // no running DataNode, will not transfer and print log
+ // No running DataNode, will not transfer and print log
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
}
@@ -1440,7 +1449,7 @@ public class ConfigManager implements IManager {
dataNodeLocation -> runningDataNodeLocationMap.remove(dataNodeLocation.getDataNodeId()));
LOGGER.info("Start transfer of {}", newUnknownDataList);
- // transfer trigger
+ // Transfer trigger
TSStatus transferResult =
triggerManager.transferTrigger(newUnknownDataList, runningDataNodeLocationMap);
if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 693622bf12..73d6671742 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -75,6 +75,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
@@ -533,6 +534,8 @@ public interface IManager {
TGetSeriesSlotListResp getSeriesSlotList(GetSeriesSlotListPlan plan);
+ TSStatus migrateRegion(TMigrateRegionReq req);
+
TSStatus createCQ(TCreateCQReq req);
TSStatus dropCQ(TDropCQReq req);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 9e42c753c0..579f3216d8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBException;
@@ -35,6 +37,7 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
@@ -66,6 +69,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.db.metadata.template.Template;
@@ -82,6 +86,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -331,6 +336,94 @@ public class ProcedureManager {
return true;
}
+ public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
+ // TODO: Whether to guarantee the check high consistency, i.e, use consensus read to check
+ Map<TConsensusGroupId, RegionGroupCache> regionReplicaMap =
+ configManager.getPartitionManager().getRegionGroupCacheMap();
+ Optional<TConsensusGroupId> regionId =
+ regionReplicaMap.keySet().stream()
+ .filter(id -> id.getId() == migrateRegionReq.getRegionId())
+ .findAny();
+ TDataNodeLocation originalDataNode =
+ configManager
+ .getNodeManager()
+ .getRegisteredDataNode(migrateRegionReq.getFromId())
+ .getLocation();
+ TDataNodeLocation destDataNode =
+ configManager
+ .getNodeManager()
+ .getRegisteredDataNode(migrateRegionReq.getToId())
+ .getLocation();
+ if (!regionId.isPresent()) {
+ LOGGER.warn(
+ "Submit RegionMigrateProcedure failed, because no Region {}",
+ migrateRegionReq.getRegionId());
+ TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage(
+ "Submit RegionMigrateProcedure failed, because no region Group "
+ + migrateRegionReq.getRegionId());
+ return status;
+ } else if (originalDataNode == null) {
+ LOGGER.warn(
+ "Submit RegionMigrateProcedure failed, because no original DataNode {}",
+ migrateRegionReq.getFromId());
+ TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage(
+ "Submit RegionMigrateProcedure failed, because no original DataNode "
+ + migrateRegionReq.getFromId());
+ return status;
+ } else if (destDataNode == null) {
+ LOGGER.warn(
+ "Submit RegionMigrateProcedure failed, because no target DataNode {}",
+ migrateRegionReq.getToId());
+ TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage(
+ "Submit RegionMigrateProcedure failed, because no target DataNode "
+ + migrateRegionReq.getToId());
+ return status;
+ } else if (!regionReplicaMap
+ .get(regionId.get())
+ .getStatistics()
+ .getRegionStatisticsMap()
+ .containsKey(migrateRegionReq.getFromId())) {
+ LOGGER.warn(
+ "Submit RegionMigrateProcedure failed, because the original DataNode {} doesn't contain Region {}",
+ migrateRegionReq.getFromId(),
+ migrateRegionReq.getRegionId());
+ TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage(
+ "Submit RegionMigrateProcedure failed, because the original DataNode "
+ + migrateRegionReq.getFromId()
+ + " doesn't contain Region "
+ + migrateRegionReq.getRegionId());
+ return status;
+ } else if (regionReplicaMap
+ .get(regionId.get())
+ .getStatistics()
+ .getRegionStatisticsMap()
+ .containsKey(migrateRegionReq.getToId())) {
+ LOGGER.warn(
+ "Submit RegionMigrateProcedure failed, because the target DataNode {} already contains Region {}",
+ migrateRegionReq.getToId(),
+ migrateRegionReq.getRegionId());
+ TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage(
+ "Submit RegionMigrateProcedure failed, because the target DataNode "
+ + migrateRegionReq.getToId()
+ + " already contains Region "
+ + migrateRegionReq.getRegionId());
+ return status;
+ }
+ this.executor.submitProcedure(
+ new RegionMigrateProcedure(regionId.get(), originalDataNode, destDataNode));
+ LOGGER.info(
+ "Submit RegionMigrateProcedure successfully, Region: {}, From: {}, To: {}",
+ migrateRegionReq.getRegionId(),
+ migrateRegionReq.getFromId(),
+ migrateRegionReq.getToId());
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
/**
* Generate CreateRegionGroupsProcedure and wait for it finished
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 0026de9710..ad7fac53c8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -300,10 +300,9 @@ public class NodeManager {
}
// Add request to queue, then return to client
- boolean registerSucceed =
- configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
+ boolean removeSucceed = configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
TSStatus status;
- if (registerSucceed) {
+ if (removeSucceed) {
status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setMessage("Server accepted the request");
} else {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
index acd3da603f..7c7ddfdb03 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/heartbeat/RegionGroupCache.java
@@ -30,7 +30,7 @@ public class RegionGroupCache {
private final TConsensusGroupId consensusGroupId;
- // Map<DataNodeId(where a RegionReplica resides), RegionCache>
+ // Map<DataNodeId(where a RegionReplica resides in), RegionCache>
private final Map<Integer, RegionCache> regionCacheMap;
// The previous RegionGroupStatistics, used for comparing with
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
index dd15ad1117..999d6895cc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
@@ -232,7 +232,7 @@ public class CreateRegionGroupsProcedure
@Override
public void serialize(DataOutputStream stream) throws IOException {
- // must serialize CREATE_REGION_GROUPS.getTypeCode() firstly
+ // Must serialize CREATE_REGION_GROUPS.getTypeCode() firstly
stream.writeShort(ProcedureType.CREATE_REGION_GROUPS.getTypeCode());
super.serialize(stream);
stream.writeInt(consensusGroupType.getValue());
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 07a498c7a1..a9bfe61fa4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -44,7 +44,7 @@ import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANOD
import static org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler.getIdWithRpcEndpoint;
import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
-/** region migrate procedure */
+/** Region migrate procedure */
public class RegionMigrateProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv, RegionTransitionState> {
private static final Logger LOG = LoggerFactory.getLogger(RegionMigrateProcedure.class);
@@ -264,13 +264,14 @@ public class RegionMigrateProcedure
if (!migrateSuccess) {
throw new ProcedureException(
- String.format("Region migrate failed, regionId: %s", consensusGroupId));
+ String.format("Region migration failed, regionId: %s", consensusGroupId));
}
} catch (InterruptedException e) {
- LOG.error("{}, region migrate {} interrupt", REMOVE_DATANODE_PROCESS, consensusGroupId, e);
+ LOG.error(
+ "{}, region migration {} interrupt", REMOVE_DATANODE_PROCESS, consensusGroupId, e);
Thread.currentThread().interrupt();
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("wait region migrate interrupt," + e.getMessage());
+ status.setMessage("Waiting for region migration interruption," + e.getMessage());
}
}
return status;
@@ -280,17 +281,17 @@ public class RegionMigrateProcedure
public void notifyTheRegionMigrateFinished(TRegionMigrateResultReportReq req) {
LOG.info(
- "{}, ConfigNode received region migrate result reported by DataNode: {}",
+ "{}, ConfigNode received region migration result reported by DataNode: {}",
REMOVE_DATANODE_PROCESS,
req);
// TODO the req is used in roll back
synchronized (regionMigrateLock) {
TSStatus migrateStatus = req.getMigrateResult();
- // migrate failed
+ // Migration failed
if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
LOG.info(
- "{}, Region migrate failed in DataNode, migrateStatus: {}",
+ "{}, Region migration failed in DataNode, migrateStatus: {}",
REMOVE_DATANODE_PROCESS,
migrateStatus);
migrateSuccess = false;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 6277c2075a..2af5115e6e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -110,6 +110,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
@@ -750,6 +751,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
return configManager.getSeriesSlotList(plan);
}
+ @Override
+ public TSStatus migrateRegion(TMigrateRegionReq req) {
+ return configManager.migrateRegion(req);
+ }
+
@Override
public TSStatus createCQ(TCreateCQReq req) {
return configManager.createCQ(req);
diff --git a/docs/UserGuide/Cluster/Cluster-Maintenance.md b/docs/UserGuide/Cluster/Cluster-Maintenance.md
index 21f5df1772..311f35b201 100644
--- a/docs/UserGuide/Cluster/Cluster-Maintenance.md
+++ b/docs/UserGuide/Cluster/Cluster-Maintenance.md
@@ -362,7 +362,7 @@ Total line number = 1
It costs 0.007s
```
-### Show time slots of a series slot
+### Show the time slots of a series slot
Show the time slots under a particular series slot.
- `SHOW TIMESLOTID OF root.sg WHERE SERIESLOTID=s0 (AND STARTTIME=t1) (AND ENDTIME=t2)`
@@ -413,4 +413,67 @@ IoTDB> show seriesslotid of root.sg
+------------+
Total line number = 1
It costs 0.006s
+```
+
+## Migrate Region
+The following SQL statement can be used to manually migrate a region, for load balancing or other purposes.
+```
+MIGRATE REGION <Region-id> FROM <original-DataNodeId> TO <dest-DataNodeId>
+```
+Example:
+```
+IoTDB> SHOW REGIONS
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+|RegionId| Type| Status| Database|SeriesSlotId|TimeSlotId|DataNodeId|RpcAddress|RpcPort| Role|
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 3| 127.0.0.1| 6670| Leader|
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 4| 127.0.0.1| 6681|Follower|
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 5| 127.0.0.1| 6668|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 1| 127.0.0.1| 6667|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 3| 127.0.0.1| 6670|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 7| 127.0.0.1| 6669| Leader|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 3| 127.0.0.1| 6670| Leader|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 4| 127.0.0.1| 6681|Follower|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 5| 127.0.0.1| 6668|Follower|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 1| 127.0.0.1| 6667|Follower|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 5| 127.0.0.1| 6668| Leader|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 7| 127.0.0.1| 6669|Follower|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 3| 127.0.0.1| 6670|Follower|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 4| 127.0.0.1| 6681| Leader|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 7| 127.0.0.1| 6669|Follower|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 1| 127.0.0.1| 6667| Leader|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 4| 127.0.0.1| 6681|Follower|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 5| 127.0.0.1| 6668|Follower|
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+Total line number = 18
+It costs 0.161s
+
+IoTDB> MIGRATE REGION 1 FROM 3 TO 4
+Msg: The statement is executed successfully.
+
+IoTDB> SHOW REGIONS
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+|RegionId| Type| Status| Database|SeriesSlotId|TimeSlotId|DataNodeId|RpcAddress|RpcPort| Role|
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 3| 127.0.0.1| 6670| Leader|
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 4| 127.0.0.1| 6681|Follower|
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 5| 127.0.0.1| 6668|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 1| 127.0.0.1| 6667|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 4| 127.0.0.1| 6681|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 7| 127.0.0.1| 6669| Leader|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 3| 127.0.0.1| 6670| Leader|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 4| 127.0.0.1| 6681|Follower|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 5| 127.0.0.1| 6668|Follower|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 1| 127.0.0.1| 6667|Follower|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 5| 127.0.0.1| 6668| Leader|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 7| 127.0.0.1| 6669|Follower|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 3| 127.0.0.1| 6670|Follower|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 4| 127.0.0.1| 6681| Leader|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 7| 127.0.0.1| 6669|Follower|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 1| 127.0.0.1| 6667| Leader|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 4| 127.0.0.1| 6681|Follower|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 5| 127.0.0.1| 6668|Follower|
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+Total line number = 18
+It costs 0.165s
```
\ No newline at end of file
diff --git a/docs/UserGuide/Cluster/Deployment-Recommendation.md b/docs/UserGuide/Cluster/Deployment-Recommendation.md
index fd21477a92..16be276a5d 100644
--- a/docs/UserGuide/Cluster/Deployment-Recommendation.md
+++ b/docs/UserGuide/Cluster/Deployment-Recommendation.md
@@ -20,18 +20,18 @@
-->
# IoTDB Deployment Recommendation
-## Backgrands
+## Backgrounds
System Abilities
-- Performance: write and read performance, compression ratio
-- Extensibility: system has ability to manage data with multiple nodes, essentially data can be manage by partitions
-- High availability(HA): system has ability to tolerate node disconnected, essentially whether the data has replicas
-- Consistency:when data with multiple copies, whether the replicas consistent, essentially treat the database as a single node
+- Performance: writing and reading performance, compression ratio
+- Extensibility: the system's ability to manage data across multiple nodes; in essence, whether data can be managed in partitions
+- High availability (HA): the system's ability to tolerate disconnected nodes; in essence, whether the data has replicas
+- Consistency: when data has multiple copies, whether the replicas are consistent; in essence, whether the system can treat the whole database as a single node
-Abbreviation
+Abbreviations
- C: ConfigNode
- D: DataNode
-- aCbD:cluster with a ConfigNodes and b DataNodes
+- nCmD: cluster with n ConfigNodes and m DataNodes
## Deployment mode
@@ -43,15 +43,15 @@ Abbreviation
| Strong consistency cluster mode | Medium | High | High | High |
-| Config | Lightweight standalone mode | Scalable single node mode | High performance mode | trong consistency cluster mode |
-|:--------------------------------------:|:----------------------------|:--------------------------|:----------------------|:-------------------------------|
-| ConfigNode number | 1 | ≥1 (odd number) | ≥1 (odd number) | ≥1 (odd number) |
-| DataNode number | 1 | ≥1 | ≥3 | ≥3 |
-| schema_replication_factor | 1 | 1 | 3 | 3 |
-| data_replication_factor | 1 | 1 | 2 | 3 |
-| config_node_consensus_protocol_class | Simple | Ratis | Ratis | Ratis |
-| schema_region_consensus_protocol_class | Simple | Ratis | Ratis | Ratis |
-| data_region_consensus_protocol_class | Simple | IoT | IoT | Ratis |
+| Config | Lightweight standalone mode | Scalable single node mode | High performance mode | Strong consistency cluster mode |
+|:--------------------------------------:|:----------------------------|:--------------------------|:----------------------|:--------------------------------|
+| ConfigNode number | 1 | ≥1 (odd number) | ≥1 (odd number) | ≥1 (odd number) |
+| DataNode number | 1 | ≥1 | ≥3 | ≥3 |
+| schema_replication_factor | 1 | 1 | 3 | 3 |
+| data_replication_factor | 1 | 1 | 2 | 3 |
+| config_node_consensus_protocol_class | Simple | Ratis | Ratis | Ratis |
+| schema_region_consensus_protocol_class | Simple | Ratis | Ratis | Ratis |
+| data_region_consensus_protocol_class | Simple | IoT | IoT | Ratis |
## Deployment Recommendation
@@ -74,7 +74,7 @@ Configuration modification:
- for 1C1D standalone mode: use virtual_storage_group_num in v0.13
Data migration:
-After modified the configuration, use load-tsfile tool to load the TsFiles of v0.13 to v1.0.
+After modifying the configuration, use the load-tsfile tool to load the TsFiles of v0.13 into v1.0.
### Use v1.0 directly
@@ -98,8 +98,8 @@ Cluster DataNode total heap size(B) = 20 * (180 + 2 * average character num
Heap size of each DataNode = Cluster DataNode total heap size / DataNode number
-> Example: use 3C3D to manage 1 million timeseries, use 3schema replicas, series name such as root.sg_1.d_10.s_100(20 chars)
-> - Cluster DataNode total heap size: 20 * (180 + 2 * 20)* 1,000,000 * 3 = 13.2 GB
+> Example: use 3C3D to manage 1 million timeseries, use 3 schema replicas, series name such as root.sg_1.d_10.s_100(20 chars)
+> - Cluster DataNode total heap size: 20 * (180 + 2 * 20) * 1,000,000 * 3 = 13.2 GB
> - Heap size of each DataNode: 13.2 GB / 3 = 4.4 GB
#### Disk estimation
@@ -123,8 +123,8 @@ Series number * Sampling frequency * Data point size * Storage duration * data_r
##### Schema storage size
-One series uses the path charactor byte size + 20 bytes.
-If the series has tag, add the tag charactor byte size.
+One series uses the path character byte size + 20 bytes.
+If the series has tags, add the tag character byte size.
##### Temp storage size
@@ -136,7 +136,7 @@ max wal storage size = memtable memory size ÷ wal_min_effective_info_ratio
- memtable memory size is decided by storage_query_schema_consensus_free_memory_proportion, storage_engine_memory_proportion and write_memory_proportion
- wal_min_effective_info_ratio is decided by wal_min_effective_info_ratio configuration
-> Example: allocate 16G memory for Datanode, config as below:
+> Example: allocate 16G of memory to the DataNode, configured as below:
> storage_query_schema_consensus_free_memory_proportion=3:3:1:1:2
> storage_engine_memory_proportion=8:2
> write_memory_proportion=19:1
@@ -146,7 +146,8 @@ max wal storage size = memtable memory size ÷ wal_min_effective_info_ratio
2. Consensus
Ratis consensus
-When using ratis consensus protocol, we need extra storage for Raft Log. Raft Log will be deleted after state machine takes snapshot.
+
+When using the Ratis consensus protocol, we need extra storage for the Raft Log, which will be deleted after the state machine takes a snapshot.
We can adjust `trigger_snapshot_threshold` to control the maximum Raft Log disk usage.
@@ -174,7 +175,7 @@ By default, data_region_ratis_log_max_size=20G, which guarantees that Raft Log s
The overlap of out-of-order data = overlapped data amount / total out-of-order data amount
Disk space for temporary file = Total ordered Disk space of origin files + Total out-of-order disk space of origin files *(1 - overlap)
- > Example: 10 sequence files, 10 out-of-order files, 100M for each sequence file, 50M for each out-of-order file, half of data is overlapped with sequence file
+ > Example: 10 ordered files, 10 out-of-order files, 100M for each ordered file, 50M for each out-of-order file, half of the out-of-order data overlaps with the ordered files
> The overlap of out-of-order data = 25M/50M * 100% = 50%
> Disk space for temporary files = 10 * 100 + 10 * 50 * 50% = 1250M
diff --git a/docs/UserGuide/QuickStart/QuickStart.md b/docs/UserGuide/QuickStart/QuickStart.md
index 56cd9349a2..57844bfc87 100644
--- a/docs/UserGuide/QuickStart/QuickStart.md
+++ b/docs/UserGuide/QuickStart/QuickStart.md
@@ -46,7 +46,7 @@ You can download the binary file from:
## Configurations
-configuration files are under "conf" folder
+Configuration files are under "conf" folder
* environment config module (`datanode-env.bat`, `datanode-env.sh`),
* system config module (`iotdb-datanode.properties`)
diff --git a/docs/zh/UserGuide/Cluster/Cluster-Maintenance.md b/docs/zh/UserGuide/Cluster/Cluster-Maintenance.md
index 5fdcc02b4b..acf8e028af 100644
--- a/docs/zh/UserGuide/Cluster/Cluster-Maintenance.md
+++ b/docs/zh/UserGuide/Cluster/Cluster-Maintenance.md
@@ -412,4 +412,67 @@ IoTDB> show seriesslotid of root.sg
+------------+
Total line number = 1
It costs 0.006s
+```
+
+## 迁移 Region
+以下 SQL 语句可以被用于手动迁移一个 region，可用于负载均衡或其他目的。
+```
+MIGRATE REGION <Region-id> FROM <original-DataNodeId> TO <dest-DataNodeId>
+```
+示例:
+```
+IoTDB> SHOW REGIONS
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+|RegionId| Type| Status| Database|SeriesSlotId|TimeSlotId|DataNodeId|RpcAddress|RpcPort| Role|
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 3| 127.0.0.1| 6670| Leader|
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 4| 127.0.0.1| 6681|Follower|
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 5| 127.0.0.1| 6668|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 1| 127.0.0.1| 6667|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 3| 127.0.0.1| 6670|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 7| 127.0.0.1| 6669| Leader|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 3| 127.0.0.1| 6670| Leader|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 4| 127.0.0.1| 6681|Follower|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 5| 127.0.0.1| 6668|Follower|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 1| 127.0.0.1| 6667|Follower|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 5| 127.0.0.1| 6668| Leader|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 7| 127.0.0.1| 6669|Follower|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 3| 127.0.0.1| 6670|Follower|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 4| 127.0.0.1| 6681| Leader|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 7| 127.0.0.1| 6669|Follower|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 1| 127.0.0.1| 6667| Leader|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 4| 127.0.0.1| 6681|Follower|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 5| 127.0.0.1| 6668|Follower|
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+Total line number = 18
+It costs 0.161s
+
+IoTDB> MIGRATE REGION 1 FROM 3 TO 4
+Msg: The statement is executed successfully.
+
+IoTDB> SHOW REGIONS
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+|RegionId| Type| Status| Database|SeriesSlotId|TimeSlotId|DataNodeId|RpcAddress|RpcPort| Role|
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 3| 127.0.0.1| 6670| Leader|
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 4| 127.0.0.1| 6681|Follower|
+| 0|SchemaRegion|Running|root.test.g_0| 500| 0| 5| 127.0.0.1| 6668|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 1| 127.0.0.1| 6667|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 4| 127.0.0.1| 6681|Follower|
+| 1| DataRegion|Running|root.test.g_0| 183| 200| 7| 127.0.0.1| 6669| Leader|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 3| 127.0.0.1| 6670| Leader|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 4| 127.0.0.1| 6681|Follower|
+| 2| DataRegion|Running|root.test.g_0| 181| 200| 5| 127.0.0.1| 6668|Follower|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 1| 127.0.0.1| 6667|Follower|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 5| 127.0.0.1| 6668| Leader|
+| 3| DataRegion|Running|root.test.g_0| 180| 200| 7| 127.0.0.1| 6669|Follower|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 3| 127.0.0.1| 6670|Follower|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 4| 127.0.0.1| 6681| Leader|
+| 4| DataRegion|Running|root.test.g_0| 179| 200| 7| 127.0.0.1| 6669|Follower|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 1| 127.0.0.1| 6667| Leader|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 4| 127.0.0.1| 6681|Follower|
+| 5| DataRegion|Running|root.test.g_0| 179| 200| 5| 127.0.0.1| 6668|Follower|
++--------+------------+-------+-------------+------------+----------+----------+----------+-------+--------+
+Total line number = 18
+It costs 0.165s
```
\ No newline at end of file
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 6c826aa22e..ba094a54ee 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -49,13 +49,13 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
try {
client = pool.borrowObject(node);
} catch (TTransportException e) {
- // external needs to check transport related exception
+ // External needs to check transport related exception
throw new IOException(e);
} catch (IOException e) {
- // external needs the IOException to check connection
+ // External needs the IOException to check connection
throw e;
} catch (Exception e) {
- // external doesn't care of other exceptions
+ // External doesn't care about other exceptions
String errorMessage =
String.format(
"Borrow client from pool for node %s failed, you need to increase dn_max_connection_for_internal_service.",
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index a569857df9..a9b10f1117 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -79,6 +79,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
@@ -186,7 +187,7 @@ public class ConfigNodeClient
try {
tryToConnect();
} catch (TException e) {
- // can not connect to each config node
+ // Can not connect to each config node
syncLatestConfigNodeList();
tryToConnect();
}
@@ -196,7 +197,7 @@ public class ConfigNodeClient
try {
transport =
RpcTransportFactory.INSTANCE.getTransport(
- // as there is a try-catch already, we do not need to use TSocket.wrap
+ // As there is a try-catch already, we do not need to use TSocket.wrap
endpoint.getIp(), endpoint.getPort(), (int) connectionTimeout);
if (!transport.isOpen()) {
transport.open();
@@ -1717,6 +1718,27 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
+ @Override
+ public TSStatus migrateRegion(TMigrateRegionReq req) throws TException {
+ for (int i = 0; i < RETRY_NUM; i++) {
+ try {
+ TSStatus status = client.migrateRegion(req);
+ if (!updateConfigNodeLeader(status)) {
+ return status;
+ }
+ } catch (TException e) {
+ logger.warn(
+ "Failed to connect to ConfigNode {} from DataNode {} when executing {}",
+ configNode,
+ config.getAddressAndPort(),
+ Thread.currentThread().getStackTrace()[1].getMethodName());
+ configLeader = null;
+ }
+ waitAndReconnect();
+ }
+ throw new TException(MSG_RECONNECTION_FAIL);
+ }
+
@Override
public TSStatus createCQ(TCreateCQReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 5fe55dadcc..7ca1e60b27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropTriggerTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.GetRegionIdTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.GetSeriesSlotListTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.GetTimeSlotListTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.MigrateRegionTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.SetStorageGroupTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.SetTTLTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowClusterDetailsTask;
@@ -81,6 +82,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetTimeSlotListStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.MigrateRegionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
@@ -377,6 +379,12 @@ public class ConfigTaskVisitor
return new GetTimeSlotListTask(getTimeSlotListStatement);
}
+ @Override
+ public IConfigTask visitMigrateRegion(
+ MigrateRegionStatement migrateRegionStatement, TaskContext context) {
+ return new MigrateRegionTask(migrateRegionStatement);
+ }
+
@Override
public IConfigTask visitCreateContinuousQuery(
CreateContinuousQueryStatement createContinuousQueryStatement, TaskContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index fee86e04e6..953550f7a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -51,11 +51,11 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListResp;
-import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
@@ -108,6 +108,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetTimeSlotListStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.MigrateRegionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
@@ -316,7 +317,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
return future;
}
if (!uri.getScheme().equals("file")) {
- // download executable
+ // Download executable
ExecutableResource resource =
UDFExecutableManager.getInstance().request(Collections.singletonList(uriString));
String jarFilePathUnderTempDir =
@@ -334,7 +335,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
// If jarPath is a file path on datanode, we transfer it to ByteBuffer and send it to
// ConfigNode.
jarFile = ExecutableManager.transferToBytebuffer(libRoot);
- // set md5 of the jar file
+ // Set md5 of the jar file
jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot)));
}
} catch (IOException | URISyntaxException e) {
@@ -927,8 +928,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
public SettableFuture<ConfigTaskResult> showSchemaTemplate(
ShowSchemaTemplateStatement showSchemaTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- try (ConfigNodeClient configNodeClient =
- CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+ try {
// Send request to some API server
List<Template> templateList = ClusterTemplateManager.getInstance().getAllTemplates();
// build TSBlock
@@ -944,12 +944,10 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
ShowNodesInSchemaTemplateStatement showNodesInSchemaTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
String req = showNodesInSchemaTemplateStatement.getTemplateName();
- TGetTemplateResp tGetTemplateResp = new TGetTemplateResp();
- try (ConfigNodeClient configNodeClient =
- CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+ try {
// Send request to some API server
Template template = ClusterTemplateManager.getInstance().getTemplate(req);
- // build TSBlock
+ // Build TSBlock
ShowNodesInSchemaTemplateTask.buildTSBlock(template, future);
} catch (Exception e) {
future.setException(e);
@@ -979,12 +977,11 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
ShowPathSetTemplateStatement showPathSetTemplateStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
String templateName = showPathSetTemplateStatement.getTemplateName();
- try (ConfigNodeClient configNodeClient =
- CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+ try {
// Send request to some API server
List<PartialPath> listPath =
ClusterTemplateManager.getInstance().getPathsSetTemplate(templateName);
- // build TSBlock
+ // Build TSBlock
ShowPathSetTemplateTask.buildTSBlock(listPath, future);
} catch (Exception e) {
future.setException(e);
@@ -1011,13 +1008,13 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} catch (TTransportException e) {
if (e.getType() == TTransportException.TIMED_OUT
|| e.getCause() instanceof SocketTimeoutException) {
- // time out mainly caused by slow execution, wait until
+ // Time out mainly caused by slow execution, just wait
tsStatus = RpcUtils.getStatus(TSStatusCode.OVERLAP_WITH_EXISTING_TASK);
} else {
throw e;
}
}
- // keep waiting until task ends
+ // Keep waiting until task ends
} while (TSStatusCode.OVERLAP_WITH_EXISTING_TASK.getStatusCode() == tsStatus.getCode());
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
@@ -1419,6 +1416,30 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
return future;
}
+ @Override
+ public SettableFuture<ConfigTaskResult> migrateRegion(
+ MigrateRegionStatement migrateRegionStatement) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (ConfigNodeClient configNodeClient =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
+ TMigrateRegionReq tMigrateRegionReq =
+ new TMigrateRegionReq(
+ migrateRegionStatement.getRegionId(),
+ migrateRegionStatement.getFromId(),
+ migrateRegionStatement.getToId());
+ TSStatus status = configNodeClient.migrateRegion(tMigrateRegionReq);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.setException(new IoTDBException(status.message, status.code));
+ return future;
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (Exception e) {
+ future.setException(e);
+ }
+ return future;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> createContinuousQuery(
CreateContinuousQueryStatement createContinuousQueryStatement, String sql, String username) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 1e5ba37b17..3cbd2674de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetTimeSlotListStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.MigrateRegionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
@@ -156,6 +157,8 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> getTimeSlotList(
GetTimeSlotListStatement getTimeSlotListStatement);
+ SettableFuture<ConfigTaskResult> migrateRegion(MigrateRegionStatement migrateRegionStatement);
+
SettableFuture<ConfigTaskResult> createContinuousQuery(
CreateContinuousQueryStatement createContinuousQueryStatement, String sql, String username);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index 15be2fa2cd..67cc25a745 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetTimeSlotListStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.MigrateRegionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
@@ -669,6 +670,17 @@ public class StandaloneConfigTaskExecutor implements IConfigTaskExecutor {
return future;
}
+ @Override
+ public SettableFuture<ConfigTaskResult> migrateRegion(
+ MigrateRegionStatement migrateRegionStatement) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ future.setException(
+ new IoTDBException(
+ "Executing migrateRegion is not supported in standalone mode",
+ TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ return future;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> createContinuousQuery(
CreateContinuousQueryStatement createContinuousQueryStatement, String sql, String username) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/MigrateRegionTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/MigrateRegionTask.java
new file mode 100644
index 0000000000..2488ecc671
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/MigrateRegionTask.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.MigrateRegionStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class MigrateRegionTask implements IConfigTask {
+
+ protected final MigrateRegionStatement statement;
+
+ public MigrateRegionTask(MigrateRegionStatement migrateRegionStatement) {
+ this.statement = migrateRegionStatement;
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor) {
+ // If the action is executed successfully, return the Future.
+ // If your operation is async, you can return the corresponding future directly.
+ return configTaskExecutor.migrateRegion(statement);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 23e897155a..89a4147ea4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -111,6 +111,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetTimeSlotListStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.MigrateRegionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
@@ -3055,4 +3056,12 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
}
return getTimeSlotListStatement;
}
+
+ @Override
+ public Statement visitMigrateRegion(IoTDBSqlParser.MigrateRegionContext ctx) {
+ return new MigrateRegionStatement(
+ Integer.parseInt(ctx.regionId.getText()),
+ Integer.parseInt(ctx.fromId.getText()),
+ Integer.parseInt(ctx.toId.getText()));
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index bd1d02330a..56f39db6d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetSeriesSlotListStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetTimeSlotListStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.MigrateRegionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
@@ -416,6 +417,10 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(getTimeSlotListStatement, context);
}
+ public R visitMigrateRegion(MigrateRegionStatement migrateRegionStatement, C context) {
+ return visitStatement(migrateRegionStatement, context);
+ }
+
public R visitDeactivateTemplate(
DeactivateTemplateStatement deactivateTemplateStatement, C context) {
return visitStatement(deactivateTemplateStatement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/GetRegionIdStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/GetRegionIdStatement.java
index 50f06be44d..c4fbcfb374 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/GetRegionIdStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/GetRegionIdStatement.java
@@ -34,12 +34,13 @@ import java.util.Collections;
import java.util.List;
/**
- * GET REGION statement
+ * GET REGIONID statement
*
* <p>Here is the syntax definition:
*
- * <p>SHOW (DATA|SCHEMA) REGIONID OF path=prefixPath WHERE SERIESSLOTID operator_eq
- * seriesSlot=INTEGER_LITERAL (OPERATOR_AND TIMESLOTID operator_eq timeSlot=INTEGER_LITERAL)?
+ * <p>SHOW (DATA|SCHEMA) REGIONID OF path=prefixPath WHERE (SERIESSLOTID operator_eq
+ * seriesSlot=INTEGER_LITERAL|DEVICEID operator_eq deviceId=prefixPath) (OPERATOR_AND (TIMESLOTID
+ * operator_eq timeSlot=INTEGER_LITERAL| TIMESTAMP operator_eq timeStamp=INTEGER_LITERAL))?
*/
public class GetRegionIdStatement extends Statement implements IConfigStatement {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/MigrateRegionStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/MigrateRegionStatement.java
new file mode 100644
index 0000000000..c993747ddf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/MigrateRegionStatement.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.metadata;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * MIGRATE REGION statement
+ *
+ * <p>Here is the syntax definition:
+ *
+ * <p>MIGRATE REGION regionId=INTEGER_LITERAL FROM fromId=INTEGER_LITERAL TO toId=INTEGER_LITERAL
+ */
+// TODO: Consider supporting more complex migrations, e.g. all regions from 1, 2 to 5, 6
+public class MigrateRegionStatement extends Statement implements IConfigStatement {
+
+ private final int RegionId;
+
+ private final int FromId;
+
+ private final int ToId;
+
+ public MigrateRegionStatement(int RegionId, int FromId, int ToId) {
+ super();
+ this.RegionId = RegionId;
+ this.FromId = FromId;
+ this.ToId = ToId;
+ }
+
+ public int getRegionId() {
+ return RegionId;
+ }
+
+ public int getFromId() {
+ return FromId;
+ }
+
+ public int getToId() {
+ return ToId;
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitMigrateRegion(this, context);
+ }
+
+ @Override
+ public QueryType getQueryType() {
+ return QueryType.WRITE;
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return Collections.emptyList();
+ }
+}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 4e86f9e6d2..3515f372f8 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -255,6 +255,12 @@ struct TGetSeriesSlotListResp {
2: optional list<common.TSeriesPartitionSlot> seriesSlotList
}
+struct TMigrateRegionReq {
+ 1: required i32 regionId
+ 2: required i32 fromId
+ 3: required i32 toId
+}
+
// Authorize
struct TAuthorizerReq {
1: required i32 authorType
@@ -945,6 +951,9 @@ service IConfigNodeRPCService {
/** TestOnly. Set the target DataNode to the specified status */
common.TSStatus setDataNodeStatus(TSetDataNodeStatusReq req)
+ /** Migrate a region replica from one dataNode to another */
+ common.TSStatus migrateRegion(TMigrateRegionReq req)
+
// ======================================================
// Cluster Tools
// ======================================================
@@ -1074,7 +1083,6 @@ service IConfigNodeRPCService {
/** Get the given database's assigned SeriesSlots */
TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req)
-
// ====================================================
// CQ
// ====================================================