You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/11 03:44:50 UTC
[iotdb] branch master updated: [IOTDB-4044] Remove a DataNode from the cluster, when this node stopped. (#6938)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 68dedba8d2 [IOTDB-4044] Remove a DataNode from the cluster, when this node stopped. (#6938)
68dedba8d2 is described below
commit 68dedba8d2cb3c42a25d3fe83be2dcfc81b79575
Author: wangchao316 <66...@users.noreply.github.com>
AuthorDate: Thu Aug 11 11:44:44 2022 +0800
[IOTDB-4044] Remove a DataNode from the cluster, when this node stopped. (#6938)
---
.../confignode/client/DataNodeRequestType.java | 5 +-
.../sync/datanode/SyncDataNodeClientPool.java | 38 +-
.../procedure/env/DataNodeRemoveHandler.java | 134 +++++--
.../procedure/impl/RegionMigrateProcedure.java | 48 ++-
.../procedure/impl/RemoveDataNodeProcedure.java | 26 +-
.../procedure/state/RegionTransitionState.java | 5 +-
.../iotdb/db/service/RegionMigrateService.java | 387 ++++++++++++---------
.../impl/DataNodeInternalRPCServiceImpl.java | 72 ++--
thrift/src/main/thrift/datanode.thrift | 19 +-
9 files changed, 469 insertions(+), 265 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 8eccbfe2d0..7fc041f37e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -24,7 +24,10 @@ public enum DataNodeRequestType {
INVALIDATE_PARTITION_CACHE,
INVALIDATE_PERMISSION_CACHE,
INVALIDATE_SCHEMA_CACHE,
- MIGRATE_REGION,
+ ADD_REGION_CONSENSUS_GROUP,
+ ADD_REGION_PEER,
+ REMOVE_REGION_PEER,
+ REMOVE_REGION_CONSENSUS_GROUP,
DISABLE_DATA_NODE,
STOP_DATA_NODE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
index cf95dd7bf0..792ffd4640 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/datanode/SyncDataNodeClientPool.java
@@ -79,14 +79,20 @@ public class SyncDataNodeClientPool {
return client.deleteRegion((TConsensusGroupId) req);
case INVALIDATE_PERMISSION_CACHE:
return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
- case MIGRATE_REGION:
- return client.migrateRegion((TMigrateRegionReq) req);
case DISABLE_DATA_NODE:
return client.disableDataNode((TDisableDataNodeReq) req);
case STOP_DATA_NODE:
return client.stopDataNode();
case UPDATE_TEMPLATE:
return client.updateTemplate((TUpdateTemplateReq) req);
+ case ADD_REGION_CONSENSUS_GROUP:
+ return client.addToRegionConsensusGroup((TAddConsensusGroup) req);
+ case ADD_REGION_PEER:
+ return client.addRegionPeer((TMigrateRegionReq) req);
+ case REMOVE_REGION_PEER:
+ return client.removeRegionPeer((TMigrateRegionReq) req);
+ case REMOVE_REGION_CONSENSUS_GROUP:
+ return client.removeToRegionConsensusGroup((TMigrateRegionReq) req);
default:
return RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
@@ -175,34 +181,6 @@ public class SyncDataNodeClientPool {
return status;
}
- public TSStatus addToRegionConsensusGroup(
- List<TDataNodeLocation> regionOriginalPeerNodes,
- TConsensusGroupId regionId,
- TDataNodeLocation newPeerNode,
- String storageGroup,
- long ttl) {
- TSStatus status;
- // do addConsensusGroup in new node locally
- try (SyncDataNodeInternalServiceClient client =
- clientManager.borrowClient(newPeerNode.getInternalEndPoint())) {
- List<TDataNodeLocation> currentPeerNodes = new ArrayList<>(regionOriginalPeerNodes);
- currentPeerNodes.add(newPeerNode);
- TAddConsensusGroup req = new TAddConsensusGroup(regionId, currentPeerNodes, storageGroup);
- req.setTtl(ttl);
- status = client.addToRegionConsensusGroup(req);
- } catch (IOException e) {
- LOGGER.error("Can't connect to Data Node {}.", newPeerNode.getInternalEndPoint(), e);
- status = new TSStatus(TSStatusCode.NO_CONNECTION.getStatusCode());
- status.setMessage(e.getMessage());
- } catch (TException e) {
- LOGGER.error(
- "Add region consensus {} group failed to Date node: {}", regionId, newPeerNode, e);
- status = new TSStatus(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode());
- status.setMessage(e.getMessage());
- }
- return status;
- }
-
// TODO: Is the ClientPool must be a singleton?
private static class ClientPoolHolder {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 580d1f7519..ca4d63156b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
+import org.apache.iotdb.mpp.rpc.thrift.TAddConsensusGroup;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -126,7 +127,6 @@ public class DataNodeRemoveHandler {
return null;
}
- // will migrate the region to the new node, which should not be same raft
Optional<TDataNodeLocation> newNode = pickNewReplicaNodeForRegion(regionReplicaNodes);
if (!newNode.isPresent()) {
LOGGER.warn("No enough Data node to migrate region: {}", regionId);
@@ -135,30 +135,60 @@ public class DataNodeRemoveHandler {
}
/**
- * Send to DataNode, migrate region from originalDataNode to destDataNode
+ * Send to DataNode, add region peer
*
* @param originalDataNode old location data node
* @param destDataNode dest data node
* @param regionId region id
* @return migrate status
*/
- public TSStatus migrateRegion(
+ public TSStatus addRegionPeer(
TDataNodeLocation originalDataNode,
TDataNodeLocation destDataNode,
TConsensusGroupId regionId) {
TSStatus status;
- List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
- if (regionReplicaNodes.isEmpty()) {
- LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+ Optional<TDataNodeLocation> otherNode = findNodeOfAnotherReplica(regionId, originalDataNode);
+ if (!otherNode.isPresent()) {
+ LOGGER.warn(
+ "No other Node to change region leader, check by show regions, region: {}", regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("not find region replica nodes, region: " + regionId);
+ status.setMessage("No other Node to change region leader, check by show regions");
return status;
}
- // TODO if region replica is 1, the new leader is null, it also need to migrate
- Optional<TDataNodeLocation> newLeaderNode =
- regionReplicaNodes.stream().filter(e -> !e.equals(originalDataNode)).findAny();
- if (!newLeaderNode.isPresent()) {
+ TMigrateRegionReq migrateRegionReq =
+ new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
+ migrateRegionReq.setNewLeaderNode(otherNode.get());
+
+ // send to otherNode node
+ status =
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ otherNode.get().getInternalEndPoint(),
+ migrateRegionReq,
+ DataNodeRequestType.ADD_REGION_PEER);
+ LOGGER.info(
+ "Send region {} add peer action to {}, wait it finished",
+ regionId,
+ otherNode.get().getInternalEndPoint());
+ return status;
+ }
+
+ /**
+ * Send to DataNode, remove region
+ *
+ * @param originalDataNode old location data node
+ * @param destDataNode dest data node
+ * @param regionId region id
+ * @return migrate status
+ */
+ public TSStatus removeRegionPeer(
+ TDataNodeLocation originalDataNode,
+ TDataNodeLocation destDataNode,
+ TConsensusGroupId regionId) {
+ TSStatus status;
+ Optional<TDataNodeLocation> otherNode = findNodeOfAnotherReplica(regionId, originalDataNode);
+ if (!otherNode.isPresent()) {
LOGGER.warn(
"No other Node to change region leader, check by show regions, region: {}", regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
@@ -168,15 +198,46 @@ public class DataNodeRemoveHandler {
TMigrateRegionReq migrateRegionReq =
new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
- migrateRegionReq.setNewLeaderNode(newLeaderNode.get());
+ migrateRegionReq.setNewLeaderNode(otherNode.get());
+
+ // send to other node
+ status =
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ otherNode.get().getInternalEndPoint(),
+ migrateRegionReq,
+ DataNodeRequestType.REMOVE_REGION_PEER);
+ LOGGER.info(
+ "Send region {} remove peer to {}, wait it finished",
+ regionId,
+ otherNode.get().getInternalEndPoint());
+ return status;
+ }
+
+ /**
+ * Send to DataNode, remove region consensus group from originalDataNode node
+ *
+ * @param originalDataNode old location data node
+ * @param destDataNode dest data node
+ * @param regionId region id
+ * @return migrate status
+ */
+ public TSStatus removeRegionConsensusGroup(
+ TDataNodeLocation originalDataNode,
+ TDataNodeLocation destDataNode,
+ TConsensusGroupId regionId) {
+ TSStatus status;
+ TMigrateRegionReq migrateRegionReq =
+ new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
+ migrateRegionReq.setNewLeaderNode(originalDataNode);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
originalDataNode.getInternalEndPoint(),
migrateRegionReq,
- DataNodeRequestType.MIGRATE_REGION);
+ DataNodeRequestType.REMOVE_REGION_CONSENSUS_GROUP);
LOGGER.info(
- "send region {} migrate action to {}, wait it finished",
+ "Send region {} remove consensus group action to {}, wait it finished",
regionId,
originalDataNode.getInternalEndPoint());
return status;
@@ -256,12 +317,20 @@ public class DataNodeRemoveHandler {
return status;
}
+ List<TDataNodeLocation> currentPeerNodes = new ArrayList<>(regionReplicaNodes);
+ currentPeerNodes.add(destDataNode);
String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
+ TAddConsensusGroup req = new TAddConsensusGroup(regionId, currentPeerNodes, storageGroup);
+ // TODO replace with real ttl
+ req.setTtl(Long.MAX_VALUE);
+
status =
SyncDataNodeClientPool.getInstance()
- .addToRegionConsensusGroup(
- // TODO replace with real ttl
- regionReplicaNodes, regionId, destDataNode, storageGroup, Long.MAX_VALUE);
+ .sendSyncRequestToDataNodeWithRetry(
+ destDataNode.getInternalEndPoint(),
+ req,
+ DataNodeRequestType.ADD_REGION_CONSENSUS_GROUP);
+
LOGGER.info("send add region {} consensus group to {}", regionId, destDataNode);
if (isFailed(status)) {
LOGGER.error(
@@ -296,9 +365,6 @@ public class DataNodeRemoveHandler {
.sendSyncRequestToDataNodeWithRetry(
dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
LOGGER.info("stop Data Node {} result: {}", dataNode, status);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new ProcedureException("Failed to stop data node");
- }
return status;
}
@@ -383,4 +449,32 @@ public class DataNodeRemoveHandler {
removeDataNodes.add(tDataNodeLocation);
configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes));
}
+
+ public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation tDataNodeLocation) {
+ Optional<TDataNodeLocation> newLeaderNode =
+ findNodeOfAnotherReplica(regionId, tDataNodeLocation);
+ if (newLeaderNode.isPresent()) {
+ SyncDataNodeClientPool.getInstance()
+ .changeRegionLeader(
+ regionId, tDataNodeLocation.getInternalEndPoint(), newLeaderNode.get());
+ LOGGER.info(
+ "Change region leader finished, region is {}, newLeaderNode is {}",
+ regionId,
+ newLeaderNode);
+ }
+ }
+
+ private Optional<TDataNodeLocation> findNodeOfAnotherReplica(
+ TConsensusGroupId regionId, TDataNodeLocation tDataNodeLocation) {
+ List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+ if (regionReplicaNodes.isEmpty()) {
+ LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+ return Optional.empty();
+ }
+
+ // TODO replace findAny() by select the low load node.
+ Optional<TDataNodeLocation> newLeaderNode =
+ regionReplicaNodes.stream().filter(e -> !e.equals(tDataNodeLocation)).findAny();
+ return newLeaderNode;
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
index 8958e9c9f8..dd44b0ac33 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
@@ -74,6 +74,7 @@ public class RegionMigrateProcedure
if (consensusGroupId == null) {
return Flow.NO_MORE_STATE;
}
+ TSStatus tsStatus = null;
try {
switch (state) {
case REGION_MIGRATE_PREPARE:
@@ -82,13 +83,46 @@ public class RegionMigrateProcedure
case ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP:
env.getDataNodeRemoveHandler()
.addNewNodeToRegionConsensusGroup(consensusGroupId, destDataNode);
- setNextState(RegionTransitionState.MIGRATE_REGION);
+ setNextState(RegionTransitionState.ADD_REGION_PEER);
break;
- case MIGRATE_REGION:
- env.getDataNodeRemoveHandler()
- .migrateRegion(originalDataNode, destDataNode, consensusGroupId);
- waitForTheRegionMigrateFinished(consensusGroupId);
- LOG.info("Wait for region {} migrate finished", consensusGroupId);
+ case ADD_REGION_PEER:
+ tsStatus =
+ env.getDataNodeRemoveHandler()
+ .addRegionPeer(originalDataNode, destDataNode, consensusGroupId);
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ waitForOneMigrationStepFinished(consensusGroupId);
+ LOG.info("Wait for region {} add peer finished", consensusGroupId);
+ } else {
+ throw new ProcedureException("Failed to add region peer");
+ }
+ setNextState(RegionTransitionState.CHANGE_REGION_LEADER);
+ break;
+ case CHANGE_REGION_LEADER:
+ env.getDataNodeRemoveHandler().changeRegionLeader(consensusGroupId, originalDataNode);
+ setNextState(RegionTransitionState.REMOVE_REGION_PEER);
+ break;
+ case REMOVE_REGION_PEER:
+ tsStatus =
+ env.getDataNodeRemoveHandler()
+ .removeRegionPeer(originalDataNode, destDataNode, consensusGroupId);
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ waitForOneMigrationStepFinished(consensusGroupId);
+ LOG.info("Wait for region {} remove peer finished", consensusGroupId);
+ } else {
+ throw new ProcedureException("Failed to remove region peer");
+ }
+ setNextState(RegionTransitionState.REMOVE_REGION_CONSENSUS_GROUP);
+ break;
+ case REMOVE_REGION_CONSENSUS_GROUP:
+ tsStatus =
+ env.getDataNodeRemoveHandler()
+ .removeRegionConsensusGroup(originalDataNode, destDataNode, consensusGroupId);
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ waitForOneMigrationStepFinished(consensusGroupId);
+ LOG.info("Wait for region {} remove consensus group finished", consensusGroupId);
+ }
+ // remove consensus group after a node stop, which will be failed, but we will continue
+ // execute.
setNextState(RegionTransitionState.UPDATE_REGION_LOCATION_CACHE);
break;
case UPDATE_REGION_LOCATION_CACHE:
@@ -200,7 +234,7 @@ public class RegionMigrateProcedure
return false;
}
- public TSStatus waitForTheRegionMigrateFinished(TConsensusGroupId consensusGroupId) {
+ public TSStatus waitForOneMigrationStepFinished(TConsensusGroupId consensusGroupId) {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
synchronized (regionMigrateLock) {
try {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
index 14348a3cc1..cf327ec8f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
@@ -42,7 +42,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
private static final int retryThreshold = 5;
- private TDataNodeLocation tDataNodeLocation;
+ private TDataNodeLocation disableDataNodeLocation;
private List<TConsensusGroupId> execDataNodeRegionIds = new ArrayList<>();
@@ -50,26 +50,26 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
super();
}
- public RemoveDataNodeProcedure(TDataNodeLocation tDataNodeLocation) {
+ public RemoveDataNodeProcedure(TDataNodeLocation disableDataNodeLocation) {
super();
- this.tDataNodeLocation = tDataNodeLocation;
+ this.disableDataNodeLocation = disableDataNodeLocation;
}
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) {
- if (tDataNodeLocation == null) {
+ if (disableDataNodeLocation == null) {
return Flow.NO_MORE_STATE;
}
try {
switch (state) {
case REMOVE_DATA_NODE_PREPARE:
execDataNodeRegionIds =
- env.getDataNodeRemoveHandler().getDataNodeRegionIds(tDataNodeLocation);
+ env.getDataNodeRemoveHandler().getDataNodeRegionIds(disableDataNodeLocation);
LOG.info("DataNode region id is {}", execDataNodeRegionIds);
setNextState(RemoveDataNodeState.BROADCAST_DISABLE_DATA_NODE);
break;
case BROADCAST_DISABLE_DATA_NODE:
- env.getDataNodeRemoveHandler().broadcastDisableDataNode(tDataNodeLocation);
+ env.getDataNodeRemoveHandler().broadcastDisableDataNode(disableDataNodeLocation);
setNextState(RemoveDataNodeState.SUBMIT_REGION_MIGRATE);
break;
case SUBMIT_REGION_MIGRATE:
@@ -77,8 +77,8 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
setNextState(RemoveDataNodeState.STOP_DATA_NODE);
break;
case STOP_DATA_NODE:
- env.getDataNodeRemoveHandler().stopDataNode(tDataNodeLocation);
- env.getDataNodeRemoveHandler().removeDataNodePersistence(tDataNodeLocation);
+ env.getDataNodeRemoveHandler().removeDataNodePersistence(disableDataNodeLocation);
+ env.getDataNodeRemoveHandler().stopDataNode(disableDataNodeLocation);
return Flow.NO_MORE_STATE;
}
} catch (Exception e) {
@@ -87,7 +87,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
} else {
LOG.error(
"Retrievable error trying to remove data node {}, state {}",
- tDataNodeLocation,
+ disableDataNodeLocation,
state,
e);
if (getCycles() > retryThreshold) {
@@ -136,7 +136,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
public void serialize(DataOutputStream stream) throws IOException {
stream.writeInt(ProcedureFactory.ProcedureType.REMOVE_DATA_NODE_PROCEDURE.ordinal());
super.serialize(stream);
- ThriftCommonsSerDeUtils.serializeTDataNodeLocation(tDataNodeLocation, stream);
+ ThriftCommonsSerDeUtils.serializeTDataNodeLocation(disableDataNodeLocation, stream);
stream.writeInt(execDataNodeRegionIds.size());
execDataNodeRegionIds.forEach(
tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream));
@@ -146,7 +146,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
try {
- tDataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+ disableDataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
int regionSize = byteBuffer.getInt();
execDataNodeRegionIds = new ArrayList<>(regionSize);
for (int i = 0; i < regionSize; i++) {
@@ -163,7 +163,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
RemoveDataNodeProcedure thatProc = (RemoveDataNodeProcedure) that;
return thatProc.getProcId() == this.getProcId()
&& thatProc.getState() == this.getState()
- && thatProc.tDataNodeLocation.equals(this.tDataNodeLocation);
+ && thatProc.disableDataNodeLocation.equals(this.disableDataNodeLocation);
}
return false;
}
@@ -175,7 +175,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
env.getDataNodeRemoveHandler().findDestDataNode(regionId);
if (destDataNode != null) {
RegionMigrateProcedure regionMigrateProcedure =
- new RegionMigrateProcedure(regionId, tDataNodeLocation, destDataNode);
+ new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode);
addChildProcedure(regionMigrateProcedure);
LOG.info("Submit child procedure, {}", regionMigrateProcedure);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
index 236a334dd9..fb0ebe4312 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
@@ -22,6 +22,9 @@ package org.apache.iotdb.confignode.procedure.state;
public enum RegionTransitionState {
REGION_MIGRATE_PREPARE,
ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP,
- MIGRATE_REGION,
+ ADD_REGION_PEER,
+ CHANGE_REGION_LEADER,
+ REMOVE_REGION_PEER,
+ REMOVE_REGION_CONSENSUS_GROUP,
UPDATE_REGION_LOCATION_CACHE
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 0aaae47d64..914ec0b0bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -64,28 +64,21 @@ public class RegionMigrateService implements IService {
}
/**
- * migrate a region from fromNode to toNode
+ * add a region peer
*
- * @param fromNode from which node
- * @param regionId which region
- * @param toNode to which node
- * @param newLeaderNode transfer region leader to which node
+ * @param req TMigrateRegionReq
* @return submit task succeed?
*/
- private boolean submitRegionMigrateTask(
- TDataNodeLocation fromNode,
- TConsensusGroupId regionId,
- TDataNodeLocation toNode,
- TDataNodeLocation newLeaderNode) {
+ public synchronized boolean submitAddRegionPeerTask(TMigrateRegionReq req) {
+
boolean submitSucceed = true;
try {
- regionMigratePool.submit(new RegionMigrateTask(regionId, fromNode, toNode, newLeaderNode));
+ regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(), req.getToNode()));
} catch (Exception e) {
LOGGER.error(
- "submit region migrate task error. region: {}, from: {} --> to: {}.",
- regionId,
- fromNode.getInternalEndPoint().getIp(),
- toNode.getInternalEndPoint().getIp(),
+ "submit add region peer task error. region: {}, to: {}.",
+ req.getRegionId(),
+ req.getToNode().getInternalEndPoint().getIp(),
e);
submitSucceed = false;
}
@@ -93,14 +86,49 @@ public class RegionMigrateService implements IService {
}
/**
- * migrate a region
+ * remove a region peer
*
* @param req TMigrateRegionReq
* @return submit task succeed?
*/
- public synchronized boolean submitRegionMigrateTask(TMigrateRegionReq req) {
- return submitRegionMigrateTask(
- req.getFromNode(), req.getRegionId(), req.getToNode(), req.getNewLeaderNode());
+ public synchronized boolean submitRemoveRegionPeerTask(TMigrateRegionReq req) {
+
+ boolean submitSucceed = true;
+ try {
+ regionMigratePool.submit(
+ new RemoveRegionPeerTask(req.getRegionId(), req.getFromNode(), req.getNewLeaderNode()));
+ } catch (Exception e) {
+ LOGGER.error(
+ "submit remove region peer task error. region: {}, from: {}.",
+ req.getRegionId(),
+ req.getFromNode().getInternalEndPoint().getIp(),
+ e);
+ submitSucceed = false;
+ }
+ return submitSucceed;
+ }
+
+ /**
+ * remove a region peer
+ *
+ * @param req TMigrateRegionReq
+ * @return submit task succeed?
+ */
+ public synchronized boolean submitRemoveRegionConsensusGroupTask(TMigrateRegionReq req) {
+
+ boolean submitSucceed = true;
+ try {
+ regionMigratePool.submit(
+ new RemoveRegionConsensusGroupTask(req.getRegionId(), req.getFromNode()));
+ } catch (Exception e) {
+ LOGGER.error(
+ "submit remove region consensus group task error. region: {}, from: {}.",
+ req.getRegionId(),
+ req.getFromNode().getInternalEndPoint().getIp(),
+ e);
+ submitSucceed = false;
+ }
+ return submitSucceed;
}
@Override
@@ -155,160 +183,80 @@ public class RegionMigrateService implements IService {
}
}
- private static class RegionMigrateTask implements Runnable {
- private static final Logger taskLogger = LoggerFactory.getLogger(RegionMigrateTask.class);
+ private static void reportSucceed(TConsensusGroupId tRegionId) {
+ TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ status.setMessage("Region: " + tRegionId + " migrated succeed");
+ TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
+ try {
+ reportRegionMigrateResultToConfigNode(req);
+ } catch (Throwable e) {
+ LOGGER.error(
+ "Report region {} migrate successful result error, result:{}", tRegionId, req, e);
+ }
+ }
+
+ private static void reportFailed(
+ TConsensusGroupId tRegionId,
+ TDataNodeLocation failedNode,
+ TRegionMigrateFailedType failedType,
+ TSStatus status) {
+ TRegionMigrateResultReportReq req =
+ createFailedRequest(tRegionId, failedNode, failedType, status);
+ try {
+ reportRegionMigrateResultToConfigNode(req);
+ } catch (Throwable e) {
+ LOGGER.error("Report region {} migrate failed result error, result:{}", tRegionId, req, e);
+ }
+ }
+
+ private static TRegionMigrateResultReportReq createFailedRequest(
+ TConsensusGroupId tRegionId,
+ TDataNodeLocation failedNode,
+ TRegionMigrateFailedType failedType,
+ TSStatus status) {
+ Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<>();
+ failedNodeAndReason.put(failedNode, failedType);
+ TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
+ req.setFailedNodeAndReason(failedNodeAndReason);
+ return req;
+ }
+
+ private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
+ throws TException {
+ TSStatus status;
+ try (ConfigNodeClient client = new ConfigNodeClient()) {
+ status = client.reportRegionMigrateResult(req);
+ LOGGER.info(
+ "Report region {} migrate result {} to Config node succeed, result: {}",
+ req.getRegionId(),
+ req,
+ status);
+ }
+ }
+
+ private static class AddRegionPeerTask implements Runnable {
+ private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
// migrate which region
private final TConsensusGroupId tRegionId;
- // migrate from which node
- private final TDataNodeLocation fromNode;
-
// migrate to which node
private final TDataNodeLocation toNode;
- // transfer leader to which node
- private final TDataNodeLocation newLeaderNode;
-
- public RegionMigrateTask(
- TConsensusGroupId tRegionId,
- TDataNodeLocation fromNode,
- TDataNodeLocation toNode,
- TDataNodeLocation newLeaderNode) {
+ public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation toNode) {
this.tRegionId = tRegionId;
- this.fromNode = fromNode;
this.toNode = toNode;
- this.newLeaderNode = newLeaderNode;
}
@Override
public void run() {
TSStatus runResult = addPeer();
if (isFailed(runResult)) {
- reportFailed(toNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
+ reportFailed(tRegionId, toNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
return;
}
- changeLeader();
-
- runResult = removePeer();
- if (isFailed(runResult)) {
- reportFailed(fromNode, TRegionMigrateFailedType.RemovePeerFailed, runResult);
- }
- runResult = removeConsensusGroup();
- if (isFailed(runResult)) {
- reportFailed(fromNode, TRegionMigrateFailedType.RemoveConsensusGroupFailed, runResult);
- }
-
- runResult = deleteRegion();
- if (isFailed(runResult)) {
- reportFailed(fromNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult);
- }
-
- reportSucceed();
- }
-
- private void changeLeader() {
- ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
- taskLogger.debug("start to transfer region {} leader", regionId);
- try {
- if (!isLeader(regionId)) {
- taskLogger.debug("region {} is not leader, no need to transfer", regionId);
- return;
- }
- transferLeader(regionId);
- } catch (Throwable e) {
- taskLogger.error(
- "transfer region {} leader to node {} error",
- regionId,
- newLeaderNode.getInternalEndPoint(),
- e);
- }
- taskLogger.debug("finished to change region {} leader", regionId);
- }
-
- private void transferLeader(ConsensusGroupId regionId) {
- taskLogger.debug("transfer region {} leader to {} ", regionId, newLeaderNode);
- if (regionId instanceof DataRegionId) {
- Peer newLeaderPeer = new Peer(regionId, newLeaderNode.getDataRegionConsensusEndPoint());
- DataRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
- } else {
- Peer newLeaderPeer = new Peer(regionId, newLeaderNode.getSchemaRegionConsensusEndPoint());
- SchemaRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
- }
- }
-
- private boolean isLeader(ConsensusGroupId regionId) {
- if (regionId instanceof DataRegionId) {
- return DataRegionConsensusImpl.getInstance().isLeader(regionId);
- }
- return SchemaRegionConsensusImpl.getInstance().isLeader(regionId);
- }
-
- private TSStatus deleteRegion() {
- TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
- taskLogger.debug("start to delete region {}", regionId);
- try {
- if (regionId instanceof DataRegionId) {
- StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) regionId);
- } else {
- SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId) regionId);
- }
- } catch (Throwable e) {
- taskLogger.error("delete the region {} failed", regionId, e);
- status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
- status.setMessage("delete region " + regionId + "failed, " + e.getMessage());
- return status;
- }
- status.setMessage("delete region " + regionId + " succeed");
- taskLogger.debug("finished to delete region {}", regionId);
- return status;
- }
-
- private void reportSucceed() {
- TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- status.setMessage("Region: " + tRegionId + " migrated succeed");
- TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
- try {
- reportRegionMigrateResultToConfigNode(req);
- } catch (Throwable e) {
- taskLogger.error(
- "report region {} migrate successful result error, result:{}", tRegionId, req, e);
- }
- }
-
- private void reportFailed(
- TDataNodeLocation failedNode, TRegionMigrateFailedType failedType, TSStatus status) {
- TRegionMigrateResultReportReq req = createFailedRequest(failedNode, failedType, status);
- try {
- reportRegionMigrateResultToConfigNode(req);
- } catch (Throwable e) {
- taskLogger.error(
- "report region {} migrate failed result error, result:{}", tRegionId, req, e);
- }
- }
-
- private TRegionMigrateResultReportReq createFailedRequest(
- TDataNodeLocation failedNode, TRegionMigrateFailedType failedType, TSStatus status) {
- Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<>();
- failedNodeAndReason.put(failedNode, failedType);
- TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
- req.setFailedNodeAndReason(failedNodeAndReason);
- return req;
- }
-
- private void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
- throws TException {
- TSStatus status;
- try (ConfigNodeClient client = new ConfigNodeClient()) {
- status = client.reportRegionMigrateResult(req);
- taskLogger.info(
- "report region {} migrate result {} to Config node succeed, result: {}",
- tRegionId,
- req,
- status);
- }
+ reportSucceed(tRegionId);
}
private TSStatus addPeer() {
@@ -316,7 +264,7 @@ public class RegionMigrateService implements IService {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
ConsensusGenericResponse resp = null;
TEndPoint newPeerNode = getConsensusEndPoint(toNode, regionId);
- taskLogger.info("start to add peer {} for region {}", newPeerNode, tRegionId);
+ taskLogger.info("Start to add peer {} for region {}", newPeerNode, tRegionId);
boolean addPeerSucceed = true;
for (int i = 0; i < RETRY; i++) {
try {
@@ -350,7 +298,7 @@ public class RegionMigrateService implements IService {
return status;
}
- taskLogger.info("succeed to add peer {} for region {}", newPeerNode, regionId);
+ taskLogger.info("Succeed to add peer {} for region {}", newPeerNode, regionId);
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setMessage("add peer " + newPeerNode + " for region " + regionId + " succeed");
return status;
@@ -366,6 +314,48 @@ public class RegionMigrateService implements IService {
return resp;
}
+ private TEndPoint getConsensusEndPoint(
+ TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
+ if (regionId instanceof DataRegionId) {
+ return nodeLocation.getDataRegionConsensusEndPoint();
+ }
+ return nodeLocation.getSchemaRegionConsensusEndPoint();
+ }
+
+ private boolean isSucceed(TSStatus status) {
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ }
+
+ private boolean isFailed(TSStatus status) {
+ return !isSucceed(status);
+ }
+ }
+
+ private static class RemoveRegionPeerTask implements Runnable {
+ private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class);
+
+ // migrate which region
+ private final TConsensusGroupId tRegionId;
+
+ // migrate from which node
+ private final TDataNodeLocation fromNode;
+
+ public RemoveRegionPeerTask(
+ TConsensusGroupId tRegionId, TDataNodeLocation fromNode, TDataNodeLocation newLeaderNode) {
+ this.tRegionId = tRegionId;
+ this.fromNode = fromNode;
+ }
+
+ @Override
+ public void run() {
+ TSStatus runResult = removePeer();
+ if (isFailed(runResult)) {
+ reportFailed(tRegionId, fromNode, TRegionMigrateFailedType.RemovePeerFailed, runResult);
+ }
+
+ reportSucceed(tRegionId);
+ }
+
private ConsensusGenericResponse removeRegionPeer(ConsensusGroupId regionId, Peer oldPeer) {
ConsensusGenericResponse resp;
if (regionId instanceof DataRegionId) {
@@ -410,15 +400,84 @@ public class RegionMigrateService implements IService {
if (!removePeerSucceed || resp == null || !resp.isSuccess()) {
taskLogger.error(
- "remove old peer {} for region {} failed, resp: {}", oldPeerNode, regionId, resp);
+ "Remove old peer {} for region {} failed, resp: {}", oldPeerNode, regionId, resp);
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage("remove old peer " + oldPeerNode + " for region " + regionId + " failed");
return status;
}
- taskLogger.info("succeed to remove peer {} for region {}", oldPeerNode, regionId);
+ taskLogger.info("Succeed to remove peer {} for region {}", oldPeerNode, regionId);
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- status.setMessage("remove peer " + oldPeerNode + " for region " + regionId + " succeed");
+ status.setMessage("Remove peer " + oldPeerNode + " for region " + regionId + " succeed");
+ return status;
+ }
+
+ private TEndPoint getConsensusEndPoint(
+ TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
+ if (regionId instanceof DataRegionId) {
+ return nodeLocation.getDataRegionConsensusEndPoint();
+ }
+ return nodeLocation.getSchemaRegionConsensusEndPoint();
+ }
+
+ private boolean isSucceed(TSStatus status) {
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ }
+
+ private boolean isFailed(TSStatus status) {
+ return !isSucceed(status);
+ }
+ }
+
+ private static class RemoveRegionConsensusGroupTask implements Runnable {
+ private static final Logger taskLogger =
+ LoggerFactory.getLogger(RemoveRegionConsensusGroupTask.class);
+
+ // migrate which region
+ private final TConsensusGroupId tRegionId;
+
+ // migrate from which node
+ private final TDataNodeLocation fromNode;
+
+ public RemoveRegionConsensusGroupTask(TConsensusGroupId tRegionId, TDataNodeLocation fromNode) {
+ this.tRegionId = tRegionId;
+ this.fromNode = fromNode;
+ }
+
+ @Override
+ public void run() {
+ TSStatus runResult = removeConsensusGroup();
+ if (isFailed(runResult)) {
+ reportFailed(
+ tRegionId, fromNode, TRegionMigrateFailedType.RemoveConsensusGroupFailed, runResult);
+ }
+
+ runResult = deleteRegion();
+ if (isFailed(runResult)) {
+ reportFailed(tRegionId, fromNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult);
+ }
+
+ reportSucceed(tRegionId);
+ }
+
+ private TSStatus deleteRegion() {
+ TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
+ taskLogger.debug("start to delete region {}", regionId);
+ try {
+ if (regionId instanceof DataRegionId) {
+ StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) regionId);
+ } else {
+ SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId) regionId);
+ }
+ } catch (Throwable e) {
+ taskLogger.error("delete the region {} failed", regionId, e);
+ status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
+ status.setMessage("delete region " + regionId + "failed, " + e.getMessage());
+ return status;
+ }
+ status.setMessage("delete region " + regionId + " succeed");
+ taskLogger.info("Finished to delete region {}", regionId);
return status;
}
@@ -458,14 +517,6 @@ public class RegionMigrateService implements IService {
return status;
}
- private TEndPoint getConsensusEndPoint(
- TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
- if (regionId instanceof DataRegionId) {
- return nodeLocation.getDataRegionConsensusEndPoint();
- }
- return nodeLocation.getSchemaRegionConsensusEndPoint();
- }
-
private boolean isSucceed(TSStatus status) {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index a7c9a06d23..074a505f59 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -576,10 +576,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
TEndPoint newNode = getConsensusEndPoint(req.getNewLeaderNode(), regionId);
Peer newLeaderPeer = new Peer(regionId, newNode);
if (!isLeader(regionId)) {
- LOGGER.debug("region {} is not leader, no need to change leader", regionId);
+ LOGGER.info("region {} is not leader, no need to change leader", regionId);
return status;
}
- LOGGER.debug("region {} is leader, will change leader", regionId);
+ LOGGER.info("region {} is leader, will change leader", regionId);
return transferLeader(regionId, newLeaderPeer);
}
@@ -632,6 +632,48 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
return addConsensusGroup(regionId, peers);
}
+ @Override
+ public TSStatus removeRegionPeer(TMigrateRegionReq req) throws TException {
+ TConsensusGroupId regionId = req.getRegionId();
+ String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
+ boolean submitSucceed = RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
+ TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ if (submitSucceed) {
+ LOGGER.info(
+ "succeed to submit a remove region peer task. region: {}, from {}",
+ regionId,
+ req.getFromNode().getInternalEndPoint());
+ return status;
+ }
+ status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage(
+ "submit region remove region peer task failed, region: "
+ + regionId
+ + ", from "
+ + req.getFromNode().getInternalEndPoint());
+ return status;
+ }
+
+ @Override
+ public TSStatus removeToRegionConsensusGroup(TMigrateRegionReq req) throws TException {
+ TConsensusGroupId regionId = req.getRegionId();
+ String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
+ boolean submitSucceed =
+ RegionMigrateService.getInstance().submitRemoveRegionConsensusGroupTask(req);
+ TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ if (submitSucceed) {
+ LOGGER.info(
+ "succeed to submit a remove region consensus group task. region: {}, from {}",
+ regionId,
+ fromNodeIp);
+ return status;
+ }
+ status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage(
+ "submit region remove region consensus group task failed, region: " + regionId);
+ return status;
+ }
+
private TSStatus createNewRegion(ConsensusGroupId regionId, String storageGroup, long ttl) {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
LOGGER.info("start to create new region {}", regionId);
@@ -681,32 +723,18 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
@Override
- public TSStatus migrateRegion(TMigrateRegionReq req) throws TException {
+ public TSStatus addRegionPeer(TMigrateRegionReq req) throws TException {
TConsensusGroupId regionId = req.getRegionId();
- String fromNodeIp = req.getFromNode().getInternalEndPoint().getIp();
String toNodeIp = req.getToNode().getInternalEndPoint().getIp();
- LOGGER.debug(
- "start to submit a region migrate task. region: {}, from {} to {}",
- regionId,
- fromNodeIp,
- toNodeIp);
- boolean submitSucceed = RegionMigrateService.getInstance().submitRegionMigrateTask(req);
+ boolean submitSucceed = RegionMigrateService.getInstance().submitAddRegionPeerTask(req);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (submitSucceed) {
- LOGGER.debug(
- "succeed to submit a region migrate task. region: {}, from {} to {}",
- regionId,
- fromNodeIp,
- toNodeIp);
+ LOGGER.info(
+ "succeed to submit a add region peer task. region: {}, to {}", regionId, toNodeIp);
return status;
}
- LOGGER.error(
- "failed to submit a region migrate task. region: {}, from {} to {}",
- regionId,
- fromNodeIp,
- toNodeIp);
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("submit region migrate task failed, region: " + regionId);
+ status.setMessage("submit add region peer task failed, region: " + regionId);
return status;
}
@@ -723,7 +751,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
private TSStatus addConsensusGroup(ConsensusGroupId regionId, List<Peer> peers) {
- LOGGER.info("start to add peers {} to region {} consensus group", peers, regionId);
+ LOGGER.info("Start to add consensus group {} to region {}", peers, regionId);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
ConsensusGenericResponse resp;
if (regionId instanceof DataRegionId) {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 5f436c12e3..a4544fd50a 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -279,10 +279,23 @@ service IDataNodeRPCService {
common.TSStatus addToRegionConsensusGroup(TAddConsensusGroup req);
/**
- * Config node will migrate a region from this node to newNode
- * @param migrate which region from one node to other node
+ * Config node will add a region peer to a region group
+ * @param req the request describing which region to add and on which target node
*/
- common.TSStatus migrateRegion(TMigrateRegionReq req);
+ common.TSStatus addRegionPeer(TMigrateRegionReq req);
+
+ /**
+ * Config node will remove a region peer from a region group
+ * @param req the request describing which region peer to remove and from which node
+ */
+ common.TSStatus removeRegionPeer(TMigrateRegionReq req);
+
+ /**
+ * Config node will remove a region group from this node to newNode. Usually a region group has
+ * multiple replicas, thus relates to multiple nodes.
+ * @param req the request describing which region consensus group to remove this node from
+ */
+ common.TSStatus removeToRegionConsensusGroup(TMigrateRegionReq req);
/**
* Config node will disable the Data node, the Data node will not accept read/write request when disabled