You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/24 11:35:57 UTC
[iotdb] 05/05: perfect the migrate process
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/fix_remove_node_problem_1123
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 822f4f1c2f9e3fd8b872dd92a77a56c76a7335f1
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Thu Nov 24 19:35:34 2022 +0800
perfect the migrate process
---
.../procedure/env/DataNodeRemoveHandler.java | 15 +-
.../iotdb/db/service/RegionMigrateService.java | 352 +++++++++------------
.../impl/DataNodeInternalRPCServiceImpl.java | 22 +-
3 files changed, 185 insertions(+), 204 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 6666d7e802..7abe82d46b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -178,8 +178,17 @@ public class DataNodeRemoveHandler {
return status;
}
- List<TDataNodeLocation> currentPeerNodes = new ArrayList<>(regionReplicaNodes);
- currentPeerNodes.add(destDataNode);
+ List<TDataNodeLocation> currentPeerNodes;
+ if (TConsensusGroupType.DataRegion.equals(regionId.getType())
+ && MULTI_LEADER_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
+ // parameter of createPeer for MultiLeader should be all peers
+ currentPeerNodes = new ArrayList<>(regionReplicaNodes);
+ currentPeerNodes.add(destDataNode);
+ } else {
+ // parameter of createPeer for Ratis can be empty
+ currentPeerNodes = Collections.emptyList();
+ }
+
String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, storageGroup);
// TODO replace with real ttl
@@ -278,7 +287,7 @@ public class DataNodeRemoveHandler {
// Here we pick the DataNode who contains one of the RegionReplica of the specified
// ConsensusGroup except the origin one
// in order to notify the new ConsensusGroup that the origin peer should secede now
- // if the selectedDataNode equals null, we choose the destDataNode to execute the method
+ // If the selectedDataNode equals null, we choose the destDataNode to execute the method
Optional<TDataNodeLocation> selectedDataNode =
filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
rpcClientDataNode = selectedDataNode.orElse(destDataNode);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index fc68469a3d..bc6c25f131 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -52,7 +52,7 @@ import java.util.Map;
public class RegionMigrateService implements IService {
private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
- public static final String REMOVE_DATANODE_PROCESS = "[REMOVE_DATANODE_PROCESS]";
+ public static final String REGION_MIGRATE_PROCESS = "[REGION_MIGRATE_PROCESS]";
private static final int MAX_RETRY_NUM = 5;
@@ -67,7 +67,7 @@ public class RegionMigrateService implements IService {
}
/**
- * submit AddRegionPeerTask
+ * Submit AddRegionPeerTask
*
* @param req TMaintainPeerReq
* @return if the submit task succeed
@@ -79,9 +79,9 @@ public class RegionMigrateService implements IService {
regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(), req.getDestNode()));
} catch (Exception e) {
LOGGER.error(
- "Submit addRegionPeer task error for Region: {} on DataNode: {}.",
+ "{}, Submit AddRegionPeerTask error for Region: {}",
+ REGION_MIGRATE_PROCESS,
req.getRegionId(),
- req.getDestNode().getInternalEndPoint().getIp(),
e);
submitSucceed = false;
}
@@ -89,7 +89,7 @@ public class RegionMigrateService implements IService {
}
/**
- * submit RemoveRegionPeerTask
+ * Submit RemoveRegionPeerTask
*
* @param req TMaintainPeerReq
* @return if the submit task succeed
@@ -101,9 +101,9 @@ public class RegionMigrateService implements IService {
regionMigratePool.submit(new RemoveRegionPeerTask(req.getRegionId(), req.getDestNode()));
} catch (Exception e) {
LOGGER.error(
- "Submit removeRegionPeer task error for Region: {} on DataNode: {}.",
+ "{}, Submit RemoveRegionPeer task error for Region: {}",
+ REGION_MIGRATE_PROCESS,
req.getRegionId(),
- req.getDestNode().getInternalEndPoint().getIp(),
e);
submitSucceed = false;
}
@@ -111,10 +111,10 @@ public class RegionMigrateService implements IService {
}
/**
- * remove a region peer
+ * Submit DeleteOldRegionPeerTask
*
* @param req TMigrateRegionReq
- * @return submit task succeed?
+ * @return if the submit task succeed
*/
public synchronized boolean submitDeleteOldRegionPeerTask(TMaintainPeerReq req) {
@@ -123,9 +123,9 @@ public class RegionMigrateService implements IService {
regionMigratePool.submit(new DeleteOldRegionPeerTask(req.getRegionId(), req.getDestNode()));
} catch (Exception e) {
LOGGER.error(
- "Submit deleteOldRegionPeerTask error for Region: {} on DataNode: {}.",
+ "{}, Submit DeleteOldRegionPeerTask error for Region: {}",
+ REGION_MIGRATE_PROCESS,
req.getRegionId(),
- req.getDestNode().getInternalEndPoint().getIp(),
e);
submitSucceed = false;
}
@@ -152,17 +152,10 @@ public class RegionMigrateService implements IService {
return ServiceType.DATA_NODE_REGION_MIGRATE_SERVICE;
}
- private static class Holder {
- private static final RegionMigrateService INSTANCE = new RegionMigrateService();
-
- private Holder() {}
- }
-
private static class RegionMigratePool extends AbstractPoolManager {
private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class);
private RegionMigratePool() {
- // migrate region one by one
this.pool = IoTDBThreadPoolFactory.newSingleThreadExecutor("Region-Migrate-Pool");
}
@@ -184,78 +177,25 @@ public class RegionMigrateService implements IService {
}
}
- private static void reportSucceed(TConsensusGroupId tRegionId, String migrateState) {
- TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- status.setMessage(
- String.format("Region:%s, state: %s, executed succeed", tRegionId, migrateState));
- TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
- try {
- reportRegionMigrateResultToConfigNode(req);
- } catch (Throwable e) {
- LOGGER.error(
- "Report region {} migrate successful result error, result: {}", tRegionId, req, e);
- }
- }
-
- private static void reportFailed(
- TConsensusGroupId tRegionId,
- TDataNodeLocation failedNode,
- TRegionMigrateFailedType failedType,
- TSStatus status) {
- TRegionMigrateResultReportReq req =
- createFailedRequest(tRegionId, failedNode, failedType, status);
- try {
- reportRegionMigrateResultToConfigNode(req);
- } catch (Throwable e) {
- LOGGER.error("Report region {} migrate failed result error, result:{}", tRegionId, req, e);
- }
- }
-
- private static TRegionMigrateResultReportReq createFailedRequest(
- TConsensusGroupId tRegionId,
- TDataNodeLocation failedNode,
- TRegionMigrateFailedType failedType,
- TSStatus status) {
- Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<>();
- failedNodeAndReason.put(failedNode, failedType);
- TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
- req.setFailedNodeAndReason(failedNodeAndReason);
- return req;
- }
-
- private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
- throws TException {
- TSStatus status;
- try (ConfigNodeClient client = new ConfigNodeClient()) {
- status = client.reportRegionMigrateResult(req);
- LOGGER.info(
- "Report region {} migrate result {} to Config node succeed, result: {}",
- req.getRegionId(),
- req,
- status);
- }
- }
-
private static class AddRegionPeerTask implements Runnable {
private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
// The RegionGroup that shall perform the add peer process
private final TConsensusGroupId tRegionId;
- // The DataNode that selected to perform the add peer process
- private final TDataNodeLocation selectedDataNode;
+ // The new DataNode to be added in the RegionGroup
+ private final TDataNodeLocation destDataNode;
- public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation selectedDataNode) {
+ public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) {
this.tRegionId = tRegionId;
- this.selectedDataNode = selectedDataNode;
+ this.destDataNode = destDataNode;
}
@Override
public void run() {
TSStatus runResult = addPeer();
if (isFailed(runResult)) {
- reportFailed(
- tRegionId, selectedDataNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
+ reportFailed(tRegionId, destDataNode, TRegionMigrateFailedType.AddPeerFailed, runResult);
return;
}
@@ -263,11 +203,12 @@ public class RegionMigrateService implements IService {
}
private TSStatus addPeer() {
+ taskLogger.info(
+ "{}, Start to addPeer {} for region {}", REGION_MIGRATE_PROCESS, destDataNode, tRegionId);
ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
ConsensusGenericResponse resp = null;
- TEndPoint newPeerNode = getConsensusEndPoint(selectedDataNode, regionId);
- taskLogger.info("Start to addPeer {} for region {}", newPeerNode, tRegionId);
+ TEndPoint destEndpoint = getConsensusEndPoint(destDataNode, regionId);
boolean addPeerSucceed = true;
for (int i = 0; i < MAX_RETRY_NUM; i++) {
try {
@@ -276,49 +217,40 @@ public class RegionMigrateService implements IService {
}
resp =
addRegionPeer(
- regionId, new Peer(regionId, selectedDataNode.getDataNodeId(), newPeerNode));
- addPeerSucceed = true;
+ regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndpoint));
} catch (Throwable e) {
addPeerSucceed = false;
taskLogger.error(
"{}, executed addPeer {} for region {} error, retry times: {}",
- REMOVE_DATANODE_PROCESS,
- newPeerNode,
+ REGION_MIGRATE_PROCESS,
+ destEndpoint,
regionId,
i,
e);
- status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
- String.format(
- "Add peer for region error, peerId: %s, regionId: %s, errorMessage: %s",
- newPeerNode, regionId, e.getMessage()));
}
if (addPeerSucceed && resp != null && resp.isSuccess()) {
break;
}
}
+
if (!addPeerSucceed || resp == null || !resp.isSuccess()) {
- taskLogger.error(
- "{}, Add new peer {} for region {} failed, resp: {}",
- REMOVE_DATANODE_PROCESS,
- newPeerNode,
- regionId,
- resp);
- status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
+ String errorMsg =
String.format(
- "Add peer for region error, peerId: %s, regionId: %s, resp: %s",
- newPeerNode, regionId, resp));
+ "%s, AddPeer for region error after max retry times, peerId: %s, regionId: %s, resp: %s",
+ REGION_MIGRATE_PROCESS, destEndpoint, regionId, resp);
+ taskLogger.error(errorMsg);
+ status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage(errorMsg);
return status;
}
taskLogger.info(
"{}, Succeed to addPeer {} for region {}",
- REMOVE_DATANODE_PROCESS,
- newPeerNode,
+ REGION_MIGRATE_PROCESS,
+ destEndpoint,
regionId);
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- status.setMessage("add peer " + newPeerNode + " for region " + regionId + " succeed");
+ status.setMessage("addPeer " + destEndpoint + " for region " + regionId + " succeed");
return status;
}
@@ -331,22 +263,6 @@ public class RegionMigrateService implements IService {
}
return resp;
}
-
- private TEndPoint getConsensusEndPoint(
- TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
- if (regionId instanceof DataRegionId) {
- return nodeLocation.getDataRegionConsensusEndPoint();
- }
- return nodeLocation.getSchemaRegionConsensusEndPoint();
- }
-
- private boolean isSucceed(TSStatus status) {
- return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
- }
-
- private boolean isFailed(TSStatus status) {
- return !isSucceed(status);
- }
}
private static class RemoveRegionPeerTask implements Runnable {
@@ -354,11 +270,11 @@ public class RegionMigrateService implements IService {
private final TConsensusGroupId tRegionId;
- private final TDataNodeLocation selectedDataNode;
+ private final TDataNodeLocation destDataNode;
- public RemoveRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation selectedDataNode) {
+ public RemoveRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation destDataNode) {
this.tRegionId = tRegionId;
- this.selectedDataNode = selectedDataNode;
+ this.destDataNode = destDataNode;
}
@Override
@@ -367,29 +283,18 @@ public class RegionMigrateService implements IService {
if (isSucceed(runResult)) {
reportSucceed(tRegionId, "RemovePeer");
} else {
- reportFailed(
- tRegionId, selectedDataNode, TRegionMigrateFailedType.RemovePeerFailed, runResult);
- }
- }
-
- private ConsensusGenericResponse removeRegionPeer(ConsensusGroupId regionId, Peer oldPeer) {
- ConsensusGenericResponse resp;
- if (regionId instanceof DataRegionId) {
- resp = DataRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer);
- } else {
- resp = SchemaRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer);
+ reportFailed(tRegionId, destDataNode, TRegionMigrateFailedType.RemovePeerFailed, runResult);
}
- return resp;
}
private TSStatus removePeer() {
ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- TEndPoint oldPeerNode = getConsensusEndPoint(selectedDataNode, regionId);
+ TEndPoint destEndPoint = getConsensusEndPoint(destDataNode, regionId);
taskLogger.info(
- "{}, Start to remove peer {} for region {}",
- REMOVE_DATANODE_PROCESS,
- oldPeerNode,
+ "{}, Start to removePeer {} for region {}",
+ REGION_MIGRATE_PROCESS,
+ destEndPoint,
regionId);
ConsensusGenericResponse resp = null;
boolean removePeerSucceed = true;
@@ -400,20 +305,16 @@ public class RegionMigrateService implements IService {
}
resp =
removeRegionPeer(
- regionId, new Peer(regionId, selectedDataNode.getDataNodeId(), oldPeerNode));
- removePeerSucceed = true;
+ regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndPoint));
} catch (Throwable e) {
removePeerSucceed = false;
taskLogger.error(
- "Remove peer {} for region {} error, retry times: {}", oldPeerNode, regionId, i, e);
- status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
- "remove peer: "
- + oldPeerNode
- + " for region: "
- + regionId
- + " error. exception: "
- + e.getMessage());
+ "{}, executed removePeer {} for region {} error, retry times: {}",
+ REGION_MIGRATE_PROCESS,
+ destEndPoint,
+ regionId,
+ i,
+ e);
}
if (removePeerSucceed && resp != null && resp.isSuccess()) {
break;
@@ -421,41 +322,34 @@ public class RegionMigrateService implements IService {
}
if (!removePeerSucceed || resp == null || !resp.isSuccess()) {
- taskLogger.error(
- "{}, Remove old peer {} for region {} failed, resp: {}",
- REMOVE_DATANODE_PROCESS,
- oldPeerNode,
- regionId,
- resp);
+ String errorMsg =
+ String.format(
+ "%s, RemovePeer for region error after max retry times, peerId: %s, regionId: %s, resp: %s",
+ REGION_MIGRATE_PROCESS, destEndPoint, regionId, resp);
+ taskLogger.error(errorMsg);
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("remove old peer " + oldPeerNode + " for region " + regionId + " failed");
+ status.setMessage(errorMsg);
return status;
}
taskLogger.info(
- "{}, Succeed to remove peer {} for region {}",
- REMOVE_DATANODE_PROCESS,
- oldPeerNode,
+ "{}, Succeed to removePeer {} for region {}",
+ REGION_MIGRATE_PROCESS,
+ destEndPoint,
regionId);
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- status.setMessage("Remove peer " + oldPeerNode + " for region " + regionId + " succeed");
+ status.setMessage("removePeer " + destEndPoint + " for region " + regionId + " succeed");
return status;
}
- private TEndPoint getConsensusEndPoint(
- TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
+ private ConsensusGenericResponse removeRegionPeer(ConsensusGroupId regionId, Peer oldPeer) {
+ ConsensusGenericResponse resp;
if (regionId instanceof DataRegionId) {
- return nodeLocation.getDataRegionConsensusEndPoint();
+ resp = DataRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer);
+ } else {
+ resp = SchemaRegionConsensusImpl.getInstance().removePeer(regionId, oldPeer);
}
- return nodeLocation.getSchemaRegionConsensusEndPoint();
- }
-
- private boolean isSucceed(TSStatus status) {
- return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
- }
-
- private boolean isFailed(TSStatus status) {
- return !isSucceed(status);
+ return resp;
}
}
@@ -464,32 +358,43 @@ public class RegionMigrateService implements IService {
private final TConsensusGroupId tRegionId;
- private final TDataNodeLocation fromNode;
+ private final TDataNodeLocation originalDataNode;
- public DeleteOldRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation fromNode) {
+ public DeleteOldRegionPeerTask(
+ TConsensusGroupId tRegionId, TDataNodeLocation originalDataNode) {
this.tRegionId = tRegionId;
- this.fromNode = fromNode;
+ this.originalDataNode = originalDataNode;
}
@Override
public void run() {
- TSStatus runResult = deleteOldRegionPeer();
+ // deletePeer: remove the peer from the consensus group
+ TSStatus runResult = deletePeer();
if (isFailed(runResult)) {
reportFailed(
- tRegionId, fromNode, TRegionMigrateFailedType.RemoveConsensusGroupFailed, runResult);
+ tRegionId,
+ originalDataNode,
+ TRegionMigrateFailedType.RemoveConsensusGroupFailed,
+ runResult);
}
+ // deleteRegion: delete region data
runResult = deleteRegion();
if (isFailed(runResult)) {
- reportFailed(tRegionId, fromNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult);
+ reportFailed(
+ tRegionId, originalDataNode, TRegionMigrateFailedType.DeleteRegionFailed, runResult);
}
reportSucceed(tRegionId, "DeletePeer");
}
- private TSStatus deleteOldRegionPeer() {
+ private TSStatus deletePeer() {
+ taskLogger.info(
+ "{}, Start to deletePeer {} for region {}",
+ REGION_MIGRATE_PROCESS,
+ originalDataNode,
+ tRegionId);
ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
- taskLogger.info("Start to deleteOldRegionPeer: {}", regionId);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
ConsensusGenericResponse resp;
try {
@@ -499,30 +404,31 @@ public class RegionMigrateService implements IService {
resp = SchemaRegionConsensusImpl.getInstance().deletePeer(regionId);
}
} catch (Throwable e) {
- taskLogger.error("DeleteOldRegionPeer error, regionId: {}", regionId, e);
+ taskLogger.error("{}, deletePeer error, regionId: {}", regionId, e);
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(
- "deleteOldRegionPeer for region: " + regionId + " error. exception: " + e.getMessage());
+ "deletePeer for region: " + regionId + " error. exception: " + e.getMessage());
return status;
}
if (!resp.isSuccess()) {
- taskLogger.error("DeleteOldRegionPeer error, regionId: {}", regionId, resp.getException());
- status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(
+ String errorMsg =
String.format(
- "deleteOldRegionPeer error, regionId: %s, errorMessage: %s",
- regionId, resp.getException().getMessage()));
+ "deletePeer error, regionId: %s, errorMessage: %s",
+ regionId, resp.getException().getMessage());
+ taskLogger.error(errorMsg);
+ status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage(errorMsg);
return status;
}
- taskLogger.info("Succeed to remove region {} consensus group", regionId);
- status.setMessage("remove region consensus group " + regionId + "succeed");
+ taskLogger.info("Succeed to deletePeer {} from consensus group", regionId);
+ status.setMessage("deletePeer from consensus group " + regionId + "succeed");
return status;
}
private TSStatus deleteRegion() {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
- taskLogger.info("Start to delete region {}", regionId);
+ taskLogger.info("{}, Start to deleteRegion {}", REGION_MIGRATE_PROCESS, regionId);
try {
if (regionId instanceof DataRegionId) {
StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) regionId);
@@ -530,22 +436,78 @@ public class RegionMigrateService implements IService {
SchemaEngine.getInstance().deleteSchemaRegion((SchemaRegionId) regionId);
}
} catch (Throwable e) {
- taskLogger.error("Delete the region {} failed", regionId, e);
+ taskLogger.error("deleteRegion {} failed", regionId, e);
status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
- status.setMessage("Delete region " + regionId + "failed, " + e.getMessage());
+ status.setMessage("deleteRegion " + regionId + "failed, " + e.getMessage());
return status;
}
- status.setMessage("delete region " + regionId + " succeed");
- taskLogger.info("Finished to delete region {}", regionId);
+ status.setMessage("deleteRegion " + regionId + " succeed");
+ taskLogger.info("Succeed to deleteRegion {}", regionId);
return status;
}
+ }
+
+ private static class Holder {
+ private static final RegionMigrateService INSTANCE = new RegionMigrateService();
+
+ private Holder() {}
+ }
+
+ private static void reportSucceed(TConsensusGroupId tRegionId, String migrateState) {
+ TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ status.setMessage(
+ String.format("Region: %s, state: %s, executed succeed", tRegionId, migrateState));
+ TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
+ try {
+ reportRegionMigrateResultToConfigNode(req);
+ } catch (Throwable e) {
+ LOGGER.error(
+ "Report region {} migrate result error in reportSucceed, result: {}", tRegionId, req, e);
+ }
+ }
+
+ private static void reportFailed(
+ TConsensusGroupId tRegionId,
+ TDataNodeLocation failedNode,
+ TRegionMigrateFailedType failedType,
+ TSStatus status) {
+ Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new HashMap<>();
+ failedNodeAndReason.put(failedNode, failedType);
+ TRegionMigrateResultReportReq req = new TRegionMigrateResultReportReq(tRegionId, status);
+ req.setFailedNodeAndReason(failedNodeAndReason);
+ try {
+ reportRegionMigrateResultToConfigNode(req);
+ } catch (Throwable e) {
+ LOGGER.error("Report region {} migrate error in reportFailed, result:{}", tRegionId, req, e);
+ }
+ }
- private boolean isSucceed(TSStatus status) {
- return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
+ throws TException {
+ TSStatus status;
+ try (ConfigNodeClient client = new ConfigNodeClient()) {
+ status = client.reportRegionMigrateResult(req);
+ LOGGER.info(
+ "Report region {} migrate result {} to Config node succeed, result: {}",
+ req.getRegionId(),
+ req,
+ status);
}
+ }
+
+ private static boolean isSucceed(TSStatus status) {
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ }
+
+ private static boolean isFailed(TSStatus status) {
+ return !isSucceed(status);
+ }
- private boolean isFailed(TSStatus status) {
- return !isSucceed(status);
+ private static TEndPoint getConsensusEndPoint(
+ TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
+ if (regionId instanceof DataRegionId) {
+ return nodeLocation.getDataRegionConsensusEndPoint();
}
+ return nodeLocation.getSchemaRegionConsensusEndPoint();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 559684bd34..8149b5534e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -201,6 +201,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.service.RegionMigrateService.REGION_MIGRATE_PROCESS;
import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
@@ -1261,7 +1262,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (submitSucceed) {
LOGGER.info(
- "Successfully submit addRegionPeer task for region: {} on DataNode: {}",
+ "Successfully submit addRegionPeer task for region: {}, target DataNode: {}",
regionId,
selectedDataNodeIP);
return status;
@@ -1279,7 +1280,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (submitSucceed) {
LOGGER.info(
- "Successfully submit removeRegionPeer task for region: {} on DataNode: {}",
+ "Successfully submit removeRegionPeer task for region: {}, DataNode to be removed: {}",
regionId,
selectedDataNodeIP);
return status;
@@ -1297,7 +1298,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (submitSucceed) {
LOGGER.info(
- "Successfully submit deleteOldRegionPeer task for region: {} on DataNode: {}",
+ "Successfully submit deleteOldRegionPeer task for region: {}, DataNode to be removed: {}",
regionId,
selectedDataNodeIP);
return status;
@@ -1455,7 +1456,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
private TSStatus createNewRegionPeer(ConsensusGroupId regionId, List<Peer> peers) {
- LOGGER.info("Start to createNewRegionPeer {} to region {}", peers, regionId);
+ LOGGER.info(
+ "{}, Start to createNewRegionPeer {} to region {}",
+ REGION_MIGRATE_PROCESS,
+ peers,
+ regionId);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
ConsensusGenericResponse resp;
if (regionId instanceof DataRegionId) {
@@ -1465,7 +1470,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
if (!resp.isSuccess()) {
LOGGER.error(
- "CreateNewRegionPeer error, peers: {}, regionId: {}, errorMessage",
+ "{}, CreateNewRegionPeer error, peers: {}, regionId: {}, errorMessage",
+ REGION_MIGRATE_PROCESS,
peers,
regionId,
resp.getException());
@@ -1473,7 +1479,11 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
status.setMessage(resp.getException().getMessage());
return status;
}
- LOGGER.info("Succeed to createNewRegionPeer {} for region {}", peers, regionId);
+ LOGGER.info(
+ "{}, Succeed to createNewRegionPeer {} for region {}",
+ REGION_MIGRATE_PROCESS,
+ peers,
+ regionId);
status.setMessage("createNewRegionPeer succeed, regionId: " + regionId);
return status;
}