You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/07/22 02:30:18 UTC
[iotdb] branch master updated: [IOTDB-3815] replace remove data node to procedure (#6736)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1ffe93cb27 [IOTDB-3815] replace remove data node to procedure (#6736)
1ffe93cb27 is described below
commit 1ffe93cb270ae9d8152a9507bb80acb95ceb6ee6
Author: wangchao316 <66...@users.noreply.github.com>
AuthorDate: Fri Jul 22 10:30:13 2022 +0800
[IOTDB-3815] replace remove data node to procedure (#6736)
---
.../request/write/RemoveDataNodePlan.java | 124 +---
.../statemachine/PartitionRegionStateMachine.java | 2 -
.../iotdb/confignode/manager/ConfigManager.java | 10 +-
.../confignode/manager/DataNodeRemoveManager.java | 806 ---------------------
.../apache/iotdb/confignode/manager/IManager.java | 6 +-
.../iotdb/confignode/manager/NodeManager.java | 28 +-
.../iotdb/confignode/manager/ProcedureManager.java | 34 +
.../iotdb/confignode/persistence/NodeInfo.java | 191 +----
.../procedure/env/ConfigNodeProcedureEnv.java | 26 +-
.../procedure/env/DataNodeRemoveHandler.java | 381 ++++++++++
.../procedure/impl/AbstractNodeProcedure.java | 64 ++
.../procedure/impl/AddConfigNodeProcedure.java | 33 +-
.../procedure/impl/RegionMigrateProcedure.java | 230 ++++++
.../procedure/impl/RemoveConfigNodeProcedure.java | 33 +-
.../procedure/impl/RemoveDataNodeProcedure.java | 184 +++++
.../confignode/procedure/scheduler/LockQueue.java | 63 ++
.../procedure/state/RegionTransitionState.java | 28 +
.../procedure/state/RemoveDataNodeState.java | 27 +
.../procedure/store/ProcedureFactory.java | 14 +
.../iotdb/confignode/service/ConfigNode.java | 27 -
.../confignode/service/ConfigNodeCommandLine.java | 31 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 2 +-
.../request/write/RemoveDataNodePlanTest.java | 1 -
.../java/org/apache/iotdb/db/service/DataNode.java | 105 ---
.../db/service/DataNodeServerCommandLine.java | 120 ++-
25 files changed, 1225 insertions(+), 1345 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlan.java
index e2178e6725..bf59f00ed1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlan.java
@@ -18,10 +18,7 @@
*/
package org.apache.iotdb.confignode.consensus.request.write;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.commons.enums.DataNodeRemoveState;
-import org.apache.iotdb.commons.enums.RegionMigrateState;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
@@ -32,29 +29,10 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
public class RemoveDataNodePlan extends ConfigPhysicalPlan {
private List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
- // if true, means update the request exec state.
- private boolean isUpdate;
-
- // [0, dataNodeLocations.size), -1 means No Data Node exec remove action
- private int execDataNodeIndex = -1;
-
- private DataNodeRemoveState execDataNodeState = DataNodeRemoveState.NORMAL;
-
- private List<TConsensusGroupId> execDataNodeRegionIds = new ArrayList<>();
-
- // [0, execDataNodeRegionIds), -1 means No Region exec migrate action
- private int execRegionIndex = -1;
-
- // if the request finished
- private boolean finished = false;
-
- private RegionMigrateState execRegionState = RegionMigrateState.ONLINE;
-
public RemoveDataNodePlan() {
super(ConfigPhysicalPlanType.RemoveDataNode);
}
@@ -62,7 +40,6 @@ public class RemoveDataNodePlan extends ConfigPhysicalPlan {
public RemoveDataNodePlan(List<TDataNodeLocation> dataNodeLocations) {
this();
this.dataNodeLocations = dataNodeLocations;
- isUpdate = false;
}
@Override
@@ -71,17 +48,6 @@ public class RemoveDataNodePlan extends ConfigPhysicalPlan {
stream.writeInt(dataNodeLocations.size());
dataNodeLocations.forEach(
location -> ThriftCommonsSerDeUtils.serializeTDataNodeLocation(location, stream));
- stream.writeInt(isUpdate ? 1 : 0);
- if (isUpdate) {
- stream.writeInt(execDataNodeIndex);
- stream.writeInt(execDataNodeState.getCode());
- stream.writeInt(execDataNodeRegionIds.size());
- execDataNodeRegionIds.forEach(
- tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream));
- stream.writeInt(execRegionIndex);
- stream.writeInt(execRegionState.getCode());
- stream.writeInt(finished ? 1 : 0);
- }
}
@Override
@@ -90,19 +56,6 @@ public class RemoveDataNodePlan extends ConfigPhysicalPlan {
for (int i = 0; i < dataNodeLocationSize; i++) {
dataNodeLocations.add(ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer));
}
- isUpdate = buffer.getInt() == 1;
- if (isUpdate) {
- execDataNodeIndex = buffer.getInt();
- execDataNodeState = DataNodeRemoveState.getStateByCode(buffer.getInt());
- int regionSize = buffer.getInt();
- execDataNodeRegionIds = new ArrayList<>(regionSize);
- for (int i = 0; i < regionSize; i++) {
- execDataNodeRegionIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
- }
- execRegionIndex = buffer.getInt();
- execRegionState = RegionMigrateState.getStateByCode(buffer.getInt());
- finished = buffer.getInt() == 1;
- }
}
@Override
@@ -138,81 +91,6 @@ public class RemoveDataNodePlan extends ConfigPhysicalPlan {
@Override
public String toString() {
- return "{RemoveDataNodeReq: "
- + "TDataNodeLocations: "
- + this.getDataNodeLocations()
- + ", is update: "
- + this.isUpdate()
- + ", is finished: "
- + this.isFinished()
- + ", exec node index: "
- + this.getExecDataNodeIndex()
- + ", exec node state: "
- + this.getExecDataNodeState()
- + ", exec region index: "
- + this.getExecRegionIndex()
- + ", exec region state: "
- + this.getExecRegionState()
- + ", exec node region ids: "
- + this.getExecDataNodeRegionIds().stream()
- .map(TConsensusGroupId::getId)
- .collect(Collectors.toList())
- + "}";
- }
-
- public boolean isUpdate() {
- return isUpdate;
- }
-
- public void setUpdate(boolean update) {
- isUpdate = update;
- }
-
- public int getExecDataNodeIndex() {
- return execDataNodeIndex;
- }
-
- public void setExecDataNodeIndex(int execDataNodeIndex) {
- this.execDataNodeIndex = execDataNodeIndex;
- }
-
- public DataNodeRemoveState getExecDataNodeState() {
- return execDataNodeState;
- }
-
- public void setExecDataNodeState(DataNodeRemoveState execDataNodeState) {
- this.execDataNodeState = execDataNodeState;
- }
-
- public List<TConsensusGroupId> getExecDataNodeRegionIds() {
- return execDataNodeRegionIds;
- }
-
- public void setExecDataNodeRegionIds(List<TConsensusGroupId> execDataNodeRegionIds) {
- this.execDataNodeRegionIds = execDataNodeRegionIds;
- }
-
- public int getExecRegionIndex() {
- return execRegionIndex;
- }
-
- public void setExecRegionIndex(int execRegionIndex) {
- this.execRegionIndex = execRegionIndex;
- }
-
- public RegionMigrateState getExecRegionState() {
- return execRegionState;
- }
-
- public void setExecRegionState(RegionMigrateState execRegionState) {
- this.execRegionState = execRegionState;
- }
-
- public void setFinished(boolean finished) {
- this.finished = finished;
- }
-
- public boolean isFinished() {
- return this.finished;
+ return "{RemoveDataNodeReq: " + "TDataNodeLocations: " + this.getDataNodeLocations() + "}";
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 026135128c..1afcde2f4c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -142,11 +142,9 @@ public class PartitionRegionStateMachine implements IStateMachine, IStateMachine
LOGGER.info("Current node {} is Leader, start procedure manager.", newLeader);
configManager.getProcedureManager().shiftExecutor(true);
configManager.getLoadManager().start();
- configManager.getDataNodeRemoveManager().beLeader();
} else {
configManager.getProcedureManager().shiftExecutor(false);
configManager.getLoadManager().stop();
- configManager.getDataNodeRemoveManager().beFollower();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 24f34f5577..1f905c0258 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -143,8 +143,6 @@ public class ConfigManager implements IManager {
/** UDF */
private final UDFManager udfManager;
- private final DataNodeRemoveManager dataNodeRemoveManager;
-
public ConfigManager() throws IOException {
// Build the persistence module
NodeInfo nodeInfo = new NodeInfo();
@@ -168,7 +166,6 @@ public class ConfigManager implements IManager {
this.procedureManager = new ProcedureManager(this, procedureInfo);
this.udfManager = new UDFManager(this, udfInfo);
this.loadManager = new LoadManager(this);
- this.dataNodeRemoveManager = new DataNodeRemoveManager(this);
this.consensusManager = new ConsensusManager(this, stateMachine);
}
@@ -176,7 +173,6 @@ public class ConfigManager implements IManager {
consensusManager.close();
partitionManager.getRegionCleaner().shutdown();
procedureManager.shiftExecutor(false);
- dataNodeRemoveManager.stop();
}
@Override
@@ -862,11 +858,6 @@ public class ConfigManager implements IManager {
return udfManager;
}
- @Override
- public DataNodeRemoveManager getDataNodeRemoveManager() {
- return dataNodeRemoveManager;
- }
-
@Override
public DataSet showRegion(GetRegionInfoListPlan getRegionInfoListPlan) {
TSStatus status = confirmLeader();
@@ -936,6 +927,7 @@ public class ConfigManager implements IManager {
return dataNodeConfigurationResp;
}
+ @Override
public ProcedureManager getProcedureManager() {
return procedureManager;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java
deleted file mode 100644
index 6d58fdd10e..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java
+++ /dev/null
@@ -1,806 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.manager;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.enums.DataNodeRemoveState;
-import org.apache.iotdb.commons.enums.RegionMigrateState;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
-import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
-import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
-import org.apache.iotdb.confignode.persistence.NodeInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-
-public class DataNodeRemoveManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveManager.class);
- private static final int QUEUE_SIZE_LIMIT = 200;
-
- private boolean hasSetUp = false;
-
- private ConfigManager configManager;
-
- private final LinkedBlockingQueue<RemoveDataNodePlan> removeQueue;
-
- // which request is running
- private RemoveDataNodePlan headRequest = null;
-
- // which data node is removing in `headRequest`, [0, req TDataNodeLocation list size)
- private int headNodeIndex = -1;
-
- private DataNodeRemoveState headNodeState = DataNodeRemoveState.NORMAL;
-
- // the region ids belongs to head node
- private List<TConsensusGroupId> headNodeRegionIds = new ArrayList<>();
-
- // which region is migrating on head node
- private int headRegionIndex = -1;
-
- private RegionMigrateState headRegionState = RegionMigrateState.ONLINE;
-
- private volatile boolean stopped = false;
-
- private volatile boolean isLeader = false;
- private final Object leaderLock = new Object();
- private Thread workThread;
- private Thread waitLeaderThread;
-
- private final Object regionMigrateLock = new Object();
- private TRegionMigrateResultReportReq lastRegionMigrateResult = null;
-
- public DataNodeRemoveManager(ConfigManager configManager) {
- this.configManager = configManager;
- removeQueue = new LinkedBlockingQueue<>(QUEUE_SIZE_LIMIT);
-
- createWaitLeaderThread();
- createWorkThread();
- }
-
- /** start the manager when Config node startup */
- public void start() {
- if (!hasSetUp) {
- // TODO 1. when restart,reload info from NoInfo and continue
- setUp();
- hasSetUp = true;
- }
- LOGGER.info("Data Node remove service start");
- // 2. if it is not leader, loop check
- // configManager.getConsensusManager().isLeader())
- waitUntilBeLeader();
-
- // 3. Take and exec request one by one
- // 4. When Data Node's state or Region's state change, then modify the request and write it to
- // Consensus.
- // 5. TODO if leader change?
- execRequestFromQueue();
- }
-
- private void createWaitLeaderThread() {
- waitLeaderThread =
- new Thread(
- () -> {
- while (!stopped && !configManager.getConsensusManager().isLeader()) {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- LOGGER.info("the config node is leader now!");
- isLeader = true;
- synchronized (leaderLock) {
- leaderLock.notify();
- }
- });
- waitLeaderThread.setName("Wait-Leader-Thread");
- }
-
- private void createWorkThread() {
- workThread =
- new Thread(
- () -> {
- while (!stopped) {
- RemoveDataNodePlan req = null;
- try {
- while (!isLeader) {
- LOGGER.warn("the ConfigNode is not leader, waiting...");
- synchronized (leaderLock) {
- leaderLock.wait();
- }
- }
- LOGGER.info("the ConfigNode is leader now, will take request and run");
- req = removeQueue.take();
- LOGGER.info("exec the request : {}", req);
- prepareHeadRequestInfo(req);
- TSStatus status = execRemoveDataNodeRequest(headRequest);
- LOGGER.info("exec the request: {}, result: {}", req, status);
- // unRegisterRequest(req);
- } catch (InterruptedException e) {
- LOGGER.warn("work thread interrupted", e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- LOGGER.warn("the request run failed", e);
- } finally {
- if (req != null) {
- unRegisterRequest(req);
- }
- }
- }
- });
- workThread.setName("Exec-RemoveDataNode-Thread");
- }
-
- private void waitUntilBeLeader() {
- if (waitLeaderThread != null) {
- waitLeaderThread.start();
- }
- }
-
- private void execRequestFromQueue() {
- if (workThread != null) {
- workThread.start();
- }
- }
-
- public void beLeader() {
- // TODO
- /** this.isLeader = true; synchronized (leaderLock) { leaderLock.notify(); } */
- }
-
- public void beFollower() {
- // TODO
- /** this.isLeader = false; */
- }
-
- /**
- * prepare for the request. from first node, first region to exec
- *
- * @param req RemoveDataNodeReq
- */
- private void prepareHeadRequestInfo(RemoveDataNodePlan req) {
- LOGGER.info("start to prepare for request: {}", req);
- this.headRequest = req;
- // to exec the request, will change it's state at different stage.
- this.headNodeIndex = 0;
- this.headNodeState = DataNodeRemoveState.NORMAL;
-
- TDataNodeLocation headNode = req.getDataNodeLocations().get(headNodeIndex);
- this.headNodeRegionIds =
- configManager.getPartitionManager().getAllReplicaSets().stream()
- .filter(rg -> rg.getDataNodeLocations().contains(headNode))
- .filter(rg -> rg.regionId.getType() != TConsensusGroupType.PartitionRegion)
- .map(TRegionReplicaSet::getRegionId)
- .collect(Collectors.toList());
- this.headRegionIndex = 0;
- this.headRegionState = RegionMigrateState.ONLINE;
-
- // modify head quest
- this.headRequest.setUpdate(true);
- this.headRequest.setExecDataNodeRegionIds(headNodeRegionIds);
- this.headRequest.setExecDataNodeIndex(headNodeIndex);
- this.headRequest.setExecRegionIndex(headRegionIndex);
- this.headRequest.setExecDataNodeState(headNodeState);
- this.headRequest.setExecRegionState(headRegionState);
- configManager.getConsensusManager().write(headRequest);
- LOGGER.info("finished to prepare for request: {}", req);
- }
-
- private void setUp() {
- removeQueue.addAll(configManager.getNodeManager().getDataNodeRemoveRequestQueue());
- headRequest = configManager.getNodeManager().getHeadRequestForDataNodeRemove();
- if (headRequest == null) {
- return;
- }
-
- // avoid duplication
- if (removeQueue.contains(headRequest)) {
- removeQueue.remove(headRequest);
- }
-
- headNodeIndex = headRequest.getExecDataNodeIndex();
- headNodeState = headRequest.getExecDataNodeState();
- headNodeRegionIds = headRequest.getExecDataNodeRegionIds();
- headRegionIndex = headRequest.getExecRegionIndex();
- headRegionState = headRequest.getExecRegionState();
- }
-
- /**
- * exec the request loop all removed node 1: brocast it to cluster 2: loop region on it and
- * migrate region 3 stop the node or roll back
- *
- * @param req RemoveDataNodeReq
- */
- private TSStatus execRemoveDataNodeRequest(RemoveDataNodePlan req) {
- TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-
- for (TDataNodeLocation dataNodeLocation : req.getDataNodeLocations()) {
- headNodeIndex = req.getDataNodeLocations().indexOf(dataNodeLocation);
- status = broadcastDisableDataNode(req, dataNodeLocation);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.error("Disable Data Node Error {}", status);
- return status;
- }
-
- // fetch data/schema region from one datanode
- status = migrateSingleDataNodeRegions(req, dataNodeLocation);
- if (isSucceed(status)) {
- status = stopDataNode(req, dataNodeLocation);
- if (isFailed(status)) {
- LOGGER.error(
- "send rpc to stop the Data Node {} error, please stop it use command",
- dataNodeLocation);
- }
- } else {
- LOGGER.error("the request run failed {}, result {}. will roll back", req, status);
- rollBackSingleNode(req, dataNodeLocation);
- }
- }
- return status;
- }
-
- private void updateRegionLocationCache(
- TConsensusGroupId regionId, TDataNodeLocation oldNode, TDataNodeLocation newNode) {
- LOGGER.debug(
- "start to update region {} location from {} to {} when it migrate succeed",
- regionId,
- oldNode.getInternalEndPoint().getIp(),
- newNode.getInternalEndPoint().getIp());
- UpdateRegionLocationPlan req = new UpdateRegionLocationPlan(regionId, oldNode, newNode);
- TSStatus status = configManager.getPartitionManager().updateRegionLocation(req);
- LOGGER.debug(
- "update region {} location finished, result:{}, old:{}, new:{}",
- regionId,
- status,
- oldNode.getInternalEndPoint().getIp(),
- newNode.getInternalEndPoint().getIp());
- }
-
- private void rollBackSingleNode(RemoveDataNodePlan req, TDataNodeLocation node) {
- LOGGER.warn("roll back remove data node {} in the request {}", node, req);
- if (headRegionState == RegionMigrateState.DATA_COPY_FAILED) {
- // TODO delete target node the head region data
- TConsensusGroupId tRegionId = lastRegionMigrateResult.getRegionId();
- for (Map.Entry<TDataNodeLocation, TRegionMigrateFailedType> entry :
- lastRegionMigrateResult.getFailedNodeAndReason().entrySet()) {
- TDataNodeLocation failedNode = entry.getKey();
- TRegionMigrateFailedType failedReason = entry.getValue();
- switch (failedReason) {
- // TODO how to impl roll back
- case AddPeerFailed:
- LOGGER.warn(
- "add new peer node {} for region {} failed, will roll back",
- failedNode.getInternalEndPoint().getIp(),
- tRegionId);
- break;
- case RemovePeerFailed:
- LOGGER.warn(
- "remove old peer node {} for region {} failed, will roll back",
- failedNode.getInternalEndPoint().getIp(),
- tRegionId);
- break;
- case RemoveConsensusGroupFailed:
- LOGGER.warn(
- "remove consensus group on node {} for region {} failed, will roll back",
- failedNode.getInternalEndPoint().getIp(),
- tRegionId);
- break;
- case DeleteRegionFailed:
- LOGGER.warn(
- "create region {} instance on {} failed, will roll back",
- failedNode.getInternalEndPoint().getIp(),
- tRegionId);
- break;
- default:
- LOGGER.warn(
- "UnSupport reason {} for region {} migrate failed", failedReason, tRegionId);
- }
- }
- }
- // TODO if roll back failed, FAILED
- storeDataNodeState(req, DataNodeRemoveState.REMOVE_FAILED);
- }
-
- /**
- * broadcast these datanode in RemoveDataNodeReq are disabled, so they will not accept read/write
- * request
- *
- * @param req RemoveDataNodeReq
- * @param disabledDataNode TDataNodeLocation
- */
- private TSStatus broadcastDisableDataNode(
- RemoveDataNodePlan req, TDataNodeLocation disabledDataNode) {
- LOGGER.info(
- "DataNodeRemoveService start send disable the Data Node to cluster, {}", disabledDataNode);
- storeDataNodeState(req, DataNodeRemoveState.REMOVE_START);
- TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- List<TEndPoint> otherOnlineDataNodes =
- configManager.getLoadManager().getOnlineDataNodes(-1).stream()
- .map(TDataNodeConfiguration::getLocation)
- .filter(loc -> !loc.equals(disabledDataNode))
- .map(TDataNodeLocation::getInternalEndPoint)
- .collect(Collectors.toList());
-
- for (TEndPoint server : otherOnlineDataNodes) {
- TDisableDataNodeReq disableReq = new TDisableDataNodeReq(disabledDataNode);
- status =
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- server, disableReq, DataNodeRequestType.DISABLE_DATA_NODE);
- if (!isSucceed(status)) {
- return status;
- }
- }
- LOGGER.info(
- "DataNodeRemoveService finished send disable the Data Node to cluster, {}",
- disabledDataNode);
- status.setMessage("Succeed disable the Data Node from cluster");
- return status;
- }
-
- private void storeRegionState(RemoveDataNodePlan req, RegionMigrateState state) {
- req.setExecRegionIndex(headRegionIndex);
- headRegionState = state;
- req.setExecRegionState(headRegionState);
- configManager.getConsensusManager().write(req);
- }
-
- private void storeDataNodeState(RemoveDataNodePlan req, DataNodeRemoveState state) {
- req.setExecDataNodeIndex(headNodeIndex);
- headNodeState = state;
- req.setExecDataNodeState(headNodeState);
- configManager.getConsensusManager().write(req);
- }
-
- private TSStatus migrateSingleDataNodeRegions(
- RemoveDataNodePlan req, TDataNodeLocation dataNodeLocation) {
- LOGGER.info("start to migrate regions on the Data Node: {}", dataNodeLocation);
- TSStatus status;
- storeDataNodeState(req, DataNodeRemoveState.REGION_MIGRATING);
- for (TConsensusGroupId regionId : headNodeRegionIds) {
- headRegionIndex = headNodeRegionIds.indexOf(regionId);
- // impl migrate region with twice rpc: CN-->DN(send), DN-->CN(report)
- status = migrateSingleRegion(req, dataNodeLocation, regionId);
- // if has one region migrate failed, the node remove failed
- if (isFailed(status)) {
- storeDataNodeState(req, DataNodeRemoveState.REGION_MIGRATE_FAILED);
- return status;
- }
- }
-
- // all regions on the node migrate succeed
- storeDataNodeState(req, DataNodeRemoveState.REGION_MIGRATE_SUCCEED);
- status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- status.setMessage("The Data Node migrate regions succeed");
- LOGGER.info("finished to migrate regions on the Data Node: {}", dataNodeLocation);
- return status;
- }
-
- private TSStatus migrateSingleRegion(
- RemoveDataNodePlan req, TDataNodeLocation node, TConsensusGroupId regionId) {
- // change region leader: change leader and then add peer, will cause ratis server exit
- // so modify policy: add peer firs, then change region leader, see:
- // RegionMigrateService.changeLeader()
-
- // do migrate region
- TSStatus status = doMigrateSingleRegion(req, node, regionId);
- if (isFailed(status)) {
- storeRegionState(req, RegionMigrateState.DATA_COPY_FAILED);
- return status;
- }
- storeRegionState(req, RegionMigrateState.DATA_COPY_SUCCEED);
- return status;
- }
-
- private TSStatus changeSingleRegionLeader(
- RemoveDataNodePlan req, TDataNodeLocation node, TConsensusGroupId regionId) {
- storeRegionState(req, RegionMigrateState.LEADER_CHANGING);
- LOGGER.debug("start to send region leader change. {}", regionId);
- TSStatus status;
- // pick a node in same raft group to be new region leader
- List<TRegionReplicaSet> regionReplicaSets =
- configManager.getPartitionManager().getAllReplicaSets().stream()
- .filter(rg -> rg.getRegionId().equals(regionId))
- .collect(Collectors.toList());
- if (regionReplicaSets.isEmpty()) {
- LOGGER.warn("not find TRegionReplica for region: {}, ignore it", regionId);
- status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("not find TRegionReplica for region, ignore");
- return status;
- }
- Optional<TDataNodeLocation> newLeaderNode =
- regionReplicaSets.get(0).getDataNodeLocations().stream()
- .filter(e -> !e.equals(node))
- .findAny();
- if (!newLeaderNode.isPresent()) {
- LOGGER.warn("No enough Data node to change leader for region: {}", regionId);
- status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("No enough Data node to change leader for region " + regionId);
- return status;
- }
- status =
- SyncDataNodeClientPool.getInstance()
- .changeRegionLeader(regionId, node.getInternalEndPoint(), newLeaderNode.get());
- LOGGER.debug("finished to send region leader change. {}", regionId);
- return status;
- }
-
- private TSStatus doMigrateSingleRegion(
- RemoveDataNodePlan req, TDataNodeLocation node, TConsensusGroupId regionId) {
- storeRegionState(req, RegionMigrateState.DATA_COPYING);
- LOGGER.debug("start to migrate region {}", regionId);
- TSStatus status;
- List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
- if (regionReplicaNodes.isEmpty()) {
- LOGGER.warn("Not find region replica nodes, region: {}", regionId);
- status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("not find region replica nodes, region: " + regionId);
- return status;
- }
-
- // will migrate the region to the new node, which should not be same raft
- Optional<TDataNodeLocation> newNode = pickNewReplicaNodeForRegion(regionReplicaNodes);
- if (!newNode.isPresent()) {
- LOGGER.warn("No enough Data node to migrate region: {}", regionId);
- status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("No enough Data node to migrate region, region: " + regionId);
- return status;
- }
-
- status = addNewNodeToRegionConsensusGroup(regionId, regionReplicaNodes, newNode.get());
- if (isFailed(status)) {
- return status;
- }
-
- // TODO if region replica is 1, the new leader is null, it also need to migrate
- Optional<TDataNodeLocation> newLeaderNode =
- regionReplicaNodes.stream().filter(e -> !e.equals(node)).findAny();
- if (!newLeaderNode.isPresent()) {
- LOGGER.warn(
- "No other Node to change region leader, check by show regions, region: {}", regionId);
- status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("No other Node to change region leader, check by show regions");
- return status;
- }
-
- TMigrateRegionReq migrateRegionReq = new TMigrateRegionReq(regionId, node, newNode.get());
- migrateRegionReq.setNewLeaderNode(newLeaderNode.get());
- status =
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- node.getInternalEndPoint(), migrateRegionReq, DataNodeRequestType.MIGRATE_REGION);
- // maybe send rpc failed
- if (isFailed(status)) {
- return status;
- }
- LOGGER.debug("send region {} migrate action to {}, wait it finished", regionId, node);
- // wait DN report the region migrate result, when DN reported, then will notify and continue
- status = waitForTheRegionMigrateFinished();
- // interrupt wait
- if (isFailed(status)) {
- return status;
- }
- LOGGER.debug(
- "wait region {} migrate finished. migrate result: {}", regionId, lastRegionMigrateResult);
- status = lastRegionMigrateResult.migrateResult;
- if (isSucceed(status)) {
- updateRegionLocationCache(regionId, node, newNode.get());
- }
- return status;
- }
-
- private List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) {
- List<TRegionReplicaSet> regionReplicaSets =
- configManager.getPartitionManager().getAllReplicaSets().stream()
- .filter(rg -> rg.regionId.equals(regionId))
- .collect(Collectors.toList());
- if (regionReplicaSets.isEmpty()) {
- LOGGER.warn("not find TRegionReplica for region: {}", regionId);
- return Collections.emptyList();
- }
-
- return regionReplicaSets.get(0).getDataNodeLocations();
- }
-
- private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
- List<TDataNodeLocation> regionReplicaNodes) {
- return configManager.getLoadManager().getOnlineDataNodes(-1).stream()
- .map(TDataNodeConfiguration::getLocation)
- .filter(e -> !regionReplicaNodes.contains(e))
- .findAny();
- }
-
- private TSStatus addNewNodeToRegionConsensusGroup(
- TConsensusGroupId regionId,
- List<TDataNodeLocation> regionReplicaNodes,
- TDataNodeLocation newNode) {
- String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
- TSStatus status =
- SyncDataNodeClientPool.getInstance()
- .addToRegionConsensusGroup(
- // TODO replace with real ttl
- regionReplicaNodes, regionId, newNode, storageGroup, Long.MAX_VALUE);
- LOGGER.debug("send add region {} consensus group to {}", regionId, newNode);
- if (isFailed(status)) {
- LOGGER.error(
- "add new node {} to region {} consensus group failed, result: {}",
- newNode,
- regionId,
- status);
- }
- return status;
- }
-
- private boolean isSucceed(TSStatus status) {
- return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
- }
-
- private boolean isFailed(TSStatus status) {
- return !isSucceed(status);
- }
-
- /**
- * register a RemoveDataNodeReq
- *
- * @param req RemoveDataNodeReq
- * @return true if register succeed.
- */
- public synchronized boolean registerRequest(RemoveDataNodePlan req) {
- if (!removeQueue.add(req)) {
- LOGGER.error("register request failed");
- return false;
- }
- ConsensusWriteResponse resp = configManager.getConsensusManager().write(req);
- LOGGER.info("write register request to Consensus result : {} for req {}", resp, req);
- return resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
- }
-
- /**
- * remove a RemoveDataNodeReq
- *
- * @param req RemoveDataNodeReq
- */
- public void unRegisterRequest(RemoveDataNodePlan req) {
- req.setFinished(true);
- configManager.getConsensusManager().write(req);
- reset();
- LOGGER.info("unregister request succeed, remain {} request", removeQueue.size());
- }
-
- public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
- LOGGER.debug("accept region {} migrate result, result: {}", req.getRegionId(), req);
- notifyTheRegionMigrateFinished(req);
- }
-
- private TSStatus waitForTheRegionMigrateFinished() {
- TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- synchronized (regionMigrateLock) {
- try {
- // TODO set timeOut?
- regionMigrateLock.wait();
- } catch (InterruptedException e) {
- LOGGER.error("region migrate {} interrupt", headNodeRegionIds.get(headRegionIndex), e);
- Thread.currentThread().interrupt();
- status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("wait region migrate interrupt," + e.getMessage());
- }
- }
- return status;
- }
-
- private void notifyTheRegionMigrateFinished(TRegionMigrateResultReportReq req) {
- lastRegionMigrateResult = req;
- synchronized (regionMigrateLock) {
- regionMigrateLock.notify();
- }
- }
-
- private void reset() {
- this.headRequest = null;
- this.headNodeIndex = -1;
- this.headNodeState = DataNodeRemoveState.NORMAL;
- this.headNodeRegionIds.clear();
- this.headRegionIndex = -1;
- this.headRegionState = RegionMigrateState.ONLINE;
- }
-
- private TSStatus stopDataNode(RemoveDataNodePlan req, TDataNodeLocation dataNode) {
- LOGGER.info("begin to stop Data Node {} in request {}", dataNode, req);
- storeDataNodeState(req, DataNodeRemoveState.STOP);
- AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
- TSStatus status =
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
- LOGGER.info("stop Data Node {} result: {}", dataNode, status);
- return status;
- }
-
- /** stop the manager */
- public void stop() {
- stopped = true;
- if (waitLeaderThread != null) {
- waitLeaderThread.interrupt();
- }
- if (workThread != null) {
- workThread.interrupt();
- }
- LOGGER.info("Data Node remove service is stopped");
- }
-
- public void setConfigManager(ConfigManager configManager) {
- this.configManager = configManager;
- }
-
- /**
- * check if the remove datanode request illegal
- *
- * @param removeDataNodePlan RemoveDataNodeReq
- * @return SUCCEED_STATUS when request is legal.
- */
- public DataNodeToStatusResp checkRemoveDataNodeRequest(RemoveDataNodePlan removeDataNodePlan) {
- DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
- dataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
- TSStatus status = checkRegionReplication(removeDataNodePlan);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataSet.setStatus(status);
- return dataSet;
- }
-
- status = checkRequestLimit();
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataSet.setStatus(status);
- return dataSet;
- }
-
- status = checkDataNodeExist(removeDataNodePlan);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataSet.setStatus(status);
- return dataSet;
- }
-
- status = checkDuplicateRequest(removeDataNodePlan);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataSet.setStatus(status);
- return dataSet;
- }
-
- status = checkDuplicateDataNodeAcrossRequests(removeDataNodePlan);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- dataSet.setStatus(status);
- return dataSet;
- }
- return dataSet;
- }
-
- /**
- * check if request exceed threshold
- *
- * @return SUCCEED_STATUS if not exceed threshold
- */
- private TSStatus checkRequestLimit() {
- TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- if (removeQueue.size() >= QUEUE_SIZE_LIMIT) {
- status.setCode(TSStatusCode.REQUEST_SIZE_EXCEED.getStatusCode());
- status.setMessage("remove Data Node request exceed threshold, reject this request");
- }
- return status;
- }
-
- /**
- * check if the request repeat
- *
- * @param removeDataNodePlan RemoveDataNodeReq
- * @return SUCCEED_STATUS if not repeat
- */
- private TSStatus checkDuplicateRequest(RemoveDataNodePlan removeDataNodePlan) {
- TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- if (removeQueue.contains(removeDataNodePlan)) {
- status.setCode(TSStatusCode.DUPLICATE_REMOVE.getStatusCode());
- status.setMessage(
- "the remove datanode request is duplicate, wait the last same request finished");
- }
- return status;
- }
-
- /**
- * check if has same Data Node amount different request
- *
- * @param removeDataNodePlan RemoveDataNodeReq
- * @return SUCCEED_STATUS if not has
- */
- private TSStatus checkDuplicateDataNodeAcrossRequests(RemoveDataNodePlan removeDataNodePlan) {
- TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- boolean hasDuplicateDataNodeAcrossRequests =
- removeQueue.stream()
- .map(RemoveDataNodePlan::getDataNodeLocations)
- .anyMatch(loc -> removeDataNodePlan.getDataNodeLocations().contains(loc));
- if (hasDuplicateDataNodeAcrossRequests) {
- TSStatus dataNodeDuplicate = new TSStatus(TSStatusCode.DUPLICATE_REMOVE.getStatusCode());
- dataNodeDuplicate.setMessage(
- "there exist duplicate Data Node between this request and other requests, can't run");
- }
- return status;
- }
-
- /**
- * check if has removed Data Node but not exist in cluster
- *
- * @param removeDataNodePlan RemoveDataNodeReq
- * @return SUCCEED_STATUS if not has
- */
- private TSStatus checkDataNodeExist(RemoveDataNodePlan removeDataNodePlan) {
- TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-
- List<TDataNodeLocation> allDataNodes =
- configManager.getNodeManager().getRegisteredDataNodes(-1).stream()
- .map(TDataNodeConfiguration::getLocation)
- .collect(Collectors.toList());
- boolean hasNotExistNode =
- removeDataNodePlan.getDataNodeLocations().stream()
- .anyMatch(loc -> !allDataNodes.contains(loc));
- if (hasNotExistNode) {
- status.setCode(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode());
- status.setMessage("there exist Data Node in request but not in cluster");
- }
- return status;
- }
-
- /**
- * check if has enought replication in cluster
- *
- * @param removeDataNodePlan RemoveDataNodeReq
- * @return SUCCEED_STATUS if not has
- */
- private TSStatus checkRegionReplication(RemoveDataNodePlan removeDataNodePlan) {
- TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- int removedDataNodeSize = removeDataNodePlan.getDataNodeLocations().size();
- int allDataNodeSize = configManager.getNodeManager().getRegisteredDataNodeCount();
- if (allDataNodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) {
- status.setCode(TSStatusCode.LACK_REPLICATION.getStatusCode());
- status.setMessage(
- "lack replication, allow most removed Data Node size : "
- + (allDataNodeSize - NodeInfo.getMinimumDataNode()));
- }
- return status;
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 42055dd18a..79da434580 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -110,11 +110,11 @@ public interface IManager {
UDFManager getUDFManager();
/**
- * Get DataNodeRemoveManager
+ * Get ProcedureManager
*
- * @return DataNodeRemoveManager instance
+ * @return ProcedureManager instance
*/
- DataNodeRemoveManager getDataNodeRemoveManager();
+ ProcedureManager getProcedureManager();
/**
* Register DataNode
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index cf82d79aaf..03219060df 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.consensus.response.DataNodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.persistence.NodeInfo;
+import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
@@ -60,7 +61,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -139,8 +139,11 @@ public class NodeManager {
*/
public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
LOGGER.info("Node manager start to remove DataNode {}", removeDataNodePlan);
+
+ DataNodeRemoveHandler dataNodeRemoveHandler =
+ new DataNodeRemoveHandler((ConfigManager) configManager);
DataNodeToStatusResp preCheckStatus =
- configManager.getDataNodeRemoveManager().checkRemoveDataNodeRequest(removeDataNodePlan);
+ dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error(
"the remove Data Node request check failed. req: {}, check result: {}",
@@ -151,7 +154,7 @@ public class NodeManager {
// if add request to queue, then return to client
DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
boolean registerSucceed =
- configManager.getDataNodeRemoveManager().registerRequest(removeDataNodePlan);
+ configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
TSStatus status;
if (registerSucceed) {
status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
@@ -227,25 +230,6 @@ public class NodeManager {
}
return dataNodesLocations;
}
-
- /**
- * get data node remove request queue
- *
- * @return LinkedBlockingQueue
- */
- public LinkedBlockingQueue<RemoveDataNodePlan> getDataNodeRemoveRequestQueue() {
- return nodeInfo.getDataNodeRemoveRequestQueue();
- }
-
- /**
- * get head data node remove request
- *
- * @return RemoveDataNodeReq
- */
- public RemoveDataNodePlan getHeadRequestForDataNodeRemove() {
- return nodeInfo.getHeadRequestForDataNodeRemove();
- }
-
/**
* Provides ConfigNodeGroup information for the newly registered ConfigNode
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index bae3062c4d..7ed3527ab3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -24,19 +24,23 @@ import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
+import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore;
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
import org.apache.iotdb.confignode.procedure.store.ProcedureStore;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.rpc.RpcUtils;
@@ -134,6 +138,23 @@ public class ProcedureManager {
LOGGER.info("Submit to remove ConfigNode, {}", removeConfigNodePlan);
}
+ /**
+ * generate a procedure, and execute remove datanode one by one
+ *
+ * @param removeDataNodePlan
+ * @return
+ */
+ public boolean removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
+ removeDataNodePlan
+ .getDataNodeLocations()
+ .forEach(
+ tDataNodeLocation -> {
+ this.executor.submitProcedure(new RemoveDataNodeProcedure(tDataNodeLocation));
+ LOGGER.info("Submit to remove data node procedure, {}", tDataNodeLocation);
+ });
+ return true;
+ }
+
private static boolean getProcedureStatus(
ProcedureExecutor executor, List<Long> procIds, List<TSStatus> statusList) {
boolean isSucceed = true;
@@ -215,4 +236,17 @@ public class ProcedureManager {
public void setEnv(ConfigNodeProcedureEnv env) {
this.env = env;
}
+
+ public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
+ this.executor.getProcedures().values().stream()
+ .forEach(
+ procedure -> {
+ if (procedure instanceof RegionMigrateProcedure) {
+ RegionMigrateProcedure regionMigrateProcedure = (RegionMigrateProcedure) procedure;
+ if (regionMigrateProcedure.getConsensusGroupId().equals(req.getRegionId())) {
+ regionMigrateProcedure.notifyTheRegionMigrateFinished();
+ }
+ }
+ });
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index c996131e23..f62ac14fe9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -19,12 +19,9 @@
package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.enums.DataNodeRemoveState;
-import org.apache.iotdb.commons.enums.RegionMigrateState;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -35,7 +32,6 @@ import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
@@ -68,7 +64,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -99,15 +94,12 @@ public class NodeInfo implements SnapshotProcessor {
// TODO: implement
private final Set<TDataNodeLocation> drainingDataNodes = new HashSet<>();
- private final RemoveNodeInfo removeNodeInfo;
-
private final String snapshotFileName = "node_info.bin";
public NodeInfo() {
this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
this.configNodeInfoReadWriteLock = new ReentrantReadWriteLock();
this.registeredConfigNodes = new HashSet<>();
- removeNodeInfo = new RemoveNodeInfo();
}
public void addMetrics() {
@@ -197,30 +189,38 @@ public class NodeInfo implements SnapshotProcessor {
}
/**
- * Persist Infomation about remove dataNode
+ * Persist Information about remove dataNode
*
* @param req RemoveDataNodeReq
* @return TSStatus
*/
public TSStatus removeDataNode(RemoveDataNodePlan req) {
- TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- removeNodeInfo.removeDataNode(req);
- return result;
+ try {
+ dataNodeInfoReadWriteLock.writeLock().lock();
+ req.getDataNodeLocations()
+ .forEach(
+ removeDataNodes -> {
+ registeredDataNodes.remove(removeDataNodes.getDataNodeId());
+ });
+ } finally {
+ dataNodeInfoReadWriteLock.writeLock().unlock();
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
/**
* Get DataNode info
*
- * @param getDataNodeConfigurationPlan QueryDataNodeInfoPlan
+ * @param getDataNodeInfoPlan QueryDataNodeInfoPlan
* @return The specific DataNode's info or all DataNode info if dataNodeId in
* QueryDataNodeInfoPlan is -1
*/
public DataNodeConfigurationResp getDataNodeInfo(
- GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
+ GetDataNodeConfigurationPlan getDataNodeInfoPlan) {
DataNodeConfigurationResp result = new DataNodeConfigurationResp();
result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
- int dataNodeId = getDataNodeConfigurationPlan.getDataNodeId();
+ int dataNodeId = getDataNodeInfoPlan.getDataNodeId();
dataNodeInfoReadWriteLock.readLock().lock();
try {
if (dataNodeId == -1) {
@@ -392,8 +392,6 @@ public class NodeInfo implements SnapshotProcessor {
serializeDrainingDataNodes(fileOutputStream, protocol);
- removeNodeInfo.serializeRemoveNodeInfo(fileOutputStream, protocol);
-
fileOutputStream.flush();
fileOutputStream.close();
@@ -467,8 +465,6 @@ public class NodeInfo implements SnapshotProcessor {
deserializeDrainingDataNodes(fileInputStream, protocol);
- removeNodeInfo.deserializeRemoveNodeInfo(fileInputStream, protocol);
-
} finally {
configNodeInfoReadWriteLock.writeLock().unlock();
dataNodeInfoReadWriteLock.writeLock().unlock();
@@ -534,162 +530,5 @@ public class NodeInfo implements SnapshotProcessor {
registeredDataNodes.clear();
drainingDataNodes.clear();
registeredConfigNodes.clear();
- removeNodeInfo.clear();
- }
-
- /**
- * get data node remove request queue
- *
- * @return LinkedBlockingQueue
- */
- public LinkedBlockingQueue<RemoveDataNodePlan> getDataNodeRemoveRequestQueue() {
- return removeNodeInfo.getDataNodeRemoveRequestQueue();
- }
-
- /**
- * get head data node remove request
- *
- * @return RemoveDataNodeReq
- */
- public RemoveDataNodePlan getHeadRequestForDataNodeRemove() {
- return removeNodeInfo.getHeadRequest();
- }
-
- /** storage remove Data Node request Info */
- private class RemoveNodeInfo {
- private LinkedBlockingQueue<RemoveDataNodePlan> dataNodeRemoveRequestQueue =
- new LinkedBlockingQueue<>();
-
- // which request is running
- private RemoveDataNodePlan headRequest = null;
-
- public RemoveNodeInfo() {}
-
- private void removeDataNode(RemoveDataNodePlan req) {
- if (!dataNodeRemoveRequestQueue.contains(req)) {
- dataNodeRemoveRequestQueue.add(req);
- } else {
- updateRemoveState(req);
- }
- LOGGER.info("request detail: {}", req);
- }
-
- private void removeSoppedDDataNode(TDataNodeLocation node) {
- try {
- dataNodeInfoReadWriteLock.writeLock().lock();
- registeredDataNodes.remove(node.getDataNodeId());
- } finally {
- dataNodeInfoReadWriteLock.writeLock().unlock();
- }
- }
-
- private void updateRemoveState(RemoveDataNodePlan req) {
- if (!req.isUpdate()) {
- LOGGER.warn("request is not in update status: {}", req);
- return;
- }
- this.headRequest = req;
-
- if (req.getExecDataNodeState() == DataNodeRemoveState.STOP) {
- // headNodeState = DataNodeRemoveState.STOP;
- int headNodeIndex = req.getExecDataNodeIndex();
- TDataNodeLocation stopNode = req.getDataNodeLocations().get(headNodeIndex);
- removeSoppedDDataNode(stopNode);
- LOGGER.info(
- "the Data Node {} remove succeed, now the registered Data Node size: {}",
- stopNode.getInternalEndPoint(),
- registeredDataNodes.size());
- }
-
- if (req.isFinished()) {
- this.dataNodeRemoveRequestQueue.remove(req);
- this.headRequest = null;
- }
- }
-
- private void serializeRemoveNodeInfo(OutputStream outputStream, TProtocol protocol)
- throws IOException, TException {
- // request queue
- ReadWriteIOUtils.write(dataNodeRemoveRequestQueue.size(), outputStream);
- for (RemoveDataNodePlan req : dataNodeRemoveRequestQueue) {
- TDataNodeRemoveReq tReq = new TDataNodeRemoveReq(req.getDataNodeLocations());
- tReq.write(protocol);
- }
- // -1 means headRequest is null, 1 means headRequest is not null
- if (headRequest == null) {
- ReadWriteIOUtils.write(-1, outputStream);
- return;
- }
-
- ReadWriteIOUtils.write(1, outputStream);
- TDataNodeRemoveReq tHeadReq = new TDataNodeRemoveReq(headRequest.getDataNodeLocations());
- tHeadReq.write(protocol);
-
- ReadWriteIOUtils.write(headRequest.getExecDataNodeIndex(), outputStream);
- ReadWriteIOUtils.write(headRequest.getExecDataNodeState().getCode(), outputStream);
-
- ReadWriteIOUtils.write(headRequest.getExecDataNodeRegionIds().size(), outputStream);
- for (TConsensusGroupId regionId : headRequest.getExecDataNodeRegionIds()) {
- regionId.write(protocol);
- }
- ReadWriteIOUtils.write(headRequest.getExecRegionIndex(), outputStream);
- ReadWriteIOUtils.write(headRequest.getExecRegionState().getCode(), outputStream);
- }
-
- private void deserializeRemoveNodeInfo(InputStream inputStream, TProtocol protocol)
- throws IOException, TException {
- int queueSize = ReadWriteIOUtils.readInt(inputStream);
- dataNodeRemoveRequestQueue = new LinkedBlockingQueue<>();
- for (int i = 0; i < queueSize; i++) {
- TDataNodeRemoveReq tReq = new TDataNodeRemoveReq();
- tReq.read(protocol);
- dataNodeRemoveRequestQueue.add(new RemoveDataNodePlan(tReq.getDataNodeLocations()));
- }
- boolean headRequestExist = ReadWriteIOUtils.readInt(inputStream) == 1;
- if (!headRequestExist) {
- headRequest = null;
- return;
- }
-
- TDataNodeRemoveReq tHeadReq = new TDataNodeRemoveReq();
- tHeadReq.read(protocol);
- headRequest = new RemoveDataNodePlan(tHeadReq.getDataNodeLocations());
- headRequest.setUpdate(true);
- headRequest.setFinished(false);
-
- int headNodeIndex = ReadWriteIOUtils.readInt(inputStream);
- DataNodeRemoveState headNodeState =
- DataNodeRemoveState.getStateByCode(ReadWriteIOUtils.readInt(inputStream));
- headRequest.setExecDataNodeIndex(headNodeIndex);
- headRequest.setExecDataNodeState(headNodeState);
-
- int headNodeRegionSize = ReadWriteIOUtils.readInt(inputStream);
- List<TConsensusGroupId> headNodeRegionIds = new ArrayList<>();
- for (int i = 0; i < headNodeRegionSize; i++) {
- TConsensusGroupId regionId = new TConsensusGroupId();
- regionId.read(protocol);
- headNodeRegionIds.add(regionId);
- }
- headRequest.setExecDataNodeRegionIds(headNodeRegionIds);
-
- int headRegionIndex = ReadWriteIOUtils.readInt(inputStream);
- RegionMigrateState headRegionState =
- RegionMigrateState.getStateByCode(ReadWriteIOUtils.readInt(inputStream));
- headRequest.setExecRegionIndex(headRegionIndex);
- headRequest.setExecRegionState(headRegionState);
- }
-
- private void clear() {
- dataNodeRemoveRequestQueue.clear();
- headRequest = null;
- }
-
- public LinkedBlockingQueue<RemoveDataNodePlan> getDataNodeRemoveRequestQueue() {
- return dataNodeRemoveRequestQueue;
- }
-
- public RemoveDataNodePlan getHeadRequest() {
- return headRequest;
- }
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index ff3ff0fe8b..ba316a95da 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroup
import org.apache.iotdb.confignode.exception.AddPeerException;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -49,13 +50,17 @@ public class ConfigNodeProcedureEnv {
private static final Logger LOG = LoggerFactory.getLogger(ConfigNodeProcedureEnv.class);
- /** add and remove config node lock */
- private final ReentrantLock configNodeLock = new ReentrantLock();
+ /** add or remove node lock */
+ private final LockQueue nodeLock = new LockQueue();
+
+ private final ReentrantLock schedulerLock = new ReentrantLock();
private final ConfigManager configManager;
private final ProcedureScheduler scheduler;
+ private final DataNodeRemoveHandler dataNodeRemoveHandler;
+
private static boolean skipForTest = false;
private static boolean invalidCacheResult = true;
@@ -71,6 +76,7 @@ public class ConfigNodeProcedureEnv {
public ConfigNodeProcedureEnv(ConfigManager configManager, ProcedureScheduler scheduler) {
this.configManager = configManager;
this.scheduler = scheduler;
+ this.dataNodeRemoveHandler = new DataNodeRemoveHandler(configManager);
}
public ConfigManager getConfigManager() {
@@ -245,11 +251,23 @@ public class ConfigNodeProcedureEnv {
ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
}
- public ReentrantLock getConfigNodeLock() {
- return configNodeLock;
+ public LockQueue getNodeLock() {
+ return nodeLock;
}
public ProcedureScheduler getScheduler() {
return scheduler;
}
+
+ public LockQueue getRegionMigrateLock() {
+ return dataNodeRemoveHandler.getRegionMigrateLock();
+ }
+
+ public ReentrantLock getSchedulerLock() {
+ return schedulerLock;
+ }
+
+ public DataNodeRemoveHandler getDataNodeRemoveHandler() {
+ return dataNodeRemoveHandler;
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
new file mode 100644
index 0000000000..4663ed47d3
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
+import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.NodeInfo;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
+import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class DataNodeRemoveHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveHandler.class);
+
+ private ConfigManager configManager;
+
+ /** region migrate lock */
+ private final LockQueue regionMigrateLock = new LockQueue();
+
+ public DataNodeRemoveHandler(ConfigManager configManager) {
+ this.configManager = configManager;
+ }
+
+ /**
+ * Get all consensus group ids hosted on this node
+ *
+ * @param dataNodeLocation data node location
+ * @return group id list
+ */
+ public List<TConsensusGroupId> getDataNodeRegionIds(TDataNodeLocation dataNodeLocation) {
+ return configManager.getPartitionManager().getAllReplicaSets().stream()
+ .filter(
+ rg ->
+ rg.getDataNodeLocations().contains(dataNodeLocation)
+ && rg.regionId.getType() != TConsensusGroupType.PartitionRegion)
+ .map(TRegionReplicaSet::getRegionId)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Broadcast to the other DataNodes that the DataNodes listed in RemoveDataNodeReq are disabled,
+ * so they will no longer accept read/write requests
+ *
+ * @param disabledDataNode TDataNodeLocation
+ */
+ public TSStatus broadcastDisableDataNode(TDataNodeLocation disabledDataNode) {
+ LOGGER.info(
+ "DataNodeRemoveService start send disable the Data Node to cluster, {}", disabledDataNode);
+ TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ List<TEndPoint> otherOnlineDataNodes =
+ configManager.getLoadManager().getOnlineDataNodes(-1).stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .filter(loc -> !loc.equals(disabledDataNode))
+ .map(TDataNodeLocation::getInternalEndPoint)
+ .collect(Collectors.toList());
+
+ for (TEndPoint server : otherOnlineDataNodes) {
+ TDisableDataNodeReq disableReq = new TDisableDataNodeReq(disabledDataNode);
+ status =
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ server, disableReq, DataNodeRequestType.DISABLE_DATA_NODE);
+ if (!isSucceed(status)) {
+ return status;
+ }
+ }
+ LOGGER.info(
+ "DataNodeRemoveService finished send disable the Data Node to cluster, {}",
+ disabledDataNode);
+ status.setMessage("Succeed disable the Data Node from cluster");
+ return status;
+ }
+
+ /**
+ * Find dest data node
+ *
+ * @param regionId region id
+ * @return dest data node location
+ */
+ public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
+ TSStatus status;
+ List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+ if (regionReplicaNodes.isEmpty()) {
+ LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+ status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage("not find region replica nodes, region: " + regionId);
+ return null;
+ }
+
+ // migrate the region to a new node that is not already a member of this raft group
+ Optional<TDataNodeLocation> newNode = pickNewReplicaNodeForRegion(regionReplicaNodes);
+ if (!newNode.isPresent()) {
+ LOGGER.warn("No enough Data node to migrate region: {}", regionId);
+ }
+ return newNode.get();
+ }
+
+ /**
+ * Send to DataNode, migrate region from originalDataNode to destDataNode
+ *
+ * @param originalDataNode old location data node
+ * @param destDataNode dest data node
+ * @param regionId region id
+ * @return migrate status
+ */
+ public TSStatus migrateRegion(
+ TDataNodeLocation originalDataNode,
+ TDataNodeLocation destDataNode,
+ TConsensusGroupId regionId) {
+ TSStatus status;
+ List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+ if (regionReplicaNodes.isEmpty()) {
+ LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+ status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage("not find region replica nodes, region: " + regionId);
+ return status;
+ }
+
+ // TODO if region replica is 1, the new leader is null, it also need to migrate
+ Optional<TDataNodeLocation> newLeaderNode =
+ regionReplicaNodes.stream().filter(e -> !e.equals(originalDataNode)).findAny();
+ if (!newLeaderNode.isPresent()) {
+ LOGGER.warn(
+ "No other Node to change region leader, check by show regions, region: {}", regionId);
+ status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage("No other Node to change region leader, check by show regions");
+ return status;
+ }
+
+ TMigrateRegionReq migrateRegionReq =
+ new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
+ migrateRegionReq.setNewLeaderNode(newLeaderNode.get());
+ status =
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ originalDataNode.getInternalEndPoint(),
+ migrateRegionReq,
+ DataNodeRequestType.MIGRATE_REGION);
+ LOGGER.debug(
+ "send region {} migrate action to {}, wait it finished", regionId, originalDataNode);
+ return status;
+ }
+
+ /**
+ * Update region location cache
+ *
+ * @param regionId region id
+ * @param originalDataNode old location data node
+ * @param destDataNode dest data node
+ */
+ public void updateRegionLocationCache(
+ TConsensusGroupId regionId,
+ TDataNodeLocation originalDataNode,
+ TDataNodeLocation destDataNode) {
+ LOGGER.debug(
+ "start to update region {} location from {} to {} when it migrate succeed",
+ regionId,
+ originalDataNode.getInternalEndPoint().getIp(),
+ destDataNode.getInternalEndPoint().getIp());
+ UpdateRegionLocationPlan req =
+ new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode);
+ TSStatus status = configManager.getPartitionManager().updateRegionLocation(req);
+ LOGGER.info(
+ "update region {} location finished, result:{}, old:{}, new:{}",
+ regionId,
+ status,
+ originalDataNode.getInternalEndPoint().getIp(),
+ destDataNode.getInternalEndPoint().getIp());
+ }
+
+ /**
+ * Find region replication Nodes
+ *
+ * @param regionId region id
+ * @return data node location
+ */
+ public List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) {
+ List<TRegionReplicaSet> regionReplicaSets =
+ configManager.getPartitionManager().getAllReplicaSets().stream()
+ .filter(rg -> rg.regionId.equals(regionId))
+ .collect(Collectors.toList());
+ if (regionReplicaSets.isEmpty()) {
+ LOGGER.warn("not find TRegionReplica for region: {}", regionId);
+ return Collections.emptyList();
+ }
+
+ return regionReplicaSets.get(0).getDataNodeLocations();
+ }
+
+ private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
+ List<TDataNodeLocation> regionReplicaNodes) {
+ return configManager.getLoadManager().getOnlineDataNodes(-1).stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .filter(e -> !regionReplicaNodes.contains(e))
+ .findAny();
+ }
+
+ /**
+ * Add the new node to the region's consensus group
+ *
+ * @param regionId region id
+ * @param destDataNode dest data node
+ * @return status
+ */
+ public TSStatus addNewNodeToRegionConsensusGroup(
+ TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
+ TSStatus status;
+ List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+ if (regionReplicaNodes.isEmpty()) {
+ LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+ status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage("not find region replica nodes, region: " + regionId);
+ return status;
+ }
+
+ String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
+ status =
+ SyncDataNodeClientPool.getInstance()
+ .addToRegionConsensusGroup(
+ // TODO replace with real ttl
+ regionReplicaNodes, regionId, destDataNode, storageGroup, Long.MAX_VALUE);
+ LOGGER.debug("send add region {} consensus group to {}", regionId, destDataNode);
+ if (isFailed(status)) {
+ LOGGER.error(
+ "add new node {} to region {} consensus group failed, result: {}",
+ destDataNode,
+ regionId,
+ status);
+ }
+ return status;
+ }
+
+ private boolean isSucceed(TSStatus status) {
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+ }
+
+ private boolean isFailed(TSStatus status) {
+ return !isSucceed(status);
+ }
+
+ /**
+ * Stop old data node
+ *
+ * @param dataNode old data node
+ * @return status
+ * @throws ProcedureException procedure exception
+ */
+ public TSStatus stopDataNode(TDataNodeLocation dataNode) throws ProcedureException {
+ LOGGER.info("begin to stop Data Node {}", dataNode);
+ AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
+ TSStatus status =
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(
+ dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
+ LOGGER.info("stop Data Node {} result: {}", dataNode, status);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new ProcedureException("Failed to stop data node");
+ }
+ return status;
+ }
+
+ /**
+ * Check whether the remove-DataNode request is legal
+ *
+ * @param removeDataNodePlan RemoveDataNodeReq
+ * @return SUCCEED_STATUS when request is legal.
+ */
+ public DataNodeToStatusResp checkRemoveDataNodeRequest(RemoveDataNodePlan removeDataNodePlan) {
+ DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
+ dataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ TSStatus status = checkRegionReplication(removeDataNodePlan);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataSet.setStatus(status);
+ return dataSet;
+ }
+
+ status = checkDataNodeExist(removeDataNodePlan);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataSet.setStatus(status);
+ return dataSet;
+ }
+
+ return dataSet;
+ }
+
+ /**
+ * Check whether the request references a Data Node that does not exist in the cluster
+ *
+ * @param removeDataNodePlan RemoveDataNodeReq
+ * @return SUCCEED_STATUS if every requested Data Node exists in the cluster
+ */
+ private TSStatus checkDataNodeExist(RemoveDataNodePlan removeDataNodePlan) {
+ TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+
+ List<TDataNodeLocation> allDataNodes =
+ configManager.getNodeManager().getRegisteredDataNodes(-1).stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .collect(Collectors.toList());
+ boolean hasNotExistNode =
+ removeDataNodePlan.getDataNodeLocations().stream()
+ .anyMatch(loc -> !allDataNodes.contains(loc));
+ if (hasNotExistNode) {
+ status.setCode(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode());
+ status.setMessage("there exist Data Node in request but not in cluster");
+ }
+ return status;
+ }
+
+ /**
+ * Check whether the cluster still has enough Data Nodes for replication after the removal
+ *
+ * @param removeDataNodePlan RemoveDataNodeReq
+ * @return SUCCEED_STATUS if enough Data Nodes remain
+ */
+ private TSStatus checkRegionReplication(RemoveDataNodePlan removeDataNodePlan) {
+ TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ int removedDataNodeSize = removeDataNodePlan.getDataNodeLocations().size();
+ int allDataNodeSize = configManager.getNodeManager().getRegisteredDataNodeCount();
+ if (allDataNodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) {
+ status.setCode(TSStatusCode.LACK_REPLICATION.getStatusCode());
+ status.setMessage(
+ "lack replication, allow most removed Data Node size : "
+ + (allDataNodeSize - NodeInfo.getMinimumDataNode()));
+ }
+ return status;
+ }
+
+ public LockQueue getRegionMigrateLock() {
+ return regionMigrateLock;
+ }
+
+ /**
+ * Remove data node in node info
+ *
+ * @param tDataNodeLocation data node location
+ */
+ public void removeDataNodePersistence(TDataNodeLocation tDataNodeLocation) {
+ List<TDataNodeLocation> removeDataNodes = new ArrayList<>();
+ removeDataNodes.add(tDataNodeLocation);
+ configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes));
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java
new file mode 100644
index 0000000000..15a256143e
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Node procedure */
+public abstract class AbstractNodeProcedure<TState>
+ extends StateMachineProcedure<ConfigNodeProcedureEnv, TState> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeProcedure.class);
+
+ @Override
+ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ configNodeProcedureEnv.getSchedulerLock().lock();
+ try {
+ if (configNodeProcedureEnv.getNodeLock().tryLock(this)) {
+ LOG.info("{} acquire lock.", getProcId());
+ return ProcedureLockState.LOCK_ACQUIRED;
+ }
+ configNodeProcedureEnv.getNodeLock().waitProcedure(this);
+ LOG.info("{} wait for lock.", getProcId());
+ return ProcedureLockState.LOCK_EVENT_WAIT;
+ } finally {
+ configNodeProcedureEnv.getSchedulerLock().unlock();
+ }
+ }
+
+ @Override
+ protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ configNodeProcedureEnv.getSchedulerLock().lock();
+ try {
+ LOG.info("{} release lock.", getProcId());
+ if (configNodeProcedureEnv.getNodeLock().releaseLock(this)) {
+ configNodeProcedureEnv
+ .getNodeLock()
+ .wakeWaitingProcedures(configNodeProcedureEnv.getScheduler());
+ }
+ } finally {
+ configNodeProcedureEnv.getSchedulerLock().unlock();
+ }
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
index dd544c9815..7edda907cc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
@@ -22,12 +22,9 @@ package org.apache.iotdb.confignode.procedure.impl;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
import org.apache.iotdb.confignode.procedure.state.AddConfigNodeState;
-import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.slf4j.Logger;
@@ -38,8 +35,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
/** add config node procedure */
-public class AddConfigNodeProcedure
- extends StateMachineProcedure<ConfigNodeProcedureEnv, AddConfigNodeState> {
+public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeState> {
private static final Logger LOG = LoggerFactory.getLogger(AddConfigNodeProcedure.class);
private static final int retryThreshold = 5;
@@ -121,33 +117,6 @@ public class AddConfigNodeProcedure
return false;
}
- @Override
- protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
- if (configNodeProcedureEnv.getConfigNodeLock().tryLock()) {
- LOG.info("{} acquire lock.", getProcId());
- return ProcedureLockState.LOCK_ACQUIRED;
- }
- SimpleProcedureScheduler simpleProcedureScheduler =
- (SimpleProcedureScheduler) configNodeProcedureEnv.getScheduler();
- simpleProcedureScheduler.addWaiting(this);
- LOG.info("{} wait for lock.", getProcId());
- return ProcedureLockState.LOCK_EVENT_WAIT;
- }
-
- @Override
- protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
- LOG.info("{} release lock.", getProcId());
- configNodeProcedureEnv.getConfigNodeLock().unlock();
- SimpleProcedureScheduler simpleProcedureScheduler =
- (SimpleProcedureScheduler) configNodeProcedureEnv.getScheduler();
- simpleProcedureScheduler.releaseWaiting();
- }
-
- @Override
- protected boolean holdLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
- return configNodeProcedureEnv.getConfigNodeLock().isHeldByCurrentThread();
- }
-
@Override
protected AddConfigNodeState getState(int stateId) {
return AddConfigNodeState.values()[stateId];
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
new file mode 100644
index 0000000000..9a8c13505f
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
+import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** region migrate procedure */
+public class RegionMigrateProcedure
+ extends StateMachineProcedure<ConfigNodeProcedureEnv, RegionTransitionState> {
+ private static final Logger LOG = LoggerFactory.getLogger(RegionMigrateProcedure.class);
+ private static final int retryThreshold = 5;
+
+ /** Monitor object used to wait until the region migration has finished */
+ private final Object regionMigrateLock = new Object();
+
+ private TConsensusGroupId consensusGroupId;
+
+ private TDataNodeLocation originalDataNode;
+
+ private TDataNodeLocation destDataNode;
+
+ public RegionMigrateProcedure() {
+ super();
+ }
+
+ public RegionMigrateProcedure(
+ TConsensusGroupId consensusGroupId,
+ TDataNodeLocation originalDataNode,
+ TDataNodeLocation destDataNode) {
+ super();
+ this.consensusGroupId = consensusGroupId;
+ this.originalDataNode = originalDataNode;
+ this.destDataNode = destDataNode;
+ }
+
+ @Override
+ protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionState state) {
+ if (consensusGroupId == null) {
+ return Flow.NO_MORE_STATE;
+ }
+ try {
+ switch (state) {
+ case REGION_MIGRATE_PREPARE:
+ setNextState(RegionTransitionState.ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP);
+ break;
+ case ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP:
+ env.getDataNodeRemoveHandler()
+ .addNewNodeToRegionConsensusGroup(consensusGroupId, destDataNode);
+ setNextState(RegionTransitionState.MIGRATE_REGION);
+ break;
+ case MIGRATE_REGION:
+ env.getDataNodeRemoveHandler()
+ .migrateRegion(originalDataNode, destDataNode, consensusGroupId);
+ setNextState(RegionTransitionState.WAIT_FOR_REGION_MIGRATE_FINISHED);
+ break;
+ case WAIT_FOR_REGION_MIGRATE_FINISHED:
+ waitForTheRegionMigrateFinished(consensusGroupId);
+ setNextState(RegionTransitionState.UPDATE_REGION_LOCATION_CACHE);
+ LOG.info("Wait for region migrate finished");
+ break;
+ case UPDATE_REGION_LOCATION_CACHE:
+ env.getDataNodeRemoveHandler()
+ .updateRegionLocationCache(consensusGroupId, originalDataNode, destDataNode);
+ return Flow.NO_MORE_STATE;
+ }
+ } catch (Exception e) {
+ if (isRollbackSupported(state)) {
+ setFailure(new ProcedureException("Region migrate failed " + state));
+ } else {
+ LOG.error(
+ "Retrievable error trying to region migrate {}, state {}", originalDataNode, state, e);
+ if (getCycles() > retryThreshold) {
+ setFailure(new ProcedureException("State stuck at " + state));
+ }
+ }
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(ConfigNodeProcedureEnv env, RegionTransitionState state)
+ throws IOException, InterruptedException, ProcedureException {}
+
+ @Override
+ protected boolean isRollbackSupported(RegionTransitionState state) {
+ return false;
+ }
+
+ @Override
+ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ configNodeProcedureEnv.getSchedulerLock().lock();
+ try {
+ if (configNodeProcedureEnv.getRegionMigrateLock().tryLock(this)) {
+ LOG.info("{} acquire lock.", getProcId());
+ return ProcedureLockState.LOCK_ACQUIRED;
+ }
+ configNodeProcedureEnv.getRegionMigrateLock().waitProcedure(this);
+
+ LOG.info("{} wait for lock.", getProcId());
+ return ProcedureLockState.LOCK_EVENT_WAIT;
+ } finally {
+ configNodeProcedureEnv.getSchedulerLock().unlock();
+ }
+ }
+
+ @Override
+ protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ configNodeProcedureEnv.getSchedulerLock().lock();
+ try {
+ LOG.info("{} release lock.", getProcId());
+ if (configNodeProcedureEnv.getRegionMigrateLock().releaseLock(this)) {
+ configNodeProcedureEnv
+ .getRegionMigrateLock()
+ .wakeWaitingProcedures(configNodeProcedureEnv.getScheduler());
+ }
+ } finally {
+ configNodeProcedureEnv.getSchedulerLock().unlock();
+ }
+ }
+
+ @Override
+ protected RegionTransitionState getState(int stateId) {
+ return RegionTransitionState.values()[stateId];
+ }
+
+ @Override
+ protected int getStateId(RegionTransitionState regionTransitionState) {
+ return regionTransitionState.ordinal();
+ }
+
+ @Override
+ protected RegionTransitionState getInitialState() {
+ return RegionTransitionState.REGION_MIGRATE_PREPARE;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeInt(ProcedureFactory.ProcedureType.REGION_MIGRATE_PROCEDURE.ordinal());
+ super.serialize(stream);
+ ThriftCommonsSerDeUtils.serializeTDataNodeLocation(originalDataNode, stream);
+ ThriftCommonsSerDeUtils.serializeTDataNodeLocation(destDataNode, stream);
+ ThriftCommonsSerDeUtils.serializeTConsensusGroupId(consensusGroupId, stream);
+ }
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+ try {
+ originalDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+ destDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+ consensusGroupId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
+ } catch (ThriftSerDeException e) {
+ LOG.error("Error in deserialize RemoveConfigNodeProcedure", e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof RegionMigrateProcedure) {
+ RegionMigrateProcedure thatProc = (RegionMigrateProcedure) that;
+ return thatProc.getProcId() == this.getProcId()
+ && thatProc.getState() == this.getState()
+ && thatProc.originalDataNode.equals(this.originalDataNode)
+ && thatProc.destDataNode.equals(this.destDataNode)
+ && thatProc.consensusGroupId.equals(this.consensusGroupId);
+ }
+ return false;
+ }
+
+ public TSStatus waitForTheRegionMigrateFinished(TConsensusGroupId consensusGroupId) {
+ TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ synchronized (regionMigrateLock) {
+ try {
+ // TODO set timeOut?
+ regionMigrateLock.wait();
+ } catch (InterruptedException e) {
+ LOG.error("region migrate {} interrupt", consensusGroupId, e);
+ Thread.currentThread().interrupt();
+ status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+ status.setMessage("wait region migrate interrupt," + e.getMessage());
+ }
+ }
+ return status;
+ }
+
+ public void notifyTheRegionMigrateFinished() {
+ synchronized (regionMigrateLock) {
+ regionMigrateLock.notify();
+ }
+ }
+
+ public TConsensusGroupId getConsensusGroupId() {
+ return consensusGroupId;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
index d7d3be33db..52f13e4aa6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
@@ -22,11 +22,8 @@ package org.apache.iotdb.confignode.procedure.impl;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
-import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.RemoveConfigNodeState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
@@ -38,8 +35,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
/** remove config node procedure */
-public class RemoveConfigNodeProcedure
- extends StateMachineProcedure<ConfigNodeProcedureEnv, RemoveConfigNodeState> {
+public class RemoveConfigNodeProcedure extends AbstractNodeProcedure<RemoveConfigNodeState> {
private static final Logger LOG = LoggerFactory.getLogger(RemoveConfigNodeProcedure.class);
private static final int retryThreshold = 5;
@@ -105,33 +101,6 @@ public class RemoveConfigNodeProcedure
return true;
}
- @Override
- protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
- if (configNodeProcedureEnv.getConfigNodeLock().tryLock()) {
- LOG.info("{} acquire lock.", getProcId());
- return ProcedureLockState.LOCK_ACQUIRED;
- }
- SimpleProcedureScheduler simpleProcedureScheduler =
- (SimpleProcedureScheduler) configNodeProcedureEnv.getScheduler();
- simpleProcedureScheduler.addWaiting(this);
- LOG.info("{} wait for lock.", getProcId());
- return ProcedureLockState.LOCK_EVENT_WAIT;
- }
-
- @Override
- protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
- LOG.info("{} release lock.", getProcId());
- configNodeProcedureEnv.getConfigNodeLock().unlock();
- SimpleProcedureScheduler simpleProcedureScheduler =
- (SimpleProcedureScheduler) configNodeProcedureEnv.getScheduler();
- simpleProcedureScheduler.releaseWaiting();
- }
-
- @Override
- protected boolean holdLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
- return configNodeProcedureEnv.getConfigNodeLock().isHeldByCurrentThread();
- }
-
@Override
protected RemoveConfigNodeState getState(int stateId) {
return RemoveConfigNodeState.values()[stateId];
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
new file mode 100644
index 0000000000..14348a3cc1
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Procedure that removes one DataNode: migrate all its regions away, then stop the node. */
+public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNodeState> {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
+
+ // Give up once a non-rollbackable state has failed more than this many cycles.
+ private static final int RETRY_THRESHOLD = 5;
+
+ // Location of the DataNode being removed; null only when created for deserialization.
+ private TDataNodeLocation tDataNodeLocation;
+
+ // All consensus-group regions hosted on the removed DataNode, filled in the PREPARE state.
+ private List<TConsensusGroupId> execDataNodeRegionIds = new ArrayList<>();
+
+ public RemoveDataNodeProcedure() {
+ super();
+ }
+
+ public RemoveDataNodeProcedure(TDataNodeLocation tDataNodeLocation) {
+ super();
+ this.tDataNodeLocation = tDataNodeLocation;
+ }
+
+ @Override
+ protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) {
+ if (tDataNodeLocation == null) {
+ // Nothing to do for a procedure recovered without a target location.
+ return Flow.NO_MORE_STATE;
+ }
+ try {
+ switch (state) {
+ case REMOVE_DATA_NODE_PREPARE:
+ execDataNodeRegionIds =
+ env.getDataNodeRemoveHandler().getDataNodeRegionIds(tDataNodeLocation);
+ LOG.info("DataNode region id is {}", execDataNodeRegionIds);
+ setNextState(RemoveDataNodeState.BROADCAST_DISABLE_DATA_NODE);
+ break;
+ case BROADCAST_DISABLE_DATA_NODE:
+ env.getDataNodeRemoveHandler().broadcastDisableDataNode(tDataNodeLocation);
+ setNextState(RemoveDataNodeState.SUBMIT_REGION_MIGRATE);
+ break;
+ case SUBMIT_REGION_MIGRATE:
+ submitChildRegionMigrate(env);
+ setNextState(RemoveDataNodeState.STOP_DATA_NODE);
+ break;
+ case STOP_DATA_NODE:
+ env.getDataNodeRemoveHandler().stopDataNode(tDataNodeLocation);
+ env.getDataNodeRemoveHandler().removeDataNodePersistence(tDataNodeLocation);
+ return Flow.NO_MORE_STATE;
+ }
+ } catch (Exception e) {
+ if (isRollbackSupported(state)) {
+ setFailure(new ProcedureException("Remove Data Node failed " + state));
+ } else {
+ LOG.error(
+ "Retriable error trying to remove data node {}, state {}",
+ tDataNodeLocation,
+ state,
+ e);
+ if (getCycles() > RETRY_THRESHOLD) {
+ setFailure(new ProcedureException("State stuck at " + state));
+ }
+ }
+ }
+ return Flow.HAS_MORE_STATE;
+ }
+
+ @Override
+ protected void rollbackState(ConfigNodeProcedureEnv env, RemoveDataNodeState state)
+ throws IOException, InterruptedException, ProcedureException {}
+
+ @Override
+ protected boolean isRollbackSupported(RemoveDataNodeState state) {
+ return false;
+ }
+
+ /**
+ * Used to keep procedure lock even when the procedure is yielded or suspended.
+ *
+ * @param env env
+ * @return true if hold the lock
+ */
+ @Override
+ protected boolean holdLock(ConfigNodeProcedureEnv env) {
+ return true;
+ }
+
+ @Override
+ protected RemoveDataNodeState getState(int stateId) {
+ return RemoveDataNodeState.values()[stateId];
+ }
+
+ @Override
+ protected int getStateId(RemoveDataNodeState removeDataNodeState) {
+ return removeDataNodeState.ordinal();
+ }
+
+ @Override
+ protected RemoveDataNodeState getInitialState() {
+ return RemoveDataNodeState.REMOVE_DATA_NODE_PREPARE;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeInt(ProcedureFactory.ProcedureType.REMOVE_DATA_NODE_PROCEDURE.ordinal());
+ super.serialize(stream);
+ ThriftCommonsSerDeUtils.serializeTDataNodeLocation(tDataNodeLocation, stream);
+ stream.writeInt(execDataNodeRegionIds.size());
+ execDataNodeRegionIds.forEach(
+ tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream));
+ }
+
+ @Override
+ public void deserialize(ByteBuffer byteBuffer) {
+ super.deserialize(byteBuffer);
+ try {
+ tDataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+ int regionSize = byteBuffer.getInt();
+ execDataNodeRegionIds = new ArrayList<>(regionSize);
+ for (int i = 0; i < regionSize; i++) {
+ execDataNodeRegionIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
+ }
+ } catch (ThriftSerDeException e) {
+ LOG.error("Error in deserialize RemoveDataNodeProcedure", e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof RemoveDataNodeProcedure) {
+ RemoveDataNodeProcedure thatProc = (RemoveDataNodeProcedure) that;
+ return thatProc.getProcId() == this.getProcId()
+ && thatProc.getState() == this.getState()
+ && java.util.Objects.equals(thatProc.tDataNodeLocation, this.tDataNodeLocation);
+ }
+ return false;
+ }
+
+ // equals/hashCode contract: equal procedures must produce equal hashes.
+ @Override
+ public int hashCode() {
+ return java.util.Objects.hash(getProcId(), getState(), tDataNodeLocation);
+ }
+
+ /** Submit one child RegionMigrateProcedure per region that has a usable destination node. */
+ private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
+ execDataNodeRegionIds.forEach(
+ regionId -> {
+ TDataNodeLocation destDataNode =
+ env.getDataNodeRemoveHandler().findDestDataNode(regionId);
+ if (destDataNode != null) {
+ RegionMigrateProcedure regionMigrateProcedure =
+ new RegionMigrateProcedure(regionId, tDataNodeLocation, destDataNode);
+ addChildProcedure(regionMigrateProcedure);
+ LOG.info("Submit child procedure, {}", regionMigrateProcedure);
+ }
+ });
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
new file mode 100644
index 0000000000..6980a2054d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.scheduler;
+
+import org.apache.iotdb.confignode.procedure.Procedure;
+
+import java.util.ArrayDeque;
+
+/** Lock Queue for procedures of the same type: a single lock owner plus a FIFO wait queue. */
+public class LockQueue {
+ // Procedures parked while the lock is held; woken in arrival order.
+ private final ArrayDeque<Procedure<?>> deque = new ArrayDeque<>();
+
+ // Procedure currently holding the lock, or null when the lock is free.
+ private Procedure<?> lockOwnerProcedure = null;
+
+ /**
+ * Try to acquire the lock. Re-entrant: the current owner always succeeds.
+ *
+ * @return true if {@code procedure} now holds (or already held) the lock
+ */
+ public boolean tryLock(Procedure<?> procedure) {
+ if (lockOwnerProcedure == null) {
+ lockOwnerProcedure = procedure;
+ return true;
+ }
+ return procedure.getProcId() == lockOwnerProcedure.getProcId();
+ }
+
+ /**
+ * Release the lock.
+ *
+ * @return false if {@code procedure} is not the current owner, true otherwise
+ */
+ public boolean releaseLock(Procedure<?> procedure) {
+ if (lockOwnerProcedure == null || lockOwnerProcedure.getProcId() != procedure.getProcId()) {
+ return false;
+ }
+ lockOwnerProcedure = null;
+ return true;
+ }
+
+ /** Park a procedure until the next {@link #wakeWaitingProcedures(ProcedureScheduler)}. */
+ public void waitProcedure(Procedure<?> procedure) {
+ deque.addLast(procedure);
+ }
+
+ /**
+ * Re-schedule every waiting procedure at the front of the scheduler.
+ *
+ * @return the number of procedures woken
+ */
+ public int wakeWaitingProcedures(ProcedureScheduler procedureScheduler) {
+ int count = deque.size();
+ // Draining the deque empties it; no separate clear() is needed.
+ while (!deque.isEmpty()) {
+ procedureScheduler.addFront(deque.pollFirst());
+ }
+ return count;
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
new file mode 100644
index 0000000000..0c83cd2411
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state;
+
+// States of the region-migration procedure. NOTE(review): the sibling state enums are mapped
+// to/from ordinals for (de)serialization; presumably this one is too — if so, only append new
+// constants, never reorder. Confirm against RegionMigrateProcedure.getState/getStateId.
+public enum RegionTransitionState {
+ REGION_MIGRATE_PREPARE,
+ ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP,
+ MIGRATE_REGION,
+ WAIT_FOR_REGION_MIGRATE_FINISHED,
+ UPDATE_REGION_LOCATION_CACHE
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveDataNodeState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveDataNodeState.java
new file mode 100644
index 0000000000..30d2476056
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveDataNodeState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state;
+
+// States of RemoveDataNodeProcedure, executed in declaration order: collect the node's region
+// ids, disable the node cluster-wide, spawn one RegionMigrateProcedure child per region, then
+// stop the node and drop its persisted entry. Ordinals are used by getStateId/getState for
+// (de)serialization, so only append new states — never reorder.
+public enum RemoveDataNodeState {
+ REMOVE_DATA_NODE_PREPARE,
+ BROADCAST_DISABLE_DATA_NODE,
+ SUBMIT_REGION_MIGRATE,
+ STOP_DATA_NODE
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 1b5398b77e..dd28aebfdb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.confignode.procedure.store;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
+import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -47,6 +49,12 @@ public class ProcedureFactory implements IProcedureFactory {
case REMOVE_CONFIG_NODE_PROCEDURE:
procedure = new RemoveConfigNodeProcedure();
break;
+ case REMOVE_DATA_NODE_PROCEDURE:
+ procedure = new RemoveDataNodeProcedure();
+ break;
+ case REGION_MIGRATE_PROCEDURE:
+ procedure = new RegionMigrateProcedure();
+ break;
default:
throw new IOException("unknown Procedure type: " + typeNum);
}
@@ -61,6 +69,10 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.ADD_CONFIG_NODE_PROCEDURE;
} else if (procedure instanceof RemoveConfigNodeProcedure) {
return ProcedureType.REMOVE_CONFIG_NODE_PROCEDURE;
+ } else if (procedure instanceof RemoveDataNodeProcedure) {
+ return ProcedureType.REMOVE_DATA_NODE_PROCEDURE;
+ } else if (procedure instanceof RegionMigrateProcedure) {
+ return ProcedureType.REGION_MIGRATE_PROCEDURE;
}
return null;
}
@@ -69,6 +81,8 @@ public class ProcedureFactory implements IProcedureFactory {
DELETE_STORAGE_GROUP_PROCEDURE,
ADD_CONFIG_NODE_PROCEDURE,
REMOVE_CONFIG_NODE_PROCEDURE,
+ REMOVE_DATA_NODE_PROCEDURE,
+ REGION_MIGRATE_PROCEDURE
}
private static class ProcedureFactoryHolder {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index b818bade24..a8b6648055 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -21,20 +21,17 @@ package org.apache.iotdb.confignode.service;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.conf.ConfigNodeRemoveCheck;
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
@@ -161,7 +158,6 @@ public class ConfigNode implements ConfigNodeMBean {
// Setup MetricsService
registerManager.register(MetricsService.getInstance());
MetricsService.getInstance().startAllReporter();
- configManager.getDataNodeRemoveManager().start();
LOGGER.info("Successfully setup internal services.");
}
@@ -231,29 +227,6 @@ public class ConfigNode implements ConfigNodeMBean {
LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
}
- public void doRemoveNode(String[] args) throws IOException {
- LOGGER.info("Starting to remove {}...", ConfigNodeConstant.GLOBAL_NAME);
- if (args.length != 3) {
- LOGGER.info("Usage: -r <ip>:<rpcPort>");
- return;
- }
-
- try {
- TEndPoint endPoint = NodeUrlUtils.parseTEndPointUrl(args[2]);
- TConfigNodeLocation removeConfigNodeLocation =
- ConfigNodeRemoveCheck.getInstance().removeCheck(endPoint);
- if (removeConfigNodeLocation == null) {
- LOGGER.error("The ConfigNode not in the Cluster.");
- return;
- }
-
- ConfigNodeRemoveCheck.getInstance().removeConfigNode(removeConfigNodeLocation);
- } catch (BadNodeUrlException e) {
- LOGGER.warn("No ConfigNodes need to be removed.", e);
- }
- LOGGER.info("{} is removed.", ConfigNodeConstant.GLOBAL_NAME);
- }
-
private static class ConfigNodeHolder {
private static final ConfigNode INSTANCE = new ConfigNode();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
index 421ea4b961..124b765449 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
@@ -18,10 +18,16 @@
*/
package org.apache.iotdb.confignode.service;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.ServerCommandLine;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.StartupChecks;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
+import org.apache.iotdb.confignode.conf.ConfigNodeRemoveCheck;
import org.apache.iotdb.confignode.conf.ConfigNodeStartupCheck;
import org.slf4j.Logger;
@@ -76,7 +82,7 @@ public class ConfigNodeCommandLine extends ServerCommandLine {
} else if (MODE_REMOVE.equals(mode)) {
// remove node
try {
- ConfigNode.getInstance().doRemoveNode(args);
+ doRemoveNode(args);
} catch (IOException e) {
LOGGER.error("Meet error when doing remove", e);
return -1;
@@ -85,4 +91,27 @@ public class ConfigNodeCommandLine extends ServerCommandLine {
return 0;
}
+
+ /**
+ * Remove one ConfigNode from the cluster, identified by {@code args[2]} as ip:rpcPort.
+ * Checks the node is actually a cluster member before removing it.
+ */
+ private void doRemoveNode(String[] args) throws IOException {
+ LOGGER.info("Starting to remove {}...", ConfigNodeConstant.GLOBAL_NAME);
+ if (args.length != 3) {
+ // Wrong arity is an operator error: log the usage hint at WARN, not INFO.
+ LOGGER.warn("Usage: -r <ip>:<rpcPort>");
+ return;
+ }
+
+ try {
+ TEndPoint endPoint = NodeUrlUtils.parseTEndPointUrl(args[2]);
+ TConfigNodeLocation removeConfigNodeLocation =
+ ConfigNodeRemoveCheck.getInstance().removeCheck(endPoint);
+ if (removeConfigNodeLocation == null) {
+ LOGGER.error("The ConfigNode is not in the cluster.");
+ return;
+ }
+
+ ConfigNodeRemoveCheck.getInstance().removeConfigNode(removeConfigNodeLocation);
+ } catch (BadNodeUrlException e) {
+ LOGGER.warn("No ConfigNodes need to be removed.", e);
+ // Do not fall through to the success log below when the node URL could not be parsed.
+ return;
+ }
+ LOGGER.info("{} is removed.", ConfigNodeConstant.GLOBAL_NAME);
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 295e01737d..1d104f23f7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -176,7 +176,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
@Override
public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) throws TException {
- configManager.getDataNodeRemoveManager().reportRegionMigrateResult(req);
+ configManager.getProcedureManager().reportRegionMigrateResult(req);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlanTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlanTest.java
index bb4977a1f7..85e026d18f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlanTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlanTest.java
@@ -153,7 +153,6 @@ public class RemoveDataNodePlanTest {
private RemoveDataNodePlan runPlanSerializeAndDeSerialize(boolean update) throws IOException {
try (DataOutputStream outputStream =
new DataOutputStream(Files.newOutputStream(Paths.get(serializedFileName)))) {
- req1.setUpdate(update);
req1.serializeImpl(outputStream);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 54ff85eef6..49305421cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
@@ -35,18 +34,14 @@ import org.apache.iotdb.commons.service.StartupChecks;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.IoTDBStartCheck;
-import org.apache.iotdb.db.conf.IoTDBStopCheck;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
@@ -78,10 +73,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
public class DataNode implements DataNodeMBean {
private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
@@ -144,103 +136,6 @@ public class DataNode implements DataNodeMBean {
}
}
- /**
- * remove datanodes from cluster
- *
- * @param args IPs for removed datanodes, split with ','
- */
- protected void doRemoveNode(String[] args) {
- try {
- removePrepare(args);
- removeNodesFromCluster(args);
- removeTail();
- } catch (Exception e) {
- logger.error("remove Data Nodes error", e);
- }
- }
-
- private void removePrepare(String[] args) throws BadNodeUrlException {
- ConfigNodeInfo.getInstance()
- .updateConfigNodeList(IoTDBDescriptor.getInstance().getConfig().getTargetConfigNodeList());
- try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
- TDataNodeConfigurationResp resp = configNodeClient.getDataNodeConfiguration(-1);
- // 1. online Data Node size - removed Data Node size < replication,NOT ALLOW remove
- // But replication size is set in Config Node's configuration, so check it in remote Config
- // Node
-
- // 2. removed Data Node IP not contained in below map, CAN NOT remove.
- Map<Integer, TDataNodeConfiguration> nodeIdToNodeConfiguration =
- resp.getDataNodeConfigurationMap();
- List<String> removedDataNodeIps = Arrays.asList(args[1].split(","));
- List<String> onlineDataNodeIps =
- nodeIdToNodeConfiguration.values().stream()
- .map(TDataNodeConfiguration::getLocation)
- .map(TDataNodeLocation::getInternalEndPoint)
- .map(TEndPoint::getIp)
- .collect(Collectors.toList());
- IoTDBStopCheck.getInstance().checkDuplicateIp(removedDataNodeIps);
- IoTDBStopCheck.getInstance().checkIpInCluster(removedDataNodeIps, onlineDataNodeIps);
- } catch (TException e) {
- logger.error("remove Data Nodes check failed", e);
- }
- }
-
- private void removeNodesFromCluster(String[] args) {
- logger.info("start to remove DataNode from cluster");
- List<TDataNodeLocation> dataNodeLocations = buildDataNodeLocations(args[1]);
- if (dataNodeLocations.isEmpty()) {
- logger.error("data nodes location is empty");
- // throw Exception OR return?
- return;
- }
- logger.info(
- "there has data nodes location will be removed. size is: {}, detail: {}",
- dataNodeLocations.size(),
- dataNodeLocations);
- TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations);
- try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
- TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq);
- logger.info("Remove result {} ", removeResp.toString());
- } catch (TException e) {
- logger.error("send remove Data Node request failed!", e);
- }
- }
-
- /**
- * fetch all datanode info from ConfigNode, then compare with input 'ips'
- *
- * @param ips data node ip, split with ','
- * @return TDataNodeLocation list
- */
- private List<TDataNodeLocation> buildDataNodeLocations(String ips) {
- List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
- if (ips == null || ips.trim().isEmpty()) {
- return dataNodeLocations;
- }
-
- List<String> dataNodeIps = Arrays.asList(ips.split(","));
- try (ConfigNodeClient client = new ConfigNodeClient()) {
- dataNodeLocations =
- client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
- .map(TDataNodeConfiguration::getLocation)
- .filter(location -> dataNodeIps.contains(location.getInternalEndPoint().getIp()))
- .collect(Collectors.toList());
- } catch (TException e) {
- logger.error("get data node locations failed", e);
- }
-
- if (dataNodeIps.size() != dataNodeLocations.size()) {
- logger.error(
- "build DataNode locations error, "
- + "because number of input ip NOT EQUALS the number of fetched DataNodeLocations, will return empty locations");
- return dataNodeLocations;
- }
-
- return dataNodeLocations;
- }
-
- private void removeTail() {}
-
/** initialize the current node and its services */
public boolean initLocalEngines() {
IoTDB.setClusterMode();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
index 3900236413..debe51b5ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
@@ -18,13 +18,30 @@
*/
package org.apache.iotdb.db.service;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.ServerCommandLine;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.exception.ConfigurationException;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.IoTDBStopCheck;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
public class DataNodeServerCommandLine extends ServerCommandLine {
@@ -75,10 +92,111 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
if (MODE_START.equals(mode)) {
dataNode.doAddNode(args);
} else if (MODE_REMOVE.equals(mode)) {
- dataNode.doRemoveNode(args);
+ doRemoveNode(args);
} else {
logger.error("Unrecognized mode {}", mode);
}
return 0;
}
+
+ /**
+ * Remove DataNodes from the cluster in three steps: pre-check against the
+ * ConfigNode, send the remove request, then run any tail work.
+ *
+ * @param args command-line args; args[1] holds the removed DataNodes' 'ip:port' list, split with ','
+ */
+ private void doRemoveNode(String[] args) {
+ try {
+ removePrepare(args);
+ removeNodesFromCluster(args);
+ removeTail();
+ } catch (Exception e) {
+ // CLI boundary: any failure from the three steps is reported here once
+ logger.error("remove Data Nodes error", e);
+ }
+ }
+
+  /**
+   * Pre-check before removing DataNodes.
+   *
+   * <p>Check 1 (enough replicas remain after removal) is performed remotely, because the
+   * replication number lives in the ConfigNode's configuration. Check 2 verifies locally that
+   * every removed DataNode ip is actually registered in the cluster.
+   *
+   * @param args command-line args; args[1] holds the removed DataNodes' 'ip:port' list, split with ','
+   * @throws BadNodeUrlException if args[1] cannot be parsed into endpoints
+   * @throws IllegalStateException if the ConfigNode cannot be reached to run the check
+   */
+  private void removePrepare(String[] args) throws BadNodeUrlException {
+    ConfigNodeInfo.getInstance()
+        .updateConfigNodeList(IoTDBDescriptor.getInstance().getConfig().getTargetConfigNodeList());
+    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      TDataNodeConfigurationResp resp = configNodeClient.getDataNodeConfiguration(-1);
+      // 1. online Data Node size - removed Data Node size < replication, NOT ALLOW remove.
+      //    Replication size is set in the Config Node's configuration, so it is checked remotely.
+      // 2. removed Data Node IP not contained in below map, CAN NOT remove.
+      Map<Integer, TDataNodeConfiguration> nodeIdToNodeConfiguration =
+          resp.getDataNodeConfigurationMap();
+      List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(args[1]);
+      List<String> removedDataNodeIps =
+          endPoints.stream().map(TEndPoint::getIp).collect(Collectors.toList());
+
+      List<String> onlineDataNodeIps =
+          nodeIdToNodeConfiguration.values().stream()
+              .map(TDataNodeConfiguration::getLocation)
+              .map(TDataNodeLocation::getInternalEndPoint)
+              .map(TEndPoint::getIp)
+              .collect(Collectors.toList());
+      IoTDBStopCheck.getInstance().checkIpInCluster(removedDataNodeIps, onlineDataNodeIps);
+    } catch (TException e) {
+      // Fail fast: the previous code only logged here, which silently skipped the pre-check
+      // and let the removal proceed even when the ConfigNode was unreachable.
+      throw new IllegalStateException("remove Data Nodes check failed", e);
+    }
+  }
+
+  /**
+   * Build the removed DataNodes' locations from args[1] and send the remove request to the
+   * ConfigNode.
+   *
+   * @param args command-line args; args[1] holds the removed DataNodes' 'ip:port' list, split with ','
+   * @throws BadNodeUrlException if args[1] cannot be parsed into endpoints
+   * @throws IllegalStateException if the request cannot be delivered to the ConfigNode
+   */
+  private void removeNodesFromCluster(String[] args) throws BadNodeUrlException {
+    logger.info("start to remove DataNode from cluster");
+    List<TDataNodeLocation> dataNodeLocations = buildDataNodeLocations(args[1]);
+    if (dataNodeLocations.isEmpty()) {
+      // Nothing matched the given endpoints: report and stop rather than send an empty request.
+      logger.error("data nodes location is empty");
+      return;
+    }
+    logger.info(
+        "there has data nodes location will be removed. size is: {}, detail: {}",
+        dataNodeLocations.size(),
+        dataNodeLocations);
+    TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations);
+    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq);
+      logger.info("Remove result {} ", removeResp.toString());
+    } catch (TException e) {
+      // Fail loudly: a swallowed send failure would leave the operator believing the
+      // remove request was accepted when it never reached the ConfigNode.
+      throw new IllegalStateException("send remove Data Node request failed!", e);
+    }
+  }
+
+  /**
+   * Fetch all registered DataNode locations from the ConfigNode and keep those whose client RPC
+   * endpoint matches one of the given 'ip:port' entries.
+   *
+   * @param endPorts removed DataNodes' endpoints, 'ip:port' split with ','
+   * @return matched TDataNodeLocation list; empty when the input is blank, the fetch fails, or
+   *     any input endpoint has no matching registered DataNode
+   * @throws BadNodeUrlException if endPorts cannot be parsed
+   */
+  private List<TDataNodeLocation> buildDataNodeLocations(String endPorts)
+      throws BadNodeUrlException {
+    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    if (endPorts == null || endPorts.trim().isEmpty()) {
+      return dataNodeLocations;
+    }
+
+    List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(endPorts);
+
+    try (ConfigNodeClient client = new ConfigNodeClient()) {
+      // NOTE(review): this matches on the client RPC endpoint while removePrepare compares the
+      // internal endpoint's ip — confirm the two are intentionally different.
+      dataNodeLocations =
+          client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
+              .map(TDataNodeConfiguration::getLocation)
+              .filter(location -> endPoints.contains(location.getClientRpcEndPoint()))
+              .collect(Collectors.toList());
+    } catch (TException e) {
+      logger.error("get data node locations failed", e);
+    }
+
+    if (endPoints.size() != dataNodeLocations.size()) {
+      logger.error(
+          "build DataNode locations error, "
+              + "because number of input ip NOT EQUALS the number of fetched DataNodeLocations, will return empty locations");
+      // Actually return empty locations as the message promises. The old code returned the
+      // partial list, so a typo in one endpoint could still remove all the others.
+      return new ArrayList<>();
+    }
+
+    return dataNodeLocations;
+  }
+
+ // Placeholder for post-remove tail work; intentionally a no-op for now.
+ private void removeTail() {}
}