You are viewing a plain-text version of this content. The canonical link to the original message is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/10/12 11:50:48 UTC
[iotdb] branch master updated: [IOTDB-4609] Add more progress tips for remove-datanode and make deleteOldPeer available in ratis 1 replica situation (#7569)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3b67808612 [IOTDB-4609] Add more progress tips for remove-datanode and make deleteOldPeer available in ratis 1 replica situation (#7569)
3b67808612 is described below
commit 3b67808612f03901ee991b2bf8dd4ad0967f3a44
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Wed Oct 12 19:50:42 2022 +0800
[IOTDB-4609] Add more progress tips for remove-datanode and make deleteOldPeer available in ratis 1 replica situation (#7569)
---
.../iotdb/confignode/conf/ConfigNodeConstant.java | 2 +
.../confignode/conf/ConfigNodeRemoveCheck.java | 1 +
.../iotdb/confignode/persistence/NodeInfo.java | 12 +-
.../procedure/env/DataNodeRemoveHandler.java | 199 ++++++++++++---------
.../impl/node/RemoveDataNodeProcedure.java | 7 +-
.../impl/statemachine/RegionMigrateProcedure.java | 40 +++--
.../apache/iotdb/commons/utils/NodeUrlUtils.java | 6 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 2 +-
.../db/service/DataNodeServerCommandLine.java | 2 +-
9 files changed, 162 insertions(+), 109 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
index 4f58c58331..70c2a1c620 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
@@ -45,6 +45,8 @@ public class ConfigNodeConstant {
public static final String METRIC_STATUS_ONLINE = "Online";
public static final String METRIC_STATUS_UNKNOWN = "Unknown";
+ public static final String REMOVE_DATANODE_PROCESS = "[REMOVE_DATANODE_PROCESS]";
+
private ConfigNodeConstant() {
// empty constructor
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
index 6b9031c30a..62be003b2c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
@@ -40,6 +40,7 @@ import java.util.Properties;
import static org.apache.commons.lang3.StringUtils.isNumeric;
public class ConfigNodeRemoveCheck {
+
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeStartupCheck.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 30c108a06f..7adcf7e389 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -62,6 +62,8 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+
/**
* The NodeInfo stores cluster node information. The cluster node information including: 1. DataNode
* information 2. ConfigNode information
@@ -166,7 +168,10 @@ public class NodeInfo implements SnapshotProcessor {
* @return TSStatus
*/
public TSStatus removeDataNode(RemoveDataNodePlan req) {
- LOGGER.info("there are {} data node in cluster before remove some", registeredDataNodes.size());
+ LOGGER.info(
+ "{}, There are {} data node in cluster before executed remove-datanode.sh",
+ REMOVE_DATANODE_PROCESS,
+ registeredDataNodes.size());
try {
dataNodeInfoReadWriteLock.writeLock().lock();
req.getDataNodeLocations()
@@ -178,7 +183,10 @@ public class NodeInfo implements SnapshotProcessor {
} finally {
dataNodeInfoReadWriteLock.writeLock().unlock();
}
- LOGGER.info("there are {} data node in cluster after remove some", registeredDataNodes.size());
+ LOGGER.info(
+ "{}, There are {} data node in cluster after executed remove-datanode.sh",
+ REMOVE_DATANODE_PROCESS,
+ registeredDataNodes.size());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 73e2b65c48..d240ee7bf1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -53,6 +53,8 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+
public class DataNodeRemoveHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveHandler.class);
@@ -67,6 +69,12 @@ public class DataNodeRemoveHandler {
this.configManager = configManager;
}
+ public static String getIdWithRpcEndpoint(TDataNodeLocation location) {
+ return String.format(
+ "[dataNodeId: %s, clientRpcEndPoint: %s]",
+ location.getDataNodeId(), location.getClientRpcEndPoint());
+ }
+
/**
* Get all consensus group id in this node
*
@@ -109,7 +117,8 @@ public class DataNodeRemoveHandler {
DataNodeRequestType.DISABLE_DATA_NODE);
if (!isSucceed(status)) {
LOGGER.error(
- "broadcastDisableDataNode meets error, disabledDataNode: {}, error: {}",
+ "{}, BroadcastDisableDataNode meets error, disabledDataNode: {}, error: {}",
+ REMOVE_DATANODE_PROCESS,
getIdWithRpcEndpoint(disabledDataNode),
status);
return;
@@ -117,7 +126,8 @@ public class DataNodeRemoveHandler {
}
LOGGER.info(
- "DataNodeRemoveService finished broadcastDisableDataNode to cluster, disabledDataNode: {}",
+ "{}, DataNodeRemoveService finished broadcastDisableDataNode to cluster, disabledDataNode: {}",
+ REMOVE_DATANODE_PROCESS,
getIdWithRpcEndpoint(disabledDataNode));
}
@@ -145,6 +155,60 @@ public class DataNodeRemoveHandler {
return newNode.get();
}
+ /**
+ * Create a new RegionReplica and build the ConsensusGroup on the destined DataNode
+ *
+ * <p>createNewRegionPeer should be invoked on a DataNode that doesn't contain any peer of the
+ * specific ConsensusGroup, in order to avoid there exists one DataNode who has more than one
+ * RegionReplica.
+ *
+ * @param regionId The given ConsensusGroup
+ * @param destDataNode The destined DataNode where the new peer will be created
+ * @return status
+ */
+ public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
+ TSStatus status;
+ List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+ if (regionReplicaNodes.isEmpty()) {
+ LOGGER.warn(
+ "{}, Not find region replica nodes in createPeer, regionId: {}",
+ REMOVE_DATANODE_PROCESS,
+ regionId);
+ status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage("Not find region replica nodes in createPeer, regionId: " + regionId);
+ return status;
+ }
+
+ List<TDataNodeLocation> currentPeerNodes = new ArrayList<>(regionReplicaNodes);
+ currentPeerNodes.add(destDataNode);
+ String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
+ TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, storageGroup);
+ // TODO replace with real ttl
+ req.setTtl(Long.MAX_VALUE);
+
+ status =
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ destDataNode.getInternalEndPoint(),
+ req,
+ DataNodeRequestType.CREATE_NEW_REGION_PEER);
+
+ LOGGER.info(
+ "{}, Send action createNewRegionPeer finished, regionId: {}, newPeerDataNodeId: {}",
+ REMOVE_DATANODE_PROCESS,
+ regionId,
+ getIdWithRpcEndpoint(destDataNode));
+ if (isFailed(status)) {
+ LOGGER.error(
+ "{}, Send action createNewRegionPeer error, regionId: {}, newPeerDataNodeId: {}, result: {}",
+ REMOVE_DATANODE_PROCESS,
+ regionId,
+ getIdWithRpcEndpoint(destDataNode),
+ status);
+ }
+ return status;
+ }
+
/**
* Order the specific ConsensusGroup to add peer for the new RegionReplica.
*
@@ -165,13 +229,14 @@ public class DataNodeRemoveHandler {
filterDataNodeWithOtherRegionReplica(regionId, destDataNode);
if (!selectedDataNode.isPresent()) {
LOGGER.warn(
- "There are no other DataNodes could be selected to perform the add peer process, "
- + "please check RegionGroup: {} by SQL: show regions",
+ "{}, There are no other DataNodes could be selected to perform the add peer process, "
+ + "please check RegionGroup: {} by show regions sql command",
+ REMOVE_DATANODE_PROCESS,
regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(
"There are no other DataNodes could be selected to perform the add peer process, "
- + "please check by SQL: show regions");
+ + "please check by show regions sql command");
return status;
}
@@ -185,9 +250,11 @@ public class DataNodeRemoveHandler {
maintainPeerReq,
DataNodeRequestType.ADD_REGION_PEER);
LOGGER.info(
- "Send action addRegionPeer, wait it finished, regionId: {}, dataNode: {}",
+ "{}, Send action addRegionPeer finished, regionId: {}, rpcDataNode: {}, destDataNode: {}",
+ REMOVE_DATANODE_PROCESS,
regionId,
- getIdWithRpcEndpoint(selectedDataNode.get()));
+ getIdWithRpcEndpoint(selectedDataNode.get()),
+ destDataNode);
return status;
}
@@ -226,7 +293,8 @@ public class DataNodeRemoveHandler {
maintainPeerReq,
DataNodeRequestType.REMOVE_REGION_PEER);
LOGGER.info(
- "Send action removeRegionPeer, wait it finished, regionId: {}, dataNode: {}",
+ "{}, Send action removeRegionPeer finished, regionId: {}, dataNode: {}",
+ REMOVE_DATANODE_PROCESS,
regionId,
rpcClientDataNode.getInternalEndPoint());
return status;
@@ -245,19 +313,6 @@ public class DataNodeRemoveHandler {
public TSStatus deleteOldRegionPeer(
TDataNodeLocation originalDataNode, TConsensusGroupId regionId) {
- // when SchemaReplicationFactor==1, execute deleteOldRegionPeer method will cause error
- // user must delete the related data manually
- if (CONF.getSchemaReplicationFactor() == 1
- && TConsensusGroupType.SchemaRegion.equals(regionId.getType())) {
- String errorMessage =
- "deleteOldRegionPeer is not supported for schemaRegion when SchemaReplicationFactor equals 1, "
- + "you are supposed to delete the region data of datanode manually";
- LOGGER.info(errorMessage);
- TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage(errorMessage);
- return status;
- }
-
// when DataReplicationFactor==1, execute deleteOldRegionPeer method will cause error
// user must delete the related data manually
// TODO if multi-leader supports deleteOldRegionPeer when DataReplicationFactor==1?
@@ -266,7 +321,7 @@ public class DataNodeRemoveHandler {
String errorMessage =
"deleteOldRegionPeer is not supported for dataRegion when DataReplicationFactor equals 1, "
+ "you are supposed to delete the region data of datanode manually";
- LOGGER.info(errorMessage);
+ LOGGER.info("{}, {}", REMOVE_DATANODE_PROCESS, errorMessage);
TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(errorMessage);
return status;
@@ -281,7 +336,8 @@ public class DataNodeRemoveHandler {
maintainPeerReq,
DataNodeRequestType.DELETE_OLD_REGION_PEER);
LOGGER.info(
- "Send action deleteOldRegionPeer to regionId {} on dataNodeId {}, wait it finished",
+ "{}, Send action deleteOldRegionPeer finished, regionId: {}, dataNodeId: {}",
+ REMOVE_DATANODE_PROCESS,
regionId,
originalDataNode.getInternalEndPoint());
return status;
@@ -299,7 +355,7 @@ public class DataNodeRemoveHandler {
TDataNodeLocation originalDataNode,
TDataNodeLocation destDataNode) {
LOGGER.info(
- "start to update region {} location from {} to {} when it migrate succeed",
+ "Start to update region {} location from {} to {} when it migrate succeed",
regionId,
originalDataNode.getInternalEndPoint().getIp(),
destDataNode.getInternalEndPoint().getIp());
@@ -307,7 +363,7 @@ public class DataNodeRemoveHandler {
new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode);
TSStatus status = configManager.getPartitionManager().updateRegionLocation(req);
LOGGER.info(
- "update region {} location finished, result:{}, old:{}, new:{}",
+ "Update region {} location finished, result:{}, old:{}, new:{}",
regionId,
status,
originalDataNode.getInternalEndPoint().getIp(),
@@ -343,53 +399,6 @@ public class DataNodeRemoveHandler {
.findAny();
}
- /**
- * Create a new RegionReplica and build the ConsensusGroup on the destined DataNode
- *
- * <p>createNewRegionPeer should be invoked on a DataNode that doesn't contain any peer of the
- * specific ConsensusGroup, in order to avoid there exists one DataNode who has more than one
- * RegionReplica.
- *
- * @param regionId The given ConsensusGroup
- * @param destDataNode The destined DataNode where the new peer will be created
- * @return status
- */
- public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
- TSStatus status;
- List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
- if (regionReplicaNodes.isEmpty()) {
- LOGGER.warn("Not find region replica nodes in createPeer, regionId: {}", regionId);
- status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("Not find region replica nodes in createPeer, regionId: " + regionId);
- return status;
- }
-
- List<TDataNodeLocation> currentPeerNodes = new ArrayList<>(regionReplicaNodes);
- currentPeerNodes.add(destDataNode);
- String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
- TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, storageGroup);
- // TODO replace with real ttl
- req.setTtl(Long.MAX_VALUE);
-
- status =
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- destDataNode.getInternalEndPoint(),
- req,
- DataNodeRequestType.CREATE_NEW_REGION_PEER);
-
- LOGGER.info(
- "Send action createNewRegionPeer, regionId: {}, dataNode: {}", regionId, destDataNode);
- if (isFailed(status)) {
- LOGGER.error(
- "Send action createNewRegionPeer, regionId: {}, dataNode: {}, result: {}",
- regionId,
- destDataNode,
- status);
- }
- return status;
- }
-
private boolean isSucceed(TSStatus status) {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
@@ -406,14 +415,14 @@ public class DataNodeRemoveHandler {
* @throws ProcedureException procedure exception
*/
public TSStatus stopDataNode(TDataNodeLocation dataNode) throws ProcedureException {
- LOGGER.info("Begin to stop Data Node {}", dataNode);
+ LOGGER.info("{}, Begin to stop Data Node {}", REMOVE_DATANODE_PROCESS, dataNode);
AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
TSStatus status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
- LOGGER.info("stop Data Node {} result: {}", dataNode, status);
+ LOGGER.info("{}, Stop Data Node {} result: {}", REMOVE_DATANODE_PROCESS, dataNode, status);
return status;
}
@@ -532,22 +541,28 @@ public class DataNodeRemoveHandler {
configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes));
}
- public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation tDataNodeLocation) {
+ public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation originalDataNode) {
Optional<TDataNodeLocation> newLeaderNode =
- filterDataNodeWithOtherRegionReplica(regionId, tDataNodeLocation);
+ filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
if (newLeaderNode.isPresent()) {
SyncDataNodeClientPool.getInstance()
.changeRegionLeader(
- regionId, tDataNodeLocation.getInternalEndPoint(), newLeaderNode.get());
+ regionId, originalDataNode.getInternalEndPoint(), newLeaderNode.get());
LOGGER.info(
- "Change region leader finished, region is {}, newLeaderNode is {}",
+ "{}, Change region leader finished, regionId: {}, newLeaderNode: {}",
+ REMOVE_DATANODE_PROCESS,
regionId,
newLeaderNode);
}
}
/**
- * Filter a DataNode who contains other RegionReplica excepts the given one
+ * Filter a DataNode who contains other RegionReplica excepts the given one.
+ *
+ * <p>Choose the RUNNING status datanode firstly, if no RUNNING status datanode match the
+ * condition, then we choose the REMOVING status datanode.
+ *
+ * <p>`addRegionPeer`, `removeRegionPeer` and `changeRegionLeader` invoke this method.
*
* @param regionId The specific RegionId
* @param filterLocation The DataNodeLocation that should be filtered
@@ -567,16 +582,24 @@ public class DataNodeRemoveHandler {
.map(TDataNodeConfiguration::getLocation)
.collect(Collectors.toList());
- // TODO replace findAny() by select the low load node.
- return regionReplicaNodes.stream()
- .filter(e -> aliveDataNodes.contains(e) && !e.equals(filterLocation))
- .findAny();
- }
+ // filter the RUNNING datanode firstly
+ // if all the datanodes are not in RUNNING status, choose the REMOVING datanode
+ // because REMOVING datanode is also alive, it can execute rpc request
+ if (aliveDataNodes.isEmpty()) {
+ aliveDataNodes =
+ configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .collect(Collectors.toList());
+ }
- private String getIdWithRpcEndpoint(TDataNodeLocation location) {
- return String.format(
- "dataNodeId: %s, clientRpcEndPoint: %s",
- location.getDataNodeId(), location.getClientRpcEndPoint());
+ // TODO return the node which has lowest load.
+ for (TDataNodeLocation regionReplicaNode : regionReplicaNodes) {
+ if (aliveDataNodes.contains(regionReplicaNode) && !regionReplicaNode.equals(filterLocation)) {
+ return Optional.of(regionReplicaNode);
+ }
+ }
+
+ return Optional.empty();
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index f46d6bf4f7..1e824a0b59 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -38,6 +38,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+
/** remove data node procedure */
public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNodeState> {
private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
@@ -68,7 +70,10 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
env.markDataNodeAsRemovingAndBroadCast(disableDataNodeLocation);
execDataNodeRegionIds =
env.getDataNodeRemoveHandler().getDataNodeRegionIds(disableDataNodeLocation);
- LOG.info("DataNode region id is {}", execDataNodeRegionIds);
+ LOG.info(
+ "{}, DataNode regions to be removed is {}",
+ REMOVE_DATANODE_PROCESS,
+ execDataNodeRegionIds);
setNextState(RemoveDataNodeState.BROADCAST_DISABLE_DATA_NODE);
break;
case BROADCAST_DISABLE_DATA_NODE:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 6d4343c084..e3909aa46b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -39,6 +39,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
/** region migrate procedure */
@@ -92,8 +93,7 @@ public class RegionMigrateProcedure
case ADD_REGION_PEER:
tsStatus = env.getDataNodeRemoveHandler().addRegionPeer(destDataNode, consensusGroupId);
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
- waitForOneMigrationStepFinished(consensusGroupId);
- LOG.info("Wait for ADD_REGION_PEER finished, regionId: {}", consensusGroupId);
+ waitForOneMigrationStepFinished(consensusGroupId, state);
} else {
throw new ProcedureException("Failed to add region peer");
}
@@ -108,8 +108,7 @@ public class RegionMigrateProcedure
env.getDataNodeRemoveHandler()
.removeRegionPeer(originalDataNode, destDataNode, consensusGroupId);
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
- waitForOneMigrationStepFinished(consensusGroupId);
- LOG.info("Wait REMOVE_REGION_PEER finished, regionId: {}", consensusGroupId);
+ waitForOneMigrationStepFinished(consensusGroupId, state);
} else {
throw new ProcedureException("Failed to remove region peer");
}
@@ -120,8 +119,7 @@ public class RegionMigrateProcedure
env.getDataNodeRemoveHandler()
.deleteOldRegionPeer(originalDataNode, consensusGroupId);
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
- waitForOneMigrationStepFinished(consensusGroupId);
- LOG.info("Wait for DELETE_OLD_REGION_PEER finished, regionId: {}", consensusGroupId);
+ waitForOneMigrationStepFinished(consensusGroupId, state);
}
// remove consensus group after a node stop, which will be failed, but we will
// continuously execute.
@@ -134,15 +132,18 @@ public class RegionMigrateProcedure
}
} catch (Exception e) {
LOG.error(
- "Meets error in region migrate state, please do the rollback operation yourself manually according to the error message!!! "
+ "{}, Meets error in region migrate state, "
+ + "please do the rollback operation yourself manually according to the error message!!! "
+ "error state: {}, migrateResult: {}",
+ REMOVE_DATANODE_PROCESS,
state,
migrateResult);
if (isRollbackSupported(state)) {
setFailure(new ProcedureException("Region migrate failed at state: " + state));
} else {
LOG.error(
- "Failed state is not support rollback, filed state {}, originalDataNode: {}",
+ "{}, Failed state is not support rollback, failed state {}, originalDataNode: {}",
+ REMOVE_DATANODE_PROCESS,
state,
originalDataNode);
if (getCycles() > RETRY_THRESHOLD) {
@@ -249,8 +250,15 @@ public class RegionMigrateProcedure
return false;
}
- public TSStatus waitForOneMigrationStepFinished(TConsensusGroupId consensusGroupId)
- throws Exception {
+ public TSStatus waitForOneMigrationStepFinished(
+ TConsensusGroupId consensusGroupId, RegionTransitionState state) throws Exception {
+
+ LOG.info(
+ "{}, Wait for state {} finished, regionId: {}",
+ REMOVE_DATANODE_PROCESS,
+ state,
+ consensusGroupId);
+
TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode());
synchronized (regionMigrateLock) {
try {
@@ -262,7 +270,7 @@ public class RegionMigrateProcedure
String.format("Region migrate failed, regionId: %s", consensusGroupId));
}
} catch (InterruptedException e) {
- LOG.error("region migrate {} interrupt", consensusGroupId, e);
+ LOG.error("{}, region migrate {} interrupt", REMOVE_DATANODE_PROCESS, consensusGroupId, e);
Thread.currentThread().interrupt();
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage("wait region migrate interrupt," + e.getMessage());
@@ -274,14 +282,20 @@ public class RegionMigrateProcedure
/** DataNode report region migrate result to ConfigNode, and continue */
public void notifyTheRegionMigrateFinished(TRegionMigrateResultReportReq req) {
- LOG.info("ConfigNode received DataNode reported region migrate result: {} ", req);
+ LOG.info(
+ "{}, ConfigNode received DataNode reported region migrate result: {}",
+ REMOVE_DATANODE_PROCESS,
+ req);
// TODO the req is used in roll back
synchronized (regionMigrateLock) {
TSStatus migrateStatus = req.getMigrateResult();
// migrate failed
if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
- LOG.info("Region migrate executed failed in DataNode, migrateStatus: {}", migrateStatus);
+ LOG.info(
+ "{}, Region migrate executed failed in DataNode, migrateStatus: {}",
+ REMOVE_DATANODE_PROCESS,
+ migrateStatus);
migrateSuccess = false;
migrateResult = migrateStatus.toString();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
index 8dbf8459e3..f54290d0d7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/NodeUrlUtils.java
@@ -72,8 +72,8 @@ public class NodeUrlUtils {
public static TEndPoint parseTEndPointUrl(String endPointUrl) throws BadNodeUrlException {
String[] split = endPointUrl.split(":");
if (split.length != 2) {
- logger.warn("Bad node url: {}, please check the IP:PORT inputs", endPointUrl);
- throw new BadNodeUrlException(String.format("Bad node url: %s", endPointUrl));
+ logger.warn("Illegal endpoint url format: {}", endPointUrl);
+ throw new BadNodeUrlException(String.format("Bad endpoint url: %s", endPointUrl));
}
String ip = split[0];
TEndPoint result;
@@ -81,7 +81,7 @@ public class NodeUrlUtils {
int port = Integer.parseInt(split[1]);
result = new TEndPoint(ip, port);
} catch (NumberFormatException e) {
- logger.warn("Bad node url: {}, please check the IP:PORT inputs", endPointUrl);
+ logger.warn("Illegal endpoint url format: {}", endPointUrl);
throw new BadNodeUrlException(String.format("Bad node url: %s", endPointUrl));
}
return result;
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 39fae8d887..2ebff31347 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -265,7 +265,7 @@ public class ConfigNodeClient
configLeader = null;
}
logger.warn(
- "Failed to connect to ConfigNode {} from DataNode {},because the current node is not leader,try next node",
+ "Failed to connect to ConfigNode {} from DataNode {}, because the current node is not leader, try next node",
configNode,
config.getAddressAndPort());
return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
index fedc46a4bf..d3de543558 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
@@ -117,7 +117,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
if (dataNodeLocations.isEmpty()) {
throw new BadNodeUrlException("No DataNode to remove");
}
- logger.info("Start to remove datanode, detail:{}", dataNodeLocations);
+ logger.info("Start to remove datanode, removed datanode: {}", dataNodeLocations);
TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations);
try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq);