You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/07/22 02:30:18 UTC

[iotdb] branch master updated: [IOTDB-3815] replace remove data node to procedure (#6736)

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ffe93cb27 [IOTDB-3815] replace remove data node to procedure (#6736)
1ffe93cb27 is described below

commit 1ffe93cb270ae9d8152a9507bb80acb95ceb6ee6
Author: wangchao316 <66...@users.noreply.github.com>
AuthorDate: Fri Jul 22 10:30:13 2022 +0800

    [IOTDB-3815] replace remove data node to procedure (#6736)
---
 .../request/write/RemoveDataNodePlan.java          | 124 +---
 .../statemachine/PartitionRegionStateMachine.java  |   2 -
 .../iotdb/confignode/manager/ConfigManager.java    |  10 +-
 .../confignode/manager/DataNodeRemoveManager.java  | 806 ---------------------
 .../apache/iotdb/confignode/manager/IManager.java  |   6 +-
 .../iotdb/confignode/manager/NodeManager.java      |  28 +-
 .../iotdb/confignode/manager/ProcedureManager.java |  34 +
 .../iotdb/confignode/persistence/NodeInfo.java     | 191 +----
 .../procedure/env/ConfigNodeProcedureEnv.java      |  26 +-
 .../procedure/env/DataNodeRemoveHandler.java       | 381 ++++++++++
 .../procedure/impl/AbstractNodeProcedure.java      |  64 ++
 .../procedure/impl/AddConfigNodeProcedure.java     |  33 +-
 .../procedure/impl/RegionMigrateProcedure.java     | 230 ++++++
 .../procedure/impl/RemoveConfigNodeProcedure.java  |  33 +-
 .../procedure/impl/RemoveDataNodeProcedure.java    | 184 +++++
 .../confignode/procedure/scheduler/LockQueue.java  |  63 ++
 .../procedure/state/RegionTransitionState.java     |  28 +
 .../procedure/state/RemoveDataNodeState.java       |  27 +
 .../procedure/store/ProcedureFactory.java          |  14 +
 .../iotdb/confignode/service/ConfigNode.java       |  27 -
 .../confignode/service/ConfigNodeCommandLine.java  |  31 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   2 +-
 .../request/write/RemoveDataNodePlanTest.java      |   1 -
 .../java/org/apache/iotdb/db/service/DataNode.java | 105 ---
 .../db/service/DataNodeServerCommandLine.java      | 120 ++-
 25 files changed, 1225 insertions(+), 1345 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlan.java
index e2178e6725..bf59f00ed1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlan.java
@@ -18,10 +18,7 @@
  */
 package org.apache.iotdb.confignode.consensus.request.write;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.commons.enums.DataNodeRemoveState;
-import org.apache.iotdb.commons.enums.RegionMigrateState;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
@@ -32,29 +29,10 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 public class RemoveDataNodePlan extends ConfigPhysicalPlan {
   private List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
 
-  // if true, means update the  request exec  state.
-  private boolean isUpdate;
-
-  // [0, dataNodeLocations.size), -1 means No Data Node exec remove action
-  private int execDataNodeIndex = -1;
-
-  private DataNodeRemoveState execDataNodeState = DataNodeRemoveState.NORMAL;
-
-  private List<TConsensusGroupId> execDataNodeRegionIds = new ArrayList<>();
-
-  // [0, execDataNodeRegionIds), -1 means No Region exec migrate action
-  private int execRegionIndex = -1;
-
-  // if the request finished
-  private boolean finished = false;
-
-  private RegionMigrateState execRegionState = RegionMigrateState.ONLINE;
-
   public RemoveDataNodePlan() {
     super(ConfigPhysicalPlanType.RemoveDataNode);
   }
@@ -62,7 +40,6 @@ public class RemoveDataNodePlan extends ConfigPhysicalPlan {
   public RemoveDataNodePlan(List<TDataNodeLocation> dataNodeLocations) {
     this();
     this.dataNodeLocations = dataNodeLocations;
-    isUpdate = false;
   }
 
   @Override
@@ -71,17 +48,6 @@ public class RemoveDataNodePlan extends ConfigPhysicalPlan {
     stream.writeInt(dataNodeLocations.size());
     dataNodeLocations.forEach(
         location -> ThriftCommonsSerDeUtils.serializeTDataNodeLocation(location, stream));
-    stream.writeInt(isUpdate ? 1 : 0);
-    if (isUpdate) {
-      stream.writeInt(execDataNodeIndex);
-      stream.writeInt(execDataNodeState.getCode());
-      stream.writeInt(execDataNodeRegionIds.size());
-      execDataNodeRegionIds.forEach(
-          tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream));
-      stream.writeInt(execRegionIndex);
-      stream.writeInt(execRegionState.getCode());
-      stream.writeInt(finished ? 1 : 0);
-    }
   }
 
   @Override
@@ -90,19 +56,6 @@ public class RemoveDataNodePlan extends ConfigPhysicalPlan {
     for (int i = 0; i < dataNodeLocationSize; i++) {
       dataNodeLocations.add(ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer));
     }
-    isUpdate = buffer.getInt() == 1;
-    if (isUpdate) {
-      execDataNodeIndex = buffer.getInt();
-      execDataNodeState = DataNodeRemoveState.getStateByCode(buffer.getInt());
-      int regionSize = buffer.getInt();
-      execDataNodeRegionIds = new ArrayList<>(regionSize);
-      for (int i = 0; i < regionSize; i++) {
-        execDataNodeRegionIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer));
-      }
-      execRegionIndex = buffer.getInt();
-      execRegionState = RegionMigrateState.getStateByCode(buffer.getInt());
-      finished = buffer.getInt() == 1;
-    }
   }
 
   @Override
@@ -138,81 +91,6 @@ public class RemoveDataNodePlan extends ConfigPhysicalPlan {
 
   @Override
   public String toString() {
-    return "{RemoveDataNodeReq: "
-        + "TDataNodeLocations: "
-        + this.getDataNodeLocations()
-        + ", is update: "
-        + this.isUpdate()
-        + ", is finished: "
-        + this.isFinished()
-        + ", exec node index: "
-        + this.getExecDataNodeIndex()
-        + ", exec node state: "
-        + this.getExecDataNodeState()
-        + ", exec region index: "
-        + this.getExecRegionIndex()
-        + ", exec region state: "
-        + this.getExecRegionState()
-        + ", exec node region ids: "
-        + this.getExecDataNodeRegionIds().stream()
-            .map(TConsensusGroupId::getId)
-            .collect(Collectors.toList())
-        + "}";
-  }
-
-  public boolean isUpdate() {
-    return isUpdate;
-  }
-
-  public void setUpdate(boolean update) {
-    isUpdate = update;
-  }
-
-  public int getExecDataNodeIndex() {
-    return execDataNodeIndex;
-  }
-
-  public void setExecDataNodeIndex(int execDataNodeIndex) {
-    this.execDataNodeIndex = execDataNodeIndex;
-  }
-
-  public DataNodeRemoveState getExecDataNodeState() {
-    return execDataNodeState;
-  }
-
-  public void setExecDataNodeState(DataNodeRemoveState execDataNodeState) {
-    this.execDataNodeState = execDataNodeState;
-  }
-
-  public List<TConsensusGroupId> getExecDataNodeRegionIds() {
-    return execDataNodeRegionIds;
-  }
-
-  public void setExecDataNodeRegionIds(List<TConsensusGroupId> execDataNodeRegionIds) {
-    this.execDataNodeRegionIds = execDataNodeRegionIds;
-  }
-
-  public int getExecRegionIndex() {
-    return execRegionIndex;
-  }
-
-  public void setExecRegionIndex(int execRegionIndex) {
-    this.execRegionIndex = execRegionIndex;
-  }
-
-  public RegionMigrateState getExecRegionState() {
-    return execRegionState;
-  }
-
-  public void setExecRegionState(RegionMigrateState execRegionState) {
-    this.execRegionState = execRegionState;
-  }
-
-  public void setFinished(boolean finished) {
-    this.finished = finished;
-  }
-
-  public boolean isFinished() {
-    return this.finished;
+    return "{RemoveDataNodeReq: " + "TDataNodeLocations: " + this.getDataNodeLocations() + "}";
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 026135128c..1afcde2f4c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -142,11 +142,9 @@ public class PartitionRegionStateMachine implements IStateMachine, IStateMachine
       LOGGER.info("Current node {} is Leader, start procedure manager.", newLeader);
       configManager.getProcedureManager().shiftExecutor(true);
       configManager.getLoadManager().start();
-      configManager.getDataNodeRemoveManager().beLeader();
     } else {
       configManager.getProcedureManager().shiftExecutor(false);
       configManager.getLoadManager().stop();
-      configManager.getDataNodeRemoveManager().beFollower();
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 24f34f5577..1f905c0258 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -143,8 +143,6 @@ public class ConfigManager implements IManager {
   /** UDF */
   private final UDFManager udfManager;
 
-  private final DataNodeRemoveManager dataNodeRemoveManager;
-
   public ConfigManager() throws IOException {
     // Build the persistence module
     NodeInfo nodeInfo = new NodeInfo();
@@ -168,7 +166,6 @@ public class ConfigManager implements IManager {
     this.procedureManager = new ProcedureManager(this, procedureInfo);
     this.udfManager = new UDFManager(this, udfInfo);
     this.loadManager = new LoadManager(this);
-    this.dataNodeRemoveManager = new DataNodeRemoveManager(this);
     this.consensusManager = new ConsensusManager(this, stateMachine);
   }
 
@@ -176,7 +173,6 @@ public class ConfigManager implements IManager {
     consensusManager.close();
     partitionManager.getRegionCleaner().shutdown();
     procedureManager.shiftExecutor(false);
-    dataNodeRemoveManager.stop();
   }
 
   @Override
@@ -862,11 +858,6 @@ public class ConfigManager implements IManager {
     return udfManager;
   }
 
-  @Override
-  public DataNodeRemoveManager getDataNodeRemoveManager() {
-    return dataNodeRemoveManager;
-  }
-
   @Override
   public DataSet showRegion(GetRegionInfoListPlan getRegionInfoListPlan) {
     TSStatus status = confirmLeader();
@@ -936,6 +927,7 @@ public class ConfigManager implements IManager {
     return dataNodeConfigurationResp;
   }
 
+  @Override
   public ProcedureManager getProcedureManager() {
     return procedureManager;
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java
deleted file mode 100644
index 6d58fdd10e..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeRemoveManager.java
+++ /dev/null
@@ -1,806 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.manager;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.enums.DataNodeRemoveState;
-import org.apache.iotdb.commons.enums.RegionMigrateState;
-import org.apache.iotdb.confignode.client.DataNodeRequestType;
-import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
-import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
-import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
-import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
-import org.apache.iotdb.confignode.persistence.NodeInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-
-public class DataNodeRemoveManager {
-  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveManager.class);
-  private static final int QUEUE_SIZE_LIMIT = 200;
-
-  private boolean hasSetUp = false;
-
-  private ConfigManager configManager;
-
-  private final LinkedBlockingQueue<RemoveDataNodePlan> removeQueue;
-
-  // which request is running
-  private RemoveDataNodePlan headRequest = null;
-
-  // which data node is removing in `headRequest`, [0, req TDataNodeLocation list size)
-  private int headNodeIndex = -1;
-
-  private DataNodeRemoveState headNodeState = DataNodeRemoveState.NORMAL;
-
-  // the region ids belongs to head node
-  private List<TConsensusGroupId> headNodeRegionIds = new ArrayList<>();
-
-  // which region is migrating on head node
-  private int headRegionIndex = -1;
-
-  private RegionMigrateState headRegionState = RegionMigrateState.ONLINE;
-
-  private volatile boolean stopped = false;
-
-  private volatile boolean isLeader = false;
-  private final Object leaderLock = new Object();
-  private Thread workThread;
-  private Thread waitLeaderThread;
-
-  private final Object regionMigrateLock = new Object();
-  private TRegionMigrateResultReportReq lastRegionMigrateResult = null;
-
-  public DataNodeRemoveManager(ConfigManager configManager) {
-    this.configManager = configManager;
-    removeQueue = new LinkedBlockingQueue<>(QUEUE_SIZE_LIMIT);
-
-    createWaitLeaderThread();
-    createWorkThread();
-  }
-
-  /** start the manager when Config node startup */
-  public void start() {
-    if (!hasSetUp) {
-      // TODO 1. when restart,reload info from NoInfo and continue
-      setUp();
-      hasSetUp = true;
-    }
-    LOGGER.info("Data Node remove service start");
-    // 2. if it is not leader, loop check
-    // configManager.getConsensusManager().isLeader())
-    waitUntilBeLeader();
-
-    // 3. Take and exec request one by one
-    // 4. When Data Node's state or Region's state change, then modify the request and write it to
-    // Consensus.
-    // 5. TODO if leader change?
-    execRequestFromQueue();
-  }
-
-  private void createWaitLeaderThread() {
-    waitLeaderThread =
-        new Thread(
-            () -> {
-              while (!stopped && !configManager.getConsensusManager().isLeader()) {
-                try {
-                  Thread.sleep(2000);
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                }
-              }
-              LOGGER.info("the config node is leader now!");
-              isLeader = true;
-              synchronized (leaderLock) {
-                leaderLock.notify();
-              }
-            });
-    waitLeaderThread.setName("Wait-Leader-Thread");
-  }
-
-  private void createWorkThread() {
-    workThread =
-        new Thread(
-            () -> {
-              while (!stopped) {
-                RemoveDataNodePlan req = null;
-                try {
-                  while (!isLeader) {
-                    LOGGER.warn("the ConfigNode is not leader, waiting...");
-                    synchronized (leaderLock) {
-                      leaderLock.wait();
-                    }
-                  }
-                  LOGGER.info("the ConfigNode is leader now, will take request and run");
-                  req = removeQueue.take();
-                  LOGGER.info("exec the request : {}", req);
-                  prepareHeadRequestInfo(req);
-                  TSStatus status = execRemoveDataNodeRequest(headRequest);
-                  LOGGER.info("exec the request: {}, result:  {}", req, status);
-                  // unRegisterRequest(req);
-                } catch (InterruptedException e) {
-                  LOGGER.warn("work thread interrupted", e);
-                  Thread.currentThread().interrupt();
-                } catch (Exception e) {
-                  LOGGER.warn("the request run failed", e);
-                } finally {
-                  if (req != null) {
-                    unRegisterRequest(req);
-                  }
-                }
-              }
-            });
-    workThread.setName("Exec-RemoveDataNode-Thread");
-  }
-
-  private void waitUntilBeLeader() {
-    if (waitLeaderThread != null) {
-      waitLeaderThread.start();
-    }
-  }
-
-  private void execRequestFromQueue() {
-    if (workThread != null) {
-      workThread.start();
-    }
-  }
-
-  public void beLeader() {
-    // TODO
-    /** this.isLeader = true; synchronized (leaderLock) { leaderLock.notify(); } */
-  }
-
-  public void beFollower() {
-    // TODO
-    /** this.isLeader = false; */
-  }
-
-  /**
-   * prepare for the request. from first node, first region to exec
-   *
-   * @param req RemoveDataNodeReq
-   */
-  private void prepareHeadRequestInfo(RemoveDataNodePlan req) {
-    LOGGER.info("start to prepare for request: {}", req);
-    this.headRequest = req;
-    // to exec the request, will change it's state at different stage.
-    this.headNodeIndex = 0;
-    this.headNodeState = DataNodeRemoveState.NORMAL;
-
-    TDataNodeLocation headNode = req.getDataNodeLocations().get(headNodeIndex);
-    this.headNodeRegionIds =
-        configManager.getPartitionManager().getAllReplicaSets().stream()
-            .filter(rg -> rg.getDataNodeLocations().contains(headNode))
-            .filter(rg -> rg.regionId.getType() != TConsensusGroupType.PartitionRegion)
-            .map(TRegionReplicaSet::getRegionId)
-            .collect(Collectors.toList());
-    this.headRegionIndex = 0;
-    this.headRegionState = RegionMigrateState.ONLINE;
-
-    // modify head quest
-    this.headRequest.setUpdate(true);
-    this.headRequest.setExecDataNodeRegionIds(headNodeRegionIds);
-    this.headRequest.setExecDataNodeIndex(headNodeIndex);
-    this.headRequest.setExecRegionIndex(headRegionIndex);
-    this.headRequest.setExecDataNodeState(headNodeState);
-    this.headRequest.setExecRegionState(headRegionState);
-    configManager.getConsensusManager().write(headRequest);
-    LOGGER.info("finished to prepare for request: {}", req);
-  }
-
-  private void setUp() {
-    removeQueue.addAll(configManager.getNodeManager().getDataNodeRemoveRequestQueue());
-    headRequest = configManager.getNodeManager().getHeadRequestForDataNodeRemove();
-    if (headRequest == null) {
-      return;
-    }
-
-    // avoid duplication
-    if (removeQueue.contains(headRequest)) {
-      removeQueue.remove(headRequest);
-    }
-
-    headNodeIndex = headRequest.getExecDataNodeIndex();
-    headNodeState = headRequest.getExecDataNodeState();
-    headNodeRegionIds = headRequest.getExecDataNodeRegionIds();
-    headRegionIndex = headRequest.getExecRegionIndex();
-    headRegionState = headRequest.getExecRegionState();
-  }
-
-  /**
-   * exec the request loop all removed node 1: brocast it to cluster 2: loop region on it and
-   * migrate region 3 stop the node or roll back
-   *
-   * @param req RemoveDataNodeReq
-   */
-  private TSStatus execRemoveDataNodeRequest(RemoveDataNodePlan req) {
-    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-
-    for (TDataNodeLocation dataNodeLocation : req.getDataNodeLocations()) {
-      headNodeIndex = req.getDataNodeLocations().indexOf(dataNodeLocation);
-      status = broadcastDisableDataNode(req, dataNodeLocation);
-      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.error("Disable Data Node Error {}", status);
-        return status;
-      }
-
-      // fetch data/schema region from one datanode
-      status = migrateSingleDataNodeRegions(req, dataNodeLocation);
-      if (isSucceed(status)) {
-        status = stopDataNode(req, dataNodeLocation);
-        if (isFailed(status)) {
-          LOGGER.error(
-              "send rpc to stop the Data Node {} error, please stop it use command",
-              dataNodeLocation);
-        }
-      } else {
-        LOGGER.error("the request run failed {}, result {}. will roll back", req, status);
-        rollBackSingleNode(req, dataNodeLocation);
-      }
-    }
-    return status;
-  }
-
-  private void updateRegionLocationCache(
-      TConsensusGroupId regionId, TDataNodeLocation oldNode, TDataNodeLocation newNode) {
-    LOGGER.debug(
-        "start to update region {} location from {} to {} when it migrate succeed",
-        regionId,
-        oldNode.getInternalEndPoint().getIp(),
-        newNode.getInternalEndPoint().getIp());
-    UpdateRegionLocationPlan req = new UpdateRegionLocationPlan(regionId, oldNode, newNode);
-    TSStatus status = configManager.getPartitionManager().updateRegionLocation(req);
-    LOGGER.debug(
-        "update region {} location finished, result:{}, old:{}, new:{}",
-        regionId,
-        status,
-        oldNode.getInternalEndPoint().getIp(),
-        newNode.getInternalEndPoint().getIp());
-  }
-
-  private void rollBackSingleNode(RemoveDataNodePlan req, TDataNodeLocation node) {
-    LOGGER.warn("roll back remove data node {} in the request {}", node, req);
-    if (headRegionState == RegionMigrateState.DATA_COPY_FAILED) {
-      // TODO delete target node the head region data
-      TConsensusGroupId tRegionId = lastRegionMigrateResult.getRegionId();
-      for (Map.Entry<TDataNodeLocation, TRegionMigrateFailedType> entry :
-          lastRegionMigrateResult.getFailedNodeAndReason().entrySet()) {
-        TDataNodeLocation failedNode = entry.getKey();
-        TRegionMigrateFailedType failedReason = entry.getValue();
-        switch (failedReason) {
-            // TODO how to impl roll back
-          case AddPeerFailed:
-            LOGGER.warn(
-                "add new peer node {} for region {} failed, will roll back",
-                failedNode.getInternalEndPoint().getIp(),
-                tRegionId);
-            break;
-          case RemovePeerFailed:
-            LOGGER.warn(
-                "remove old peer node {} for region {} failed, will roll back",
-                failedNode.getInternalEndPoint().getIp(),
-                tRegionId);
-            break;
-          case RemoveConsensusGroupFailed:
-            LOGGER.warn(
-                "remove consensus group on node {} for region {} failed, will roll back",
-                failedNode.getInternalEndPoint().getIp(),
-                tRegionId);
-            break;
-          case DeleteRegionFailed:
-            LOGGER.warn(
-                "create region {} instance on {} failed, will roll back",
-                failedNode.getInternalEndPoint().getIp(),
-                tRegionId);
-            break;
-          default:
-            LOGGER.warn(
-                "UnSupport reason {} for region {} migrate failed", failedReason, tRegionId);
-        }
-      }
-    }
-    // TODO if roll back failed, FAILED
-    storeDataNodeState(req, DataNodeRemoveState.REMOVE_FAILED);
-  }
-
-  /**
-   * broadcast these datanode in RemoveDataNodeReq are disabled, so they will not accept read/write
-   * request
-   *
-   * @param req RemoveDataNodeReq
-   * @param disabledDataNode TDataNodeLocation
-   */
-  private TSStatus broadcastDisableDataNode(
-      RemoveDataNodePlan req, TDataNodeLocation disabledDataNode) {
-    LOGGER.info(
-        "DataNodeRemoveService start send disable the Data Node to cluster, {}", disabledDataNode);
-    storeDataNodeState(req, DataNodeRemoveState.REMOVE_START);
-    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    List<TEndPoint> otherOnlineDataNodes =
-        configManager.getLoadManager().getOnlineDataNodes(-1).stream()
-            .map(TDataNodeConfiguration::getLocation)
-            .filter(loc -> !loc.equals(disabledDataNode))
-            .map(TDataNodeLocation::getInternalEndPoint)
-            .collect(Collectors.toList());
-
-    for (TEndPoint server : otherOnlineDataNodes) {
-      TDisableDataNodeReq disableReq = new TDisableDataNodeReq(disabledDataNode);
-      status =
-          SyncDataNodeClientPool.getInstance()
-              .sendSyncRequestToDataNodeWithRetry(
-                  server, disableReq, DataNodeRequestType.DISABLE_DATA_NODE);
-      if (!isSucceed(status)) {
-        return status;
-      }
-    }
-    LOGGER.info(
-        "DataNodeRemoveService finished send disable the Data Node to cluster, {}",
-        disabledDataNode);
-    status.setMessage("Succeed disable the Data Node from cluster");
-    return status;
-  }
-
-  private void storeRegionState(RemoveDataNodePlan req, RegionMigrateState state) {
-    req.setExecRegionIndex(headRegionIndex);
-    headRegionState = state;
-    req.setExecRegionState(headRegionState);
-    configManager.getConsensusManager().write(req);
-  }
-
-  private void storeDataNodeState(RemoveDataNodePlan req, DataNodeRemoveState state) {
-    req.setExecDataNodeIndex(headNodeIndex);
-    headNodeState = state;
-    req.setExecDataNodeState(headNodeState);
-    configManager.getConsensusManager().write(req);
-  }
-
-  private TSStatus migrateSingleDataNodeRegions(
-      RemoveDataNodePlan req, TDataNodeLocation dataNodeLocation) {
-    LOGGER.info("start to migrate regions on the Data Node: {}", dataNodeLocation);
-    TSStatus status;
-    storeDataNodeState(req, DataNodeRemoveState.REGION_MIGRATING);
-    for (TConsensusGroupId regionId : headNodeRegionIds) {
-      headRegionIndex = headNodeRegionIds.indexOf(regionId);
-      // impl migrate region with twice rpc: CN-->DN(send), DN-->CN(report)
-      status = migrateSingleRegion(req, dataNodeLocation, regionId);
-      // if has one region migrate failed, the node remove failed
-      if (isFailed(status)) {
-        storeDataNodeState(req, DataNodeRemoveState.REGION_MIGRATE_FAILED);
-        return status;
-      }
-    }
-
-    // all regions on the node migrate succeed
-    storeDataNodeState(req, DataNodeRemoveState.REGION_MIGRATE_SUCCEED);
-    status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    status.setMessage("The Data Node migrate regions succeed");
-    LOGGER.info("finished to migrate regions on the Data Node: {}", dataNodeLocation);
-    return status;
-  }
-
-  private TSStatus migrateSingleRegion(
-      RemoveDataNodePlan req, TDataNodeLocation node, TConsensusGroupId regionId) {
-    // change region leader:  change leader and then add peer, will cause ratis server exit
-    // so modify policy: add peer firs, then change region leader, see:
-    // RegionMigrateService.changeLeader()
-
-    // do migrate region
-    TSStatus status = doMigrateSingleRegion(req, node, regionId);
-    if (isFailed(status)) {
-      storeRegionState(req, RegionMigrateState.DATA_COPY_FAILED);
-      return status;
-    }
-    storeRegionState(req, RegionMigrateState.DATA_COPY_SUCCEED);
-    return status;
-  }
-
-  private TSStatus changeSingleRegionLeader(
-      RemoveDataNodePlan req, TDataNodeLocation node, TConsensusGroupId regionId) {
-    storeRegionState(req, RegionMigrateState.LEADER_CHANGING);
-    LOGGER.debug("start to send region leader change. {}", regionId);
-    TSStatus status;
-    // pick a node in same raft group to be new region leader
-    List<TRegionReplicaSet> regionReplicaSets =
-        configManager.getPartitionManager().getAllReplicaSets().stream()
-            .filter(rg -> rg.getRegionId().equals(regionId))
-            .collect(Collectors.toList());
-    if (regionReplicaSets.isEmpty()) {
-      LOGGER.warn("not find TRegionReplica for region: {}, ignore it", regionId);
-      status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("not find TRegionReplica for region, ignore");
-      return status;
-    }
-    Optional<TDataNodeLocation> newLeaderNode =
-        regionReplicaSets.get(0).getDataNodeLocations().stream()
-            .filter(e -> !e.equals(node))
-            .findAny();
-    if (!newLeaderNode.isPresent()) {
-      LOGGER.warn("No enough Data node to change leader for region: {}", regionId);
-      status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("No enough Data node to change leader for region " + regionId);
-      return status;
-    }
-    status =
-        SyncDataNodeClientPool.getInstance()
-            .changeRegionLeader(regionId, node.getInternalEndPoint(), newLeaderNode.get());
-    LOGGER.debug("finished to send region leader change. {}", regionId);
-    return status;
-  }
-
-  private TSStatus doMigrateSingleRegion(
-      RemoveDataNodePlan req, TDataNodeLocation node, TConsensusGroupId regionId) {
-    storeRegionState(req, RegionMigrateState.DATA_COPYING);
-    LOGGER.debug("start to migrate region {}", regionId);
-    TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
-    if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
-      status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("not find region replica nodes, region: " + regionId);
-      return status;
-    }
-
-    // will migrate the region to the new node, which should not be same raft
-    Optional<TDataNodeLocation> newNode = pickNewReplicaNodeForRegion(regionReplicaNodes);
-    if (!newNode.isPresent()) {
-      LOGGER.warn("No enough Data node to migrate region: {}", regionId);
-      status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("No enough Data node to migrate region, region: " + regionId);
-      return status;
-    }
-
-    status = addNewNodeToRegionConsensusGroup(regionId, regionReplicaNodes, newNode.get());
-    if (isFailed(status)) {
-      return status;
-    }
-
-    // TODO if region replica is 1, the new leader is null, it also need to migrate
-    Optional<TDataNodeLocation> newLeaderNode =
-        regionReplicaNodes.stream().filter(e -> !e.equals(node)).findAny();
-    if (!newLeaderNode.isPresent()) {
-      LOGGER.warn(
-          "No other Node to change region leader, check by show regions, region: {}", regionId);
-      status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("No other Node to change region leader, check by show regions");
-      return status;
-    }
-
-    TMigrateRegionReq migrateRegionReq = new TMigrateRegionReq(regionId, node, newNode.get());
-    migrateRegionReq.setNewLeaderNode(newLeaderNode.get());
-    status =
-        SyncDataNodeClientPool.getInstance()
-            .sendSyncRequestToDataNodeWithRetry(
-                node.getInternalEndPoint(), migrateRegionReq, DataNodeRequestType.MIGRATE_REGION);
-    // maybe send rpc failed
-    if (isFailed(status)) {
-      return status;
-    }
-    LOGGER.debug("send region {} migrate action to {}, wait it finished", regionId, node);
-    // wait DN report the region migrate result, when DN reported, then will notify and continue
-    status = waitForTheRegionMigrateFinished();
-    // interrupt wait
-    if (isFailed(status)) {
-      return status;
-    }
-    LOGGER.debug(
-        "wait region {} migrate finished. migrate result: {}", regionId, lastRegionMigrateResult);
-    status = lastRegionMigrateResult.migrateResult;
-    if (isSucceed(status)) {
-      updateRegionLocationCache(regionId, node, newNode.get());
-    }
-    return status;
-  }
-
-  private List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) {
-    List<TRegionReplicaSet> regionReplicaSets =
-        configManager.getPartitionManager().getAllReplicaSets().stream()
-            .filter(rg -> rg.regionId.equals(regionId))
-            .collect(Collectors.toList());
-    if (regionReplicaSets.isEmpty()) {
-      LOGGER.warn("not find TRegionReplica for region: {}", regionId);
-      return Collections.emptyList();
-    }
-
-    return regionReplicaSets.get(0).getDataNodeLocations();
-  }
-
-  private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
-      List<TDataNodeLocation> regionReplicaNodes) {
-    return configManager.getLoadManager().getOnlineDataNodes(-1).stream()
-        .map(TDataNodeConfiguration::getLocation)
-        .filter(e -> !regionReplicaNodes.contains(e))
-        .findAny();
-  }
-
-  private TSStatus addNewNodeToRegionConsensusGroup(
-      TConsensusGroupId regionId,
-      List<TDataNodeLocation> regionReplicaNodes,
-      TDataNodeLocation newNode) {
-    String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
-    TSStatus status =
-        SyncDataNodeClientPool.getInstance()
-            .addToRegionConsensusGroup(
-                // TODO replace with real ttl
-                regionReplicaNodes, regionId, newNode, storageGroup, Long.MAX_VALUE);
-    LOGGER.debug("send add region {} consensus group to {}", regionId, newNode);
-    if (isFailed(status)) {
-      LOGGER.error(
-          "add new node {} to region {} consensus group failed,  result: {}",
-          newNode,
-          regionId,
-          status);
-    }
-    return status;
-  }
-
-  private boolean isSucceed(TSStatus status) {
-    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
-  }
-
-  private boolean isFailed(TSStatus status) {
-    return !isSucceed(status);
-  }
-
-  /**
-   * register a RemoveDataNodeReq
-   *
-   * @param req RemoveDataNodeReq
-   * @return true if register succeed.
-   */
-  public synchronized boolean registerRequest(RemoveDataNodePlan req) {
-    if (!removeQueue.add(req)) {
-      LOGGER.error("register request failed");
-      return false;
-    }
-    ConsensusWriteResponse resp = configManager.getConsensusManager().write(req);
-    LOGGER.info("write register request to Consensus result : {} for req {}", resp, req);
-    return resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
-  }
-
-  /**
-   * remove a RemoveDataNodeReq
-   *
-   * @param req RemoveDataNodeReq
-   */
-  public void unRegisterRequest(RemoveDataNodePlan req) {
-    req.setFinished(true);
-    configManager.getConsensusManager().write(req);
-    reset();
-    LOGGER.info("unregister request succeed, remain {} request", removeQueue.size());
-  }
-
-  public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
-    LOGGER.debug("accept region {} migrate result, result: {}", req.getRegionId(), req);
-    notifyTheRegionMigrateFinished(req);
-  }
-
-  private TSStatus waitForTheRegionMigrateFinished() {
-    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    synchronized (regionMigrateLock) {
-      try {
-        // TODO set timeOut?
-        regionMigrateLock.wait();
-      } catch (InterruptedException e) {
-        LOGGER.error("region migrate {} interrupt", headNodeRegionIds.get(headRegionIndex), e);
-        Thread.currentThread().interrupt();
-        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-        status.setMessage("wait region migrate interrupt," + e.getMessage());
-      }
-    }
-    return status;
-  }
-
-  private void notifyTheRegionMigrateFinished(TRegionMigrateResultReportReq req) {
-    lastRegionMigrateResult = req;
-    synchronized (regionMigrateLock) {
-      regionMigrateLock.notify();
-    }
-  }
-
-  private void reset() {
-    this.headRequest = null;
-    this.headNodeIndex = -1;
-    this.headNodeState = DataNodeRemoveState.NORMAL;
-    this.headNodeRegionIds.clear();
-    this.headRegionIndex = -1;
-    this.headRegionState = RegionMigrateState.ONLINE;
-  }
-
-  private TSStatus stopDataNode(RemoveDataNodePlan req, TDataNodeLocation dataNode) {
-    LOGGER.info("begin to stop Data Node {} in request {}", dataNode, req);
-    storeDataNodeState(req, DataNodeRemoveState.STOP);
-    AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
-    TSStatus status =
-        SyncDataNodeClientPool.getInstance()
-            .sendSyncRequestToDataNodeWithRetry(
-                dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
-    LOGGER.info("stop Data Node {} result: {}", dataNode, status);
-    return status;
-  }
-
-  /** stop the manager */
-  public void stop() {
-    stopped = true;
-    if (waitLeaderThread != null) {
-      waitLeaderThread.interrupt();
-    }
-    if (workThread != null) {
-      workThread.interrupt();
-    }
-    LOGGER.info("Data Node remove service is stopped");
-  }
-
-  public void setConfigManager(ConfigManager configManager) {
-    this.configManager = configManager;
-  }
-
-  /**
-   * check if the remove datanode request illegal
-   *
-   * @param removeDataNodePlan RemoveDataNodeReq
-   * @return SUCCEED_STATUS when request is legal.
-   */
-  public DataNodeToStatusResp checkRemoveDataNodeRequest(RemoveDataNodePlan removeDataNodePlan) {
-    DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
-    dataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
-    TSStatus status = checkRegionReplication(removeDataNodePlan);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataSet.setStatus(status);
-      return dataSet;
-    }
-
-    status = checkRequestLimit();
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataSet.setStatus(status);
-      return dataSet;
-    }
-
-    status = checkDataNodeExist(removeDataNodePlan);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataSet.setStatus(status);
-      return dataSet;
-    }
-
-    status = checkDuplicateRequest(removeDataNodePlan);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataSet.setStatus(status);
-      return dataSet;
-    }
-
-    status = checkDuplicateDataNodeAcrossRequests(removeDataNodePlan);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      dataSet.setStatus(status);
-      return dataSet;
-    }
-    return dataSet;
-  }
-
-  /**
-   * check if request exceed threshold
-   *
-   * @return SUCCEED_STATUS if not exceed threshold
-   */
-  private TSStatus checkRequestLimit() {
-    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    if (removeQueue.size() >= QUEUE_SIZE_LIMIT) {
-      status.setCode(TSStatusCode.REQUEST_SIZE_EXCEED.getStatusCode());
-      status.setMessage("remove Data Node request exceed threshold, reject this request");
-    }
-    return status;
-  }
-
-  /**
-   * check if the request repeat
-   *
-   * @param removeDataNodePlan RemoveDataNodeReq
-   * @return SUCCEED_STATUS if not repeat
-   */
-  private TSStatus checkDuplicateRequest(RemoveDataNodePlan removeDataNodePlan) {
-    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    if (removeQueue.contains(removeDataNodePlan)) {
-      status.setCode(TSStatusCode.DUPLICATE_REMOVE.getStatusCode());
-      status.setMessage(
-          "the remove datanode request is duplicate, wait the last same request finished");
-    }
-    return status;
-  }
-
-  /**
-   * check if has same Data Node amount different request
-   *
-   * @param removeDataNodePlan RemoveDataNodeReq
-   * @return SUCCEED_STATUS if not has
-   */
-  private TSStatus checkDuplicateDataNodeAcrossRequests(RemoveDataNodePlan removeDataNodePlan) {
-    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    boolean hasDuplicateDataNodeAcrossRequests =
-        removeQueue.stream()
-            .map(RemoveDataNodePlan::getDataNodeLocations)
-            .anyMatch(loc -> removeDataNodePlan.getDataNodeLocations().contains(loc));
-    if (hasDuplicateDataNodeAcrossRequests) {
-      TSStatus dataNodeDuplicate = new TSStatus(TSStatusCode.DUPLICATE_REMOVE.getStatusCode());
-      dataNodeDuplicate.setMessage(
-          "there exist duplicate Data Node between this request and other requests, can't run");
-    }
-    return status;
-  }
-
-  /**
-   * check if has removed Data Node but not exist in cluster
-   *
-   * @param removeDataNodePlan RemoveDataNodeReq
-   * @return SUCCEED_STATUS if not has
-   */
-  private TSStatus checkDataNodeExist(RemoveDataNodePlan removeDataNodePlan) {
-    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-
-    List<TDataNodeLocation> allDataNodes =
-        configManager.getNodeManager().getRegisteredDataNodes(-1).stream()
-            .map(TDataNodeConfiguration::getLocation)
-            .collect(Collectors.toList());
-    boolean hasNotExistNode =
-        removeDataNodePlan.getDataNodeLocations().stream()
-            .anyMatch(loc -> !allDataNodes.contains(loc));
-    if (hasNotExistNode) {
-      status.setCode(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode());
-      status.setMessage("there exist Data Node in request but not in cluster");
-    }
-    return status;
-  }
-
-  /**
-   * check if has enought replication in cluster
-   *
-   * @param removeDataNodePlan RemoveDataNodeReq
-   * @return SUCCEED_STATUS if not has
-   */
-  private TSStatus checkRegionReplication(RemoveDataNodePlan removeDataNodePlan) {
-    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    int removedDataNodeSize = removeDataNodePlan.getDataNodeLocations().size();
-    int allDataNodeSize = configManager.getNodeManager().getRegisteredDataNodeCount();
-    if (allDataNodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) {
-      status.setCode(TSStatusCode.LACK_REPLICATION.getStatusCode());
-      status.setMessage(
-          "lack replication, allow most removed Data Node size : "
-              + (allDataNodeSize - NodeInfo.getMinimumDataNode()));
-    }
-    return status;
-  }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 42055dd18a..79da434580 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -110,11 +110,11 @@ public interface IManager {
   UDFManager getUDFManager();
 
   /**
-   * Get DataNodeRemoveManager
+   * Get ProcedureManager
    *
-   * @return DataNodeRemoveManager instance
+   * @return ProcedureManager instance
    */
-  DataNodeRemoveManager getDataNodeRemoveManager();
+  ProcedureManager getProcedureManager();
 
   /**
    * Register DataNode
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index cf82d79aaf..03219060df 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.consensus.response.DataNodeRegisterResp;
 import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.persistence.NodeInfo;
+import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
@@ -60,7 +61,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -139,8 +139,11 @@ public class NodeManager {
    */
   public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
     LOGGER.info("Node manager start to remove DataNode {}", removeDataNodePlan);
+
+    DataNodeRemoveHandler dataNodeRemoveHandler =
+        new DataNodeRemoveHandler((ConfigManager) configManager);
     DataNodeToStatusResp preCheckStatus =
-        configManager.getDataNodeRemoveManager().checkRemoveDataNodeRequest(removeDataNodePlan);
+        dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
     if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.error(
           "the remove Data Node request check failed.  req: {}, check result: {}",
@@ -151,7 +154,7 @@ public class NodeManager {
     // if add request to queue, then return to client
     DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
     boolean registerSucceed =
-        configManager.getDataNodeRemoveManager().registerRequest(removeDataNodePlan);
+        configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
     TSStatus status;
     if (registerSucceed) {
       status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
@@ -227,25 +230,6 @@ public class NodeManager {
     }
     return dataNodesLocations;
   }
-
-  /**
-   * get data node remove request queue
-   *
-   * @return LinkedBlockingQueue
-   */
-  public LinkedBlockingQueue<RemoveDataNodePlan> getDataNodeRemoveRequestQueue() {
-    return nodeInfo.getDataNodeRemoveRequestQueue();
-  }
-
-  /**
-   * get head data node remove request
-   *
-   * @return RemoveDataNodeReq
-   */
-  public RemoveDataNodePlan getHeadRequestForDataNodeRemove() {
-    return nodeInfo.getHeadRequestForDataNodeRemove();
-  }
-
   /**
    * Provides ConfigNodeGroup information for the newly registered ConfigNode
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index bae3062c4d..7ed3527ab3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -24,19 +24,23 @@ import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
+import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore;
 import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
 import org.apache.iotdb.confignode.procedure.store.ProcedureStore;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.rpc.RpcUtils;
 
@@ -134,6 +138,23 @@ public class ProcedureManager {
     LOGGER.info("Submit to remove ConfigNode, {}", removeConfigNodePlan);
   }
 
+  /**
+   * Submit one RemoveDataNodeProcedure for each DataNode location carried by the plan
+   *
+   * @param removeDataNodePlan plan whose DataNode locations are to be removed
+   * @return always true (the outcome of submitProcedure is not inspected here)
+   */
+  public boolean removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
+    removeDataNodePlan
+        .getDataNodeLocations()
+        .forEach(
+            tDataNodeLocation -> {
+              this.executor.submitProcedure(new RemoveDataNodeProcedure(tDataNodeLocation));
+              LOGGER.info("Submit to remove data node procedure, {}", tDataNodeLocation);
+            });
+    return true;
+  }
+
   private static boolean getProcedureStatus(
       ProcedureExecutor executor, List<Long> procIds, List<TSStatus> statusList) {
     boolean isSucceed = true;
@@ -215,4 +236,17 @@ public class ProcedureManager {
   public void setEnv(ConfigNodeProcedureEnv env) {
     this.env = env;
   }
+
+  public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
+    // Wake every RegionMigrateProcedure whose consensus group matches the reported region.
+    for (Object procedure : this.executor.getProcedures().values()) {
+      if (!(procedure instanceof RegionMigrateProcedure)) {
+        continue;
+      }
+      RegionMigrateProcedure migrateProcedure = (RegionMigrateProcedure) procedure;
+      if (migrateProcedure.getConsensusGroupId().equals(req.getRegionId())) {
+        migrateProcedure.notifyTheRegionMigrateFinished();
+      }
+    }
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index c996131e23..f62ac14fe9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -19,12 +19,9 @@
 package org.apache.iotdb.confignode.persistence;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.enums.DataNodeRemoveState;
-import org.apache.iotdb.commons.enums.RegionMigrateState;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -35,7 +32,6 @@ import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveConfigNodePlan;
 import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
 import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
 import org.apache.iotdb.db.service.metrics.MetricsService;
 import org.apache.iotdb.db.service.metrics.enums.Metric;
 import org.apache.iotdb.db.service.metrics.enums.Tag;
@@ -68,7 +64,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -99,15 +94,12 @@ public class NodeInfo implements SnapshotProcessor {
   // TODO: implement
   private final Set<TDataNodeLocation> drainingDataNodes = new HashSet<>();
 
-  private final RemoveNodeInfo removeNodeInfo;
-
   private final String snapshotFileName = "node_info.bin";
 
   public NodeInfo() {
     this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
     this.configNodeInfoReadWriteLock = new ReentrantReadWriteLock();
     this.registeredConfigNodes = new HashSet<>();
-    removeNodeInfo = new RemoveNodeInfo();
   }
 
   public void addMetrics() {
@@ -197,30 +189,38 @@ public class NodeInfo implements SnapshotProcessor {
   }
 
   /**
-   * Persist Infomation about remove dataNode
+   * Remove the DataNodes listed in the plan from the registered DataNodes
    *
    * @param req RemoveDataNodeReq
    * @return TSStatus
    */
   public TSStatus removeDataNode(RemoveDataNodePlan req) {
-    TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    removeNodeInfo.removeDataNode(req);
-    return result;
+    // Acquire the lock before entering try, so finally never unlocks a lock that is not held.
+    dataNodeInfoReadWriteLock.writeLock().lock();
+    try {
+      req.getDataNodeLocations()
+          .forEach(
+              removedDataNode ->
+                  registeredDataNodes.remove(removedDataNode.getDataNodeId()));
+    } finally {
+      dataNodeInfoReadWriteLock.writeLock().unlock();
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   /**
    * Get DataNode info
    *
-   * @param getDataNodeConfigurationPlan QueryDataNodeInfoPlan
+   * @param getDataNodeInfoPlan QueryDataNodeInfoPlan
    * @return The specific DataNode's info or all DataNode info if dataNodeId in
    *     QueryDataNodeInfoPlan is -1
    */
   public DataNodeConfigurationResp getDataNodeInfo(
-      GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
+      GetDataNodeConfigurationPlan getDataNodeInfoPlan) {
     DataNodeConfigurationResp result = new DataNodeConfigurationResp();
     result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
 
-    int dataNodeId = getDataNodeConfigurationPlan.getDataNodeId();
+    int dataNodeId = getDataNodeInfoPlan.getDataNodeId();
     dataNodeInfoReadWriteLock.readLock().lock();
     try {
       if (dataNodeId == -1) {
@@ -392,8 +392,6 @@ public class NodeInfo implements SnapshotProcessor {
 
       serializeDrainingDataNodes(fileOutputStream, protocol);
 
-      removeNodeInfo.serializeRemoveNodeInfo(fileOutputStream, protocol);
-
       fileOutputStream.flush();
 
       fileOutputStream.close();
@@ -467,8 +465,6 @@ public class NodeInfo implements SnapshotProcessor {
 
       deserializeDrainingDataNodes(fileInputStream, protocol);
 
-      removeNodeInfo.deserializeRemoveNodeInfo(fileInputStream, protocol);
-
     } finally {
       configNodeInfoReadWriteLock.writeLock().unlock();
       dataNodeInfoReadWriteLock.writeLock().unlock();
@@ -534,162 +530,5 @@ public class NodeInfo implements SnapshotProcessor {
     registeredDataNodes.clear();
     drainingDataNodes.clear();
     registeredConfigNodes.clear();
-    removeNodeInfo.clear();
-  }
-
-  /**
-   * get data node remove request queue
-   *
-   * @return LinkedBlockingQueue
-   */
-  public LinkedBlockingQueue<RemoveDataNodePlan> getDataNodeRemoveRequestQueue() {
-    return removeNodeInfo.getDataNodeRemoveRequestQueue();
-  }
-
-  /**
-   * get head data node remove request
-   *
-   * @return RemoveDataNodeReq
-   */
-  public RemoveDataNodePlan getHeadRequestForDataNodeRemove() {
-    return removeNodeInfo.getHeadRequest();
-  }
-
-  /** storage remove Data Node request Info */
-  private class RemoveNodeInfo {
-    private LinkedBlockingQueue<RemoveDataNodePlan> dataNodeRemoveRequestQueue =
-        new LinkedBlockingQueue<>();
-
-    // which request is running
-    private RemoveDataNodePlan headRequest = null;
-
-    public RemoveNodeInfo() {}
-
-    private void removeDataNode(RemoveDataNodePlan req) {
-      if (!dataNodeRemoveRequestQueue.contains(req)) {
-        dataNodeRemoveRequestQueue.add(req);
-      } else {
-        updateRemoveState(req);
-      }
-      LOGGER.info("request detail: {}", req);
-    }
-
-    private void removeSoppedDDataNode(TDataNodeLocation node) {
-      try {
-        dataNodeInfoReadWriteLock.writeLock().lock();
-        registeredDataNodes.remove(node.getDataNodeId());
-      } finally {
-        dataNodeInfoReadWriteLock.writeLock().unlock();
-      }
-    }
-
-    private void updateRemoveState(RemoveDataNodePlan req) {
-      if (!req.isUpdate()) {
-        LOGGER.warn("request is not in update status: {}", req);
-        return;
-      }
-      this.headRequest = req;
-
-      if (req.getExecDataNodeState() == DataNodeRemoveState.STOP) {
-        // headNodeState = DataNodeRemoveState.STOP;
-        int headNodeIndex = req.getExecDataNodeIndex();
-        TDataNodeLocation stopNode = req.getDataNodeLocations().get(headNodeIndex);
-        removeSoppedDDataNode(stopNode);
-        LOGGER.info(
-            "the Data Node {} remove succeed, now the registered Data Node size: {}",
-            stopNode.getInternalEndPoint(),
-            registeredDataNodes.size());
-      }
-
-      if (req.isFinished()) {
-        this.dataNodeRemoveRequestQueue.remove(req);
-        this.headRequest = null;
-      }
-    }
-
-    private void serializeRemoveNodeInfo(OutputStream outputStream, TProtocol protocol)
-        throws IOException, TException {
-      // request queue
-      ReadWriteIOUtils.write(dataNodeRemoveRequestQueue.size(), outputStream);
-      for (RemoveDataNodePlan req : dataNodeRemoveRequestQueue) {
-        TDataNodeRemoveReq tReq = new TDataNodeRemoveReq(req.getDataNodeLocations());
-        tReq.write(protocol);
-      }
-      // -1 means headRequest is null,  1 means headRequest is not null
-      if (headRequest == null) {
-        ReadWriteIOUtils.write(-1, outputStream);
-        return;
-      }
-
-      ReadWriteIOUtils.write(1, outputStream);
-      TDataNodeRemoveReq tHeadReq = new TDataNodeRemoveReq(headRequest.getDataNodeLocations());
-      tHeadReq.write(protocol);
-
-      ReadWriteIOUtils.write(headRequest.getExecDataNodeIndex(), outputStream);
-      ReadWriteIOUtils.write(headRequest.getExecDataNodeState().getCode(), outputStream);
-
-      ReadWriteIOUtils.write(headRequest.getExecDataNodeRegionIds().size(), outputStream);
-      for (TConsensusGroupId regionId : headRequest.getExecDataNodeRegionIds()) {
-        regionId.write(protocol);
-      }
-      ReadWriteIOUtils.write(headRequest.getExecRegionIndex(), outputStream);
-      ReadWriteIOUtils.write(headRequest.getExecRegionState().getCode(), outputStream);
-    }
-
-    private void deserializeRemoveNodeInfo(InputStream inputStream, TProtocol protocol)
-        throws IOException, TException {
-      int queueSize = ReadWriteIOUtils.readInt(inputStream);
-      dataNodeRemoveRequestQueue = new LinkedBlockingQueue<>();
-      for (int i = 0; i < queueSize; i++) {
-        TDataNodeRemoveReq tReq = new TDataNodeRemoveReq();
-        tReq.read(protocol);
-        dataNodeRemoveRequestQueue.add(new RemoveDataNodePlan(tReq.getDataNodeLocations()));
-      }
-      boolean headRequestExist = ReadWriteIOUtils.readInt(inputStream) == 1;
-      if (!headRequestExist) {
-        headRequest = null;
-        return;
-      }
-
-      TDataNodeRemoveReq tHeadReq = new TDataNodeRemoveReq();
-      tHeadReq.read(protocol);
-      headRequest = new RemoveDataNodePlan(tHeadReq.getDataNodeLocations());
-      headRequest.setUpdate(true);
-      headRequest.setFinished(false);
-
-      int headNodeIndex = ReadWriteIOUtils.readInt(inputStream);
-      DataNodeRemoveState headNodeState =
-          DataNodeRemoveState.getStateByCode(ReadWriteIOUtils.readInt(inputStream));
-      headRequest.setExecDataNodeIndex(headNodeIndex);
-      headRequest.setExecDataNodeState(headNodeState);
-
-      int headNodeRegionSize = ReadWriteIOUtils.readInt(inputStream);
-      List<TConsensusGroupId> headNodeRegionIds = new ArrayList<>();
-      for (int i = 0; i < headNodeRegionSize; i++) {
-        TConsensusGroupId regionId = new TConsensusGroupId();
-        regionId.read(protocol);
-        headNodeRegionIds.add(regionId);
-      }
-      headRequest.setExecDataNodeRegionIds(headNodeRegionIds);
-
-      int headRegionIndex = ReadWriteIOUtils.readInt(inputStream);
-      RegionMigrateState headRegionState =
-          RegionMigrateState.getStateByCode(ReadWriteIOUtils.readInt(inputStream));
-      headRequest.setExecRegionIndex(headRegionIndex);
-      headRequest.setExecRegionState(headRegionState);
-    }
-
-    private void clear() {
-      dataNodeRemoveRequestQueue.clear();
-      headRequest = null;
-    }
-
-    public LinkedBlockingQueue<RemoveDataNodePlan> getDataNodeRemoveRequestQueue() {
-      return dataNodeRemoveRequestQueue;
-    }
-
-    public RemoveDataNodePlan getHeadRequest() {
-      return headRequest;
-    }
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index ff3ff0fe8b..ba316a95da 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroup
 import org.apache.iotdb.confignode.exception.AddPeerException;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -49,13 +50,17 @@ public class ConfigNodeProcedureEnv {
 
   private static final Logger LOG = LoggerFactory.getLogger(ConfigNodeProcedureEnv.class);
 
-  /** add and remove config node lock */
-  private final ReentrantLock configNodeLock = new ReentrantLock();
+  /** add or remove node lock */
+  private final LockQueue nodeLock = new LockQueue();
+
+  private final ReentrantLock schedulerLock = new ReentrantLock();
 
   private final ConfigManager configManager;
 
   private final ProcedureScheduler scheduler;
 
+  private final DataNodeRemoveHandler dataNodeRemoveHandler;
+
   private static boolean skipForTest = false;
 
   private static boolean invalidCacheResult = true;
@@ -71,6 +76,7 @@ public class ConfigNodeProcedureEnv {
   public ConfigNodeProcedureEnv(ConfigManager configManager, ProcedureScheduler scheduler) {
     this.configManager = configManager;
     this.scheduler = scheduler;
+    this.dataNodeRemoveHandler = new DataNodeRemoveHandler(configManager);
   }
 
   public ConfigManager getConfigManager() {
@@ -245,11 +251,23 @@ public class ConfigNodeProcedureEnv {
             ConfigNodeRequestType.NOTIFY_REGISTER_SUCCESS);
   }
 
-  public ReentrantLock getConfigNodeLock() {
-    return configNodeLock;
+  public LockQueue getNodeLock() {
+    return nodeLock;
   }
 
   public ProcedureScheduler getScheduler() {
     return scheduler;
   }
+
+  /**
+   * Returns the region-migrate lock queue held by the data node remove handler.
+   *
+   * @return the lock queue obtained from {@code dataNodeRemoveHandler}
+   */
+  public LockQueue getRegionMigrateLock() {
+    return dataNodeRemoveHandler.getRegionMigrateLock();
+  }
+
+  /**
+   * Returns the scheduler lock of this environment.
+   *
+   * @return the {@code ReentrantLock} stored in {@code schedulerLock}
+   */
+  public ReentrantLock getSchedulerLock() {
+    return schedulerLock;
+  }
+
+  /**
+   * Returns the handler that carries out the individual data node removal steps.
+   *
+   * @return the handler created in the constructor of this environment
+   */
+  public DataNodeRemoveHandler getDataNodeRemoveHandler() {
+    return dataNodeRemoveHandler;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
new file mode 100644
index 0000000000..4663ed47d3
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.procedure.env;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
+import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
+import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.NodeInfo;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
+import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class DataNodeRemoveHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveHandler.class);
+
+  private ConfigManager configManager;
+
+  /** region migrate lock */
+  private final LockQueue regionMigrateLock = new LockQueue();
+
+  /**
+   * Creates a handler that performs its work through the given config manager.
+   *
+   * @param configManager manager used to reach the partition, load, node and consensus managers
+   */
+  public DataNodeRemoveHandler(ConfigManager configManager) {
+    this.configManager = configManager;
+  }
+
+  /**
+   * Get all consensus group id in this node
+   *
+   * @param dataNodeLocation data node location
+   * @return group id list
+   */
+  public List<TConsensusGroupId> getDataNodeRegionIds(TDataNodeLocation dataNodeLocation) {
+    return configManager.getPartitionManager().getAllReplicaSets().stream()
+        .filter(
+            rg ->
+                rg.getDataNodeLocations().contains(dataNodeLocation)
+                    && rg.regionId.getType() != TConsensusGroupType.PartitionRegion)
+        .map(TRegionReplicaSet::getRegionId)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Sends a disable request for the given data node to every other online data node.
+   *
+   * <p>The broadcast stops at the first node whose answer is not a success status.
+   *
+   * @param disabledDataNode location of the data node being disabled
+   * @return the first non-success status, otherwise the last status received (or a fresh success
+   *     status when there was no other online node) with a success message set on it
+   */
+  public TSStatus broadcastDisableDataNode(TDataNodeLocation disabledDataNode) {
+    LOGGER.info(
+        "DataNodeRemoveService start send disable the Data Node to cluster, {}", disabledDataNode);
+
+    // Internal end points of all online data nodes except the one being disabled.
+    List<TEndPoint> receivers = new ArrayList<>();
+    for (TDataNodeConfiguration onlineNode :
+        configManager.getLoadManager().getOnlineDataNodes(-1)) {
+      TDataNodeLocation location = onlineNode.getLocation();
+      if (!location.equals(disabledDataNode)) {
+        receivers.add(location.getInternalEndPoint());
+      }
+    }
+
+    TSStatus lastStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    for (TEndPoint receiver : receivers) {
+      TDisableDataNodeReq request = new TDisableDataNodeReq(disabledDataNode);
+      lastStatus =
+          SyncDataNodeClientPool.getInstance()
+              .sendSyncRequestToDataNodeWithRetry(
+                  receiver, request, DataNodeRequestType.DISABLE_DATA_NODE);
+      if (isFailed(lastStatus)) {
+        return lastStatus;
+      }
+    }
+
+    LOGGER.info(
+        "DataNodeRemoveService finished send disable the Data Node to cluster, {}",
+        disabledDataNode);
+    lastStatus.setMessage("Succeed disable the Data Node from cluster");
+    return lastStatus;
+  }
+
+  /**
+   * Picks an online data node that can receive a new replica of the given region.
+   *
+   * @param regionId region id
+   * @return an online data node that does not already hold a replica of the region, or {@code
+   *     null} when the region has no known replica nodes or no such data node is available
+   */
+  public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
+    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+    if (regionReplicaNodes.isEmpty()) {
+      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+      return null;
+    }
+
+    // will migrate the region to the new node, which should not be same raft
+    Optional<TDataNodeLocation> newNode = pickNewReplicaNodeForRegion(regionReplicaNodes);
+    if (!newNode.isPresent()) {
+      LOGGER.warn("No enough Data node to migrate region: {}", regionId);
+      // Report "no destination" like the branch above instead of letting Optional.get() throw
+      // NoSuchElementException right after the warning.
+      return null;
+    }
+    return newNode.get();
+  }
+
+  /**
+   * Sends a migrate-region request to the original data node.
+   *
+   * <p>Some other replica holder of the region is passed along in the request as the new leader
+   * node.
+   *
+   * @param originalDataNode data node that currently holds the region replica
+   * @param destDataNode data node the region should be migrated to
+   * @param regionId id of the region to migrate
+   * @return a {@code MIGRATE_REGION_ERROR} status when the region has no known replica nodes or
+   *     no other replica node exists, otherwise the status returned for the migrate request
+   */
+  public TSStatus migrateRegion(
+      TDataNodeLocation originalDataNode,
+      TDataNodeLocation destDataNode,
+      TConsensusGroupId regionId) {
+    List<TDataNodeLocation> replicaHolders = findRegionReplicaNodes(regionId);
+    if (replicaHolders.isEmpty()) {
+      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+      TSStatus noReplica = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+      noReplica.setMessage("not find region replica nodes, region: " + regionId);
+      return noReplica;
+    }
+
+    // TODO if region replica is 1, the new leader is null, it also need to migrate
+    TDataNodeLocation newLeader = null;
+    for (TDataNodeLocation holder : replicaHolders) {
+      if (!holder.equals(originalDataNode)) {
+        newLeader = holder;
+        break;
+      }
+    }
+    if (newLeader == null) {
+      LOGGER.warn(
+          "No other Node to change region leader, check by show regions, region: {}", regionId);
+      TSStatus noLeader = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+      noLeader.setMessage("No other Node to change region leader, check by show regions");
+      return noLeader;
+    }
+
+    TMigrateRegionReq request = new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
+    request.setNewLeaderNode(newLeader);
+    TSStatus sendResult =
+        SyncDataNodeClientPool.getInstance()
+            .sendSyncRequestToDataNodeWithRetry(
+                originalDataNode.getInternalEndPoint(),
+                request,
+                DataNodeRequestType.MIGRATE_REGION);
+    LOGGER.debug(
+        "send region {} migrate action to {}, wait it finished", regionId, originalDataNode);
+    return sendResult;
+  }
+
+  /**
+   * Records that a region replica moved from one data node to another.
+   *
+   * <p>The change is handed to the partition manager as an {@code UpdateRegionLocationPlan}; the
+   * resulting status is only logged.
+   *
+   * @param regionId id of the migrated region
+   * @param originalDataNode data node the region was moved away from
+   * @param destDataNode data node the region was moved to
+   */
+  public void updateRegionLocationCache(
+      TConsensusGroupId regionId,
+      TDataNodeLocation originalDataNode,
+      TDataNodeLocation destDataNode) {
+    String oldIp = originalDataNode.getInternalEndPoint().getIp();
+    String newIp = destDataNode.getInternalEndPoint().getIp();
+    LOGGER.debug(
+        "start to update region {} location from {} to {} when it migrate succeed",
+        regionId,
+        oldIp,
+        newIp);
+
+    TSStatus result =
+        configManager
+            .getPartitionManager()
+            .updateRegionLocation(
+                new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode));
+
+    LOGGER.info(
+        "update region {} location finished, result:{}, old:{}, new:{}",
+        regionId,
+        result,
+        oldIp,
+        newIp);
+  }
+
+  /**
+   * Looks up the data nodes that hold a replica of the given region.
+   *
+   * @param regionId region id
+   * @return data node locations of the first replica set whose region id equals {@code
+   *     regionId}, or an empty list when no replica set matches
+   */
+  public List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) {
+    for (TRegionReplicaSet replicaSet :
+        configManager.getPartitionManager().getAllReplicaSets()) {
+      if (replicaSet.regionId.equals(regionId)) {
+        return replicaSet.getDataNodeLocations();
+      }
+    }
+
+    LOGGER.warn("not find TRegionReplica for region: {}", regionId);
+    return Collections.emptyList();
+  }
+
+  /**
+   * Picks an online data node that is not among the given replica nodes.
+   *
+   * @param regionReplicaNodes data nodes that already hold a replica of the region
+   * @return such a data node, or an empty Optional when every online node is already a replica
+   */
+  private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
+      List<TDataNodeLocation> regionReplicaNodes) {
+    for (TDataNodeConfiguration onlineNode :
+        configManager.getLoadManager().getOnlineDataNodes(-1)) {
+      TDataNodeLocation candidate = onlineNode.getLocation();
+      if (!regionReplicaNodes.contains(candidate)) {
+        return Optional.of(candidate);
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * add region Consensus group in new node
+   *
+   * @param regionId region id
+   * @param destDataNode dest data node
+   * @return status
+   */
+  public TSStatus addNewNodeToRegionConsensusGroup(
+      TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
+    TSStatus status;
+    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+    if (regionReplicaNodes.isEmpty()) {
+      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+      status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+      status.setMessage("not find region replica nodes, region: " + regionId);
+      return status;
+    }
+
+    String storageGroup = configManager.getPartitionManager().getRegionStorageGroup(regionId);
+    status =
+        SyncDataNodeClientPool.getInstance()
+            .addToRegionConsensusGroup(
+                // TODO replace with real ttl
+                regionReplicaNodes, regionId, destDataNode, storageGroup, Long.MAX_VALUE);
+    LOGGER.debug("send add region {} consensus group to {}", regionId, destDataNode);
+    if (isFailed(status)) {
+      LOGGER.error(
+          "add new node {} to region {} consensus group failed,  result: {}",
+          destDataNode,
+          regionId,
+          status);
+    }
+    return status;
+  }
+
+  /**
+   * Tells whether a status carries the success code.
+   *
+   * @param status status to inspect
+   * @return {@code true} when the status code equals {@code SUCCESS_STATUS}
+   */
+  private boolean isSucceed(TSStatus status) {
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+  }
+
+  /**
+   * Tells whether a status carries any code other than the success code.
+   *
+   * @param status status to inspect
+   * @return the negation of {@code isSucceed(status)}
+   */
+  private boolean isFailed(TSStatus status) {
+    return !isSucceed(status);
+  }
+
+  /**
+   * Sends a stop request to the given data node.
+   *
+   * <p>The async client for the node's internal end point is reset before the request is sent.
+   *
+   * @param dataNode data node to stop
+   * @return the success status returned for the stop request
+   * @throws ProcedureException when the stop request does not return a success status
+   */
+  public TSStatus stopDataNode(TDataNodeLocation dataNode) throws ProcedureException {
+    LOGGER.info("begin to stop Data Node {}", dataNode);
+    TEndPoint endPoint = dataNode.getInternalEndPoint();
+    AsyncDataNodeClientPool.getInstance().resetClient(endPoint);
+
+    TSStatus result =
+        SyncDataNodeClientPool.getInstance()
+            .sendSyncRequestToDataNodeWithRetry(
+                endPoint, dataNode, DataNodeRequestType.STOP_DATA_NODE);
+    LOGGER.info("stop Data Node {} result: {}", dataNode, result);
+
+    if (isFailed(result)) {
+      throw new ProcedureException("Failed to stop data node");
+    }
+    return result;
+  }
+
+  /**
+   * Validates a remove-data-node request.
+   *
+   * <p>The replication check runs first; the existence check runs only when the replication
+   * check succeeded.
+   *
+   * @param removeDataNodePlan request listing the data nodes to remove
+   * @return a response carrying {@code SUCCESS_STATUS} when both checks pass, otherwise the
+   *     status of the first failed check
+   */
+  public DataNodeToStatusResp checkRemoveDataNodeRequest(RemoveDataNodePlan removeDataNodePlan) {
+    TSStatus verdict = checkRegionReplication(removeDataNodePlan);
+    if (verdict.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      verdict = checkDataNodeExist(removeDataNodePlan);
+    }
+    if (verdict.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      // Both checks passed: answer with a plain success status.
+      verdict = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    }
+
+    DataNodeToStatusResp response = new DataNodeToStatusResp();
+    response.setStatus(verdict);
+    return response;
+  }
+
+  /**
+   * Checks that every data node named in the request is registered in the cluster.
+   *
+   * @param removeDataNodePlan request listing the data nodes to remove
+   * @return {@code SUCCESS_STATUS} when all requested data nodes are registered, otherwise a
+   *     {@code DATANODE_NOT_EXIST} status
+   */
+  private TSStatus checkDataNodeExist(RemoveDataNodePlan removeDataNodePlan) {
+    TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+
+    List<TDataNodeLocation> registered = new ArrayList<>();
+    for (TDataNodeConfiguration conf :
+        configManager.getNodeManager().getRegisteredDataNodes(-1)) {
+      registered.add(conf.getLocation());
+    }
+
+    for (TDataNodeLocation requested : removeDataNodePlan.getDataNodeLocations()) {
+      if (!registered.contains(requested)) {
+        result.setCode(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode());
+        result.setMessage("there exist Data Node in request but not in cluster");
+        break;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Checks that enough data nodes stay registered after the requested removal.
+   *
+   * @param removeDataNodePlan request listing the data nodes to remove
+   * @return {@code SUCCESS_STATUS} when the number of remaining data nodes is at least {@code
+   *     NodeInfo.getMinimumDataNode()}, otherwise a {@code LACK_REPLICATION} status
+   */
+  private TSStatus checkRegionReplication(RemoveDataNodePlan removeDataNodePlan) {
+    TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    int toRemove = removeDataNodePlan.getDataNodeLocations().size();
+    int registered = configManager.getNodeManager().getRegisteredDataNodeCount();
+    int remaining = registered - toRemove;
+
+    if (remaining < NodeInfo.getMinimumDataNode()) {
+      int removable = registered - NodeInfo.getMinimumDataNode();
+      result.setCode(TSStatusCode.LACK_REPLICATION.getStatusCode());
+      result.setMessage("lack replication, allow most removed Data Node size : " + removable);
+    }
+    return result;
+  }
+
+  /**
+   * Returns the lock queue used for region migration.
+   *
+   * @return the {@code regionMigrateLock} instance owned by this handler
+   */
+  public LockQueue getRegionMigrateLock() {
+    return regionMigrateLock;
+  }
+
+  /**
+   * Remove data node in node info
+   *
+   * @param tDataNodeLocation data node location
+   */
+  public void removeDataNodePersistence(TDataNodeLocation tDataNodeLocation) {
+    List<TDataNodeLocation> removeDataNodes = new ArrayList<>();
+    removeDataNodes.add(tDataNodeLocation);
+    configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes));
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java
new file mode 100644
index 0000000000..15a256143e
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AbstractNodeProcedure.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for node procedures.
+ *
+ * <p>Locking goes through the node lock queue of the {@link ConfigNodeProcedureEnv}; every
+ * operation on that queue is performed while holding the environment's scheduler lock.
+ */
+public abstract class AbstractNodeProcedure<TState>
+    extends StateMachineProcedure<ConfigNodeProcedureEnv, TState> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeProcedure.class);
+
+  /**
+   * Tries to take the node lock for this procedure.
+   *
+   * @param configNodeProcedureEnv environment providing the scheduler lock and node lock queue
+   * @return {@code LOCK_ACQUIRED} when the lock was taken, otherwise {@code LOCK_EVENT_WAIT}
+   *     after this procedure was registered as waiting on the node lock queue
+   */
+  @Override
+  protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    configNodeProcedureEnv.getSchedulerLock().lock();
+    try {
+      if (!configNodeProcedureEnv.getNodeLock().tryLock(this)) {
+        configNodeProcedureEnv.getNodeLock().waitProcedure(this);
+        LOG.info("{} wait for lock.", getProcId());
+        return ProcedureLockState.LOCK_EVENT_WAIT;
+      }
+      LOG.info("{} acquire lock.", getProcId());
+      return ProcedureLockState.LOCK_ACQUIRED;
+    } finally {
+      configNodeProcedureEnv.getSchedulerLock().unlock();
+    }
+  }
+
+  /**
+   * Releases the node lock and, when the release succeeded, wakes the waiting procedures.
+   *
+   * @param configNodeProcedureEnv environment providing the scheduler lock, the node lock queue
+   *     and the scheduler handed to the wake-up call
+   */
+  @Override
+  protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    configNodeProcedureEnv.getSchedulerLock().lock();
+    try {
+      LOG.info("{} release lock.", getProcId());
+      boolean released = configNodeProcedureEnv.getNodeLock().releaseLock(this);
+      if (released) {
+        configNodeProcedureEnv
+            .getNodeLock()
+            .wakeWaitingProcedures(configNodeProcedureEnv.getScheduler());
+      }
+    } finally {
+      configNodeProcedureEnv.getSchedulerLock().unlock();
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
index dd544c9815..7edda907cc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/AddConfigNodeProcedure.java
@@ -22,12 +22,9 @@ package org.apache.iotdb.confignode.procedure.impl;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.state.AddConfigNodeState;
-import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 
 import org.slf4j.Logger;
@@ -38,8 +35,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /** add config node procedure */
-public class AddConfigNodeProcedure
-    extends StateMachineProcedure<ConfigNodeProcedureEnv, AddConfigNodeState> {
+public class AddConfigNodeProcedure extends AbstractNodeProcedure<AddConfigNodeState> {
   private static final Logger LOG = LoggerFactory.getLogger(AddConfigNodeProcedure.class);
   private static final int retryThreshold = 5;
 
@@ -121,33 +117,6 @@ public class AddConfigNodeProcedure
     return false;
   }
 
-  @Override
-  protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    if (configNodeProcedureEnv.getConfigNodeLock().tryLock()) {
-      LOG.info("{} acquire lock.", getProcId());
-      return ProcedureLockState.LOCK_ACQUIRED;
-    }
-    SimpleProcedureScheduler simpleProcedureScheduler =
-        (SimpleProcedureScheduler) configNodeProcedureEnv.getScheduler();
-    simpleProcedureScheduler.addWaiting(this);
-    LOG.info("{} wait for lock.", getProcId());
-    return ProcedureLockState.LOCK_EVENT_WAIT;
-  }
-
-  @Override
-  protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    LOG.info("{} release lock.", getProcId());
-    configNodeProcedureEnv.getConfigNodeLock().unlock();
-    SimpleProcedureScheduler simpleProcedureScheduler =
-        (SimpleProcedureScheduler) configNodeProcedureEnv.getScheduler();
-    simpleProcedureScheduler.releaseWaiting();
-  }
-
-  @Override
-  protected boolean holdLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    return configNodeProcedureEnv.getConfigNodeLock().isHeldByCurrentThread();
-  }
-
   @Override
   protected AddConfigNodeState getState(int stateId) {
     return AddConfigNodeState.values()[stateId];
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
new file mode 100644
index 0000000000..9a8c13505f
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RegionMigrateProcedure.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
+import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** region migrate procedure */
+public class RegionMigrateProcedure
+    extends StateMachineProcedure<ConfigNodeProcedureEnv, RegionTransitionState> {
+  private static final Logger LOG = LoggerFactory.getLogger(RegionMigrateProcedure.class);
+  private static final int retryThreshold = 5;
+
+  /** Wait region migrate finished */
+  private final Object regionMigrateLock = new Object();
+
+  private TConsensusGroupId consensusGroupId;
+
+  private TDataNodeLocation originalDataNode;
+
+  private TDataNodeLocation destDataNode;
+
+  /**
+   * Creates an empty procedure; region id and data node locations stay {@code null} until they
+   * are filled in, e.g. by {@code deserialize}.
+   */
+  public RegionMigrateProcedure() {
+    super();
+  }
+
+  /**
+   * Creates a procedure that migrates one region between two data nodes.
+   *
+   * @param consensusGroupId id of the region to migrate
+   * @param originalDataNode data node the region is migrated away from
+   * @param destDataNode data node the region is migrated to
+   */
+  public RegionMigrateProcedure(
+      TConsensusGroupId consensusGroupId,
+      TDataNodeLocation originalDataNode,
+      TDataNodeLocation destDataNode) {
+    super();
+    this.consensusGroupId = consensusGroupId;
+    this.originalDataNode = originalDataNode;
+    this.destDataNode = destDataNode;
+  }
+
+  /**
+   * Runs one step of the region migration.
+   *
+   * <p>The steps are: prepare, add the destination node to the region consensus group, send the
+   * migrate request, wait for the migration to be reported as finished, and update the region
+   * location. A step that returns a non-success status is treated as a failure of that step: the
+   * state is not advanced, so the step is executed again, and the procedure is marked as failed
+   * once {@code getCycles()} exceeds {@code retryThreshold}.
+   *
+   * @param env environment providing the data node remove handler
+   * @param state state to execute
+   * @return {@code NO_MORE_STATE} when there is no region id or after the region location was
+   *     updated, otherwise {@code HAS_MORE_STATE}
+   */
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, RegionTransitionState state) {
+    if (consensusGroupId == null) {
+      return Flow.NO_MORE_STATE;
+    }
+    try {
+      switch (state) {
+        case REGION_MIGRATE_PREPARE:
+          setNextState(RegionTransitionState.ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP);
+          break;
+        case ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP:
+          checkStepSucceeded(
+              env.getDataNodeRemoveHandler()
+                  .addNewNodeToRegionConsensusGroup(consensusGroupId, destDataNode),
+              state);
+          setNextState(RegionTransitionState.MIGRATE_REGION);
+          break;
+        case MIGRATE_REGION:
+          // Do not move on to the wait state when the request failed: no completion
+          // notification would ever arrive for it.
+          checkStepSucceeded(
+              env.getDataNodeRemoveHandler()
+                  .migrateRegion(originalDataNode, destDataNode, consensusGroupId),
+              state);
+          setNextState(RegionTransitionState.WAIT_FOR_REGION_MIGRATE_FINISHED);
+          break;
+        case WAIT_FOR_REGION_MIGRATE_FINISHED:
+          checkStepSucceeded(waitForTheRegionMigrateFinished(consensusGroupId), state);
+          setNextState(RegionTransitionState.UPDATE_REGION_LOCATION_CACHE);
+          LOG.info("Wait for region migrate finished");
+          break;
+        case UPDATE_REGION_LOCATION_CACHE:
+          env.getDataNodeRemoveHandler()
+              .updateRegionLocationCache(consensusGroupId, originalDataNode, destDataNode);
+          return Flow.NO_MORE_STATE;
+      }
+    } catch (Exception e) {
+      if (isRollbackSupported(state)) {
+        setFailure(new ProcedureException("Region migrate failed " + state));
+      } else {
+        LOG.error(
+            "Retrievable error trying to region migrate {}, state {}", originalDataNode, state, e);
+        if (getCycles() > retryThreshold) {
+          setFailure(new ProcedureException("State stuck at " + state));
+        }
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /**
+   * Turns a non-success status of a migration step into an exception.
+   *
+   * @param status status returned by the step
+   * @param state state the step belongs to, used in the exception message
+   * @throws ProcedureException when the status code is not {@code SUCCESS_STATUS}
+   */
+  private void checkStepSucceeded(TSStatus status, RegionTransitionState state)
+      throws ProcedureException {
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new ProcedureException("Region migrate step " + state + " failed, result: " + status);
+    }
+  }
+
+  /** Does nothing: this procedure has no rollback logic for any state. */
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, RegionTransitionState state)
+      throws IOException, InterruptedException, ProcedureException {}
+
+  /**
+   * Reports that no state of this procedure supports rollback.
+   *
+   * @param state state being asked about (ignored)
+   * @return always {@code false}
+   */
+  @Override
+  protected boolean isRollbackSupported(RegionTransitionState state) {
+    return false;
+  }
+
+  /**
+   * Tries to take the region-migrate lock for this procedure while holding the scheduler lock.
+   *
+   * @param configNodeProcedureEnv environment providing the scheduler lock and the
+   *     region-migrate lock queue
+   * @return {@code LOCK_ACQUIRED} when the lock was taken, otherwise {@code LOCK_EVENT_WAIT}
+   *     after this procedure was registered as waiting on the lock queue
+   */
+  @Override
+  protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    configNodeProcedureEnv.getSchedulerLock().lock();
+    try {
+      if (!configNodeProcedureEnv.getRegionMigrateLock().tryLock(this)) {
+        configNodeProcedureEnv.getRegionMigrateLock().waitProcedure(this);
+        LOG.info("{} wait for lock.", getProcId());
+        return ProcedureLockState.LOCK_EVENT_WAIT;
+      }
+      LOG.info("{} acquire lock.", getProcId());
+      return ProcedureLockState.LOCK_ACQUIRED;
+    } finally {
+      configNodeProcedureEnv.getSchedulerLock().unlock();
+    }
+  }
+
+  /**
+   * Releases the region-migrate lock and, when the release succeeded, wakes the procedures
+   * waiting on it. Both steps run while holding the scheduler lock.
+   *
+   * @param configNodeProcedureEnv environment providing the scheduler lock, the region-migrate
+   *     lock queue and the scheduler handed to the wake-up call
+   */
+  @Override
+  protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    configNodeProcedureEnv.getSchedulerLock().lock();
+    try {
+      LOG.info("{} release lock.", getProcId());
+      boolean released = configNodeProcedureEnv.getRegionMigrateLock().releaseLock(this);
+      if (released) {
+        configNodeProcedureEnv
+            .getRegionMigrateLock()
+            .wakeWaitingProcedures(configNodeProcedureEnv.getScheduler());
+      }
+    } finally {
+      configNodeProcedureEnv.getSchedulerLock().unlock();
+    }
+  }
+
+  /**
+   * Maps a state id back to its state.
+   *
+   * @param stateId index into {@code RegionTransitionState.values()}
+   * @return the state at that index
+   */
+  @Override
+  protected RegionTransitionState getState(int stateId) {
+    return RegionTransitionState.values()[stateId];
+  }
+
+  /**
+   * Maps a state to its id.
+   *
+   * @param regionTransitionState state to convert
+   * @return the ordinal of the state, so ids depend on the enum declaration order
+   */
+  @Override
+  protected int getStateId(RegionTransitionState regionTransitionState) {
+    return regionTransitionState.ordinal();
+  }
+
+  /**
+   * Returns the state this procedure starts in.
+   *
+   * @return {@code REGION_MIGRATE_PREPARE}
+   */
+  @Override
+  protected RegionTransitionState getInitialState() {
+    return RegionTransitionState.REGION_MIGRATE_PREPARE;
+  }
+
+  /**
+   * Writes this procedure to the stream.
+   *
+   * <p>Layout: the ordinal of {@code REGION_MIGRATE_PROCEDURE}, the data written by the
+   * superclass, then the original data node, the destination data node and the region id. The
+   * last three are read back in the same order by {@code deserialize}.
+   *
+   * @param stream stream to write to
+   * @throws IOException when writing to the stream fails
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeInt(ProcedureFactory.ProcedureType.REGION_MIGRATE_PROCEDURE.ordinal());
+    super.serialize(stream);
+    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(originalDataNode, stream);
+    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(destDataNode, stream);
+    ThriftCommonsSerDeUtils.serializeTConsensusGroupId(consensusGroupId, stream);
+  }
+
+  /**
+   * Restores this procedure from the buffer.
+   *
+   * <p>After the superclass data, the original data node, the destination data node and the
+   * region id are read in the order in which {@code serialize} wrote them. A Thrift
+   * deserialization error is logged and not rethrown, so fields read after the failure point
+   * keep their previous values.
+   *
+   * @param byteBuffer buffer positioned at the serialized procedure data
+   */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    try {
+      originalDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+      destDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+      consensusGroupId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
+    } catch (ThriftSerDeException e) {
+      // The message used to name RemoveConfigNodeProcedure, which pointed at the wrong class.
+      LOG.error("Error in deserialize RegionMigrateProcedure", e);
+    }
+  }
+
+  /**
+   * Compares this procedure with another object.
+   *
+   * <p>Two region migrate procedures are equal when their procedure id, state, original data
+   * node, destination data node and region id are equal. The field comparison is null-safe, so
+   * instances created by the no-arg constructor can be compared.
+   *
+   * <p>NOTE(review): no matching {@code hashCode()} override is visible in this class.
+   *
+   * @param that object to compare with
+   * @return {@code true} when {@code that} is an equal {@code RegionMigrateProcedure}
+   */
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    }
+    if (!(that instanceof RegionMigrateProcedure)) {
+      return false;
+    }
+    RegionMigrateProcedure thatProc = (RegionMigrateProcedure) that;
+    return thatProc.getProcId() == this.getProcId()
+        && thatProc.getState() == this.getState()
+        && java.util.Objects.equals(thatProc.originalDataNode, this.originalDataNode)
+        && java.util.Objects.equals(thatProc.destDataNode, this.destDataNode)
+        && java.util.Objects.equals(thatProc.consensusGroupId, this.consensusGroupId);
+  }
+
+  /** Set once the migration was reported as finished; guarded by {@code regionMigrateLock}. */
+  private boolean regionMigrateFinished = false;
+
+  /**
+   * Blocks the calling thread until the region migration has been reported as finished.
+   *
+   * <p>The wait is guarded by a flag, so a report that arrives before this method is called is
+   * not lost, and a spurious wakeup does not end the wait early.
+   *
+   * @param consensusGroupId id of the region being migrated, used for logging only
+   * @return a success status, or a {@code MIGRATE_REGION_ERROR} status when the waiting thread
+   *     was interrupted (the interrupt flag is restored in that case)
+   */
+  public TSStatus waitForTheRegionMigrateFinished(TConsensusGroupId consensusGroupId) {
+    TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    synchronized (regionMigrateLock) {
+      try {
+        // TODO set timeOut?
+        while (!regionMigrateFinished) {
+          regionMigrateLock.wait();
+        }
+      } catch (InterruptedException e) {
+        LOG.error("region migrate {} interrupt", consensusGroupId, e);
+        Thread.currentThread().interrupt();
+        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
+        status.setMessage("wait region migrate interrupt," + e.getMessage());
+      }
+    }
+    return status;
+  }
+
+  /** Marks the region migration as finished and wakes the thread waiting for it, if any. */
+  public void notifyTheRegionMigrateFinished() {
+    synchronized (regionMigrateLock) {
+      regionMigrateFinished = true;
+      regionMigrateLock.notifyAll();
+    }
+  }
+
+  /**
+   * Returns the id of the region this procedure migrates.
+   *
+   * @return the region id, or {@code null} when it has not been set
+   */
+  public TConsensusGroupId getConsensusGroupId() {
+    return consensusGroupId;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
index d7d3be33db..52f13e4aa6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveConfigNodeProcedure.java
@@ -22,11 +22,8 @@ package org.apache.iotdb.confignode.procedure.impl;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
-import org.apache.iotdb.confignode.procedure.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
-import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import org.apache.iotdb.confignode.procedure.state.RemoveConfigNodeState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 
@@ -38,8 +35,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /** remove config node procedure */
-public class RemoveConfigNodeProcedure
-    extends StateMachineProcedure<ConfigNodeProcedureEnv, RemoveConfigNodeState> {
+public class RemoveConfigNodeProcedure extends AbstractNodeProcedure<RemoveConfigNodeState> {
   private static final Logger LOG = LoggerFactory.getLogger(RemoveConfigNodeProcedure.class);
   private static final int retryThreshold = 5;
 
@@ -105,33 +101,6 @@ public class RemoveConfigNodeProcedure
     return true;
   }
 
-  @Override
-  protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    if (configNodeProcedureEnv.getConfigNodeLock().tryLock()) {
-      LOG.info("{} acquire lock.", getProcId());
-      return ProcedureLockState.LOCK_ACQUIRED;
-    }
-    SimpleProcedureScheduler simpleProcedureScheduler =
-        (SimpleProcedureScheduler) configNodeProcedureEnv.getScheduler();
-    simpleProcedureScheduler.addWaiting(this);
-    LOG.info("{} wait for lock.", getProcId());
-    return ProcedureLockState.LOCK_EVENT_WAIT;
-  }
-
-  @Override
-  protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    LOG.info("{} release lock.", getProcId());
-    configNodeProcedureEnv.getConfigNodeLock().unlock();
-    SimpleProcedureScheduler simpleProcedureScheduler =
-        (SimpleProcedureScheduler) configNodeProcedureEnv.getScheduler();
-    simpleProcedureScheduler.releaseWaiting();
-  }
-
-  @Override
-  protected boolean holdLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    return configNodeProcedureEnv.getConfigNodeLock().isHeldByCurrentThread();
-  }
-
   @Override
   protected RemoveConfigNodeState getState(int stateId) {
     return RemoveConfigNodeState.values()[stateId];
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
new file mode 100644
index 0000000000..14348a3cc1
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/RemoveDataNodeProcedure.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Procedure that removes a DataNode.
+ *
+ * <p>The states are executed in this order: {@code REMOVE_DATA_NODE_PREPARE} asks the remove
+ * handler for the region ids of the DataNode, {@code BROADCAST_DISABLE_DATA_NODE} calls {@code
+ * broadcastDisableDataNode}, {@code SUBMIT_REGION_MIGRATE} adds one {@link RegionMigrateProcedure}
+ * child per region, and {@code STOP_DATA_NODE} calls {@code stopDataNode} followed by {@code
+ * removeDataNodePersistence}.
+ *
+ * <p>NOTE(review): {@link #equals(Object)} is overridden without a matching {@code hashCode}.
+ * Confirm how the procedure framework stores procedures in hash-based collections before adding
+ * one, because the compared fields change during the procedure's lifetime.
+ */
+public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNodeState> {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class);
+
+  /** Once {@code getCycles()} exceeds this value, a state that keeps failing is marked failed. */
+  private static final int RETRY_THRESHOLD = 5;
+
+  /** Location of the DataNode to remove; {@code null} until set or deserialized. */
+  private TDataNodeLocation tDataNodeLocation;
+
+  /** Region ids returned by the remove handler in the prepare state. */
+  private List<TConsensusGroupId> execDataNodeRegionIds = new ArrayList<>();
+
+  /** Creates an empty procedure; the fields are filled in by {@link #deserialize(ByteBuffer)}. */
+  public RemoveDataNodeProcedure() {
+    super();
+  }
+
+  /**
+   * Creates a procedure that removes the given DataNode.
+   *
+   * @param tDataNodeLocation location of the DataNode to remove
+   */
+  public RemoveDataNodeProcedure(TDataNodeLocation tDataNodeLocation) {
+    super();
+    this.tDataNodeLocation = tDataNodeLocation;
+  }
+
+  /**
+   * Executes one state of the removal.
+   *
+   * @param env procedure environment that provides the DataNode remove handler
+   * @param state the state to execute
+   * @return {@code NO_MORE_STATE} when no DataNode location is set or after {@code
+   *     STOP_DATA_NODE} succeeded; {@code HAS_MORE_STATE} otherwise, including after a caught
+   *     exception, in which case the next state is not set
+   */
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) {
+    if (tDataNodeLocation == null) {
+      return Flow.NO_MORE_STATE;
+    }
+    try {
+      switch (state) {
+        case REMOVE_DATA_NODE_PREPARE:
+          execDataNodeRegionIds =
+              env.getDataNodeRemoveHandler().getDataNodeRegionIds(tDataNodeLocation);
+          LOG.info("DataNode region id is {}", execDataNodeRegionIds);
+          setNextState(RemoveDataNodeState.BROADCAST_DISABLE_DATA_NODE);
+          break;
+        case BROADCAST_DISABLE_DATA_NODE:
+          env.getDataNodeRemoveHandler().broadcastDisableDataNode(tDataNodeLocation);
+          setNextState(RemoveDataNodeState.SUBMIT_REGION_MIGRATE);
+          break;
+        case SUBMIT_REGION_MIGRATE:
+          submitChildRegionMigrate(env);
+          setNextState(RemoveDataNodeState.STOP_DATA_NODE);
+          break;
+        case STOP_DATA_NODE:
+          env.getDataNodeRemoveHandler().stopDataNode(tDataNodeLocation);
+          env.getDataNodeRemoveHandler().removeDataNodePersistence(tDataNodeLocation);
+          return Flow.NO_MORE_STATE;
+      }
+    } catch (Exception e) {
+      if (isRollbackSupported(state)) {
+        // Log the cause here; the failure set below only carries the state name.
+        LOG.error("Remove Data Node failed {}", state, e);
+        setFailure(new ProcedureException("Remove Data Node failed " + state));
+      } else {
+        LOG.error(
+            "Retrievable error trying to remove data node {}, state {}",
+            tDataNodeLocation,
+            state,
+            e);
+        if (getCycles() > RETRY_THRESHOLD) {
+          setFailure(new ProcedureException("State stuck at " + state));
+        }
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  /** Does nothing: no state of this procedure supports rollback. */
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, RemoveDataNodeState state)
+      throws IOException, InterruptedException, ProcedureException {}
+
+  /**
+   * Reports whether a state can be rolled back.
+   *
+   * @param state the state to check
+   * @return always {@code false}
+   */
+  @Override
+  protected boolean isRollbackSupported(RemoveDataNodeState state) {
+    return false;
+  }
+
+  /**
+   * Used to keep procedure lock even when the procedure is yielded or suspended.
+   *
+   * @param env env
+   * @return true if hold the lock
+   */
+  @Override
+  protected boolean holdLock(ConfigNodeProcedureEnv env) {
+    return true;
+  }
+
+  /**
+   * Maps a state id back to its state.
+   *
+   * @param stateId index into {@code RemoveDataNodeState.values()}
+   * @return the state at that index
+   */
+  @Override
+  protected RemoveDataNodeState getState(int stateId) {
+    return RemoveDataNodeState.values()[stateId];
+  }
+
+  /**
+   * Maps a state to its id.
+   *
+   * @param removeDataNodeState the state
+   * @return the ordinal of the state
+   */
+  @Override
+  protected int getStateId(RemoveDataNodeState removeDataNodeState) {
+    return removeDataNodeState.ordinal();
+  }
+
+  /**
+   * Returns the state the procedure starts in.
+   *
+   * @return {@code REMOVE_DATA_NODE_PREPARE}
+   */
+  @Override
+  protected RemoveDataNodeState getInitialState() {
+    return RemoveDataNodeState.REMOVE_DATA_NODE_PREPARE;
+  }
+
+  /**
+   * Writes the procedure type ordinal, the parent class data, the DataNode location, the number of
+   * region ids and then each region id, in that order.
+   *
+   * @param stream the stream to write to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeInt(ProcedureFactory.ProcedureType.REMOVE_DATA_NODE_PROCEDURE.ordinal());
+    super.serialize(stream);
+    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(tDataNodeLocation, stream);
+    stream.writeInt(execDataNodeRegionIds.size());
+    for (TConsensusGroupId regionId : execDataNodeRegionIds) {
+      ThriftCommonsSerDeUtils.serializeTConsensusGroupId(regionId, stream);
+    }
+  }
+
+  /**
+   * Reads the parent class data, the DataNode location and the region ids from the buffer.
+   *
+   * <p>The procedure type ordinal written by {@link #serialize(DataOutputStream)} is not read
+   * here. A {@link ThriftSerDeException} is logged and not rethrown, so fields read after the
+   * failing one keep their previous values.
+   *
+   * @param byteBuffer the buffer to read from
+   */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    try {
+      tDataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+      int regionSize = byteBuffer.getInt();
+      execDataNodeRegionIds = new ArrayList<>(regionSize);
+      for (int i = 0; i < regionSize; i++) {
+        execDataNodeRegionIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer));
+      }
+    } catch (ThriftSerDeException e) {
+      LOG.error("Error in deserialize RemoveDataNodeProcedure", e);
+    }
+  }
+
+  /**
+   * Compares this procedure with another object.
+   *
+   * <p>Two {@code RemoveDataNodeProcedure} instances are equal when their procedure id, state and
+   * DataNode location match. The location is compared null-safely because the no-arg constructor
+   * leaves it unset.
+   *
+   * @param that the object to compare with
+   * @return {@code true} if {@code that} is an equal {@code RemoveDataNodeProcedure}
+   */
+  @Override
+  public boolean equals(Object that) {
+    if (this == that) {
+      return true;
+    }
+    if (!(that instanceof RemoveDataNodeProcedure)) {
+      return false;
+    }
+    RemoveDataNodeProcedure thatProc = (RemoveDataNodeProcedure) that;
+    return thatProc.getProcId() == this.getProcId()
+        && thatProc.getState() == this.getState()
+        && java.util.Objects.equals(thatProc.tDataNodeLocation, this.tDataNodeLocation);
+  }
+
+  /**
+   * Adds one {@link RegionMigrateProcedure} child for every region id for which the remove handler
+   * finds a destination DataNode.
+   *
+   * <p>NOTE(review): a region without a destination is only logged and then skipped, after which
+   * the caller still advances to {@code STOP_DATA_NODE}. Confirm that this is intended.
+   *
+   * @param env procedure environment that provides the DataNode remove handler
+   */
+  private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) {
+    for (TConsensusGroupId regionId : execDataNodeRegionIds) {
+      TDataNodeLocation destDataNode = env.getDataNodeRemoveHandler().findDestDataNode(regionId);
+      if (destDataNode == null) {
+        LOG.warn(
+            "No destination DataNode found for region {}, region migrate is not submitted",
+            regionId);
+        continue;
+      }
+      RegionMigrateProcedure regionMigrateProcedure =
+          new RegionMigrateProcedure(regionId, tDataNodeLocation, destDataNode);
+      addChildProcedure(regionMigrateProcedure);
+      LOG.info("Submit child procedure, {}", regionMigrateProcedure);
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
new file mode 100644
index 0000000000..6980a2054d
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.scheduler;
+
+import org.apache.iotdb.confignode.procedure.Procedure;
+
+import java.util.ArrayDeque;
+
+/**
+ * Lock Queue for procedure of the same type.
+ *
+ * <p>Holds at most one owner procedure, identified by its procedure id, plus a queue of procedures
+ * that are waiting. This class does no synchronization of its own.
+ */
+public class LockQueue {
+  /** Waiting procedures, in the order they were passed to {@link #waitProcedure(Procedure)}. */
+  private final ArrayDeque<Procedure<?>> deque = new ArrayDeque<>();
+
+  /** Current owner of the lock, or {@code null} when the lock is free. */
+  private Procedure<?> lockOwnerProcedure = null;
+
+  /**
+   * Tries to take the lock for the given procedure.
+   *
+   * @param procedure the procedure asking for the lock
+   * @return {@code true} if the lock was free and is now owned by {@code procedure}, or if the
+   *     current owner has the same procedure id; {@code false} otherwise
+   */
+  public boolean tryLock(Procedure<?> procedure) {
+    if (lockOwnerProcedure == null) {
+      lockOwnerProcedure = procedure;
+      return true;
+    }
+    // Re-entrant for the owner: the same procedure id may take the lock again.
+    return procedure.getProcId() == lockOwnerProcedure.getProcId();
+  }
+
+  /**
+   * Releases the lock if it is owned by the given procedure.
+   *
+   * @param procedure the procedure releasing the lock
+   * @return {@code true} if the lock was released; {@code false} if the lock is free or owned by
+   *     a procedure with a different id
+   */
+  public boolean releaseLock(Procedure<?> procedure) {
+    if (lockOwnerProcedure == null || lockOwnerProcedure.getProcId() != procedure.getProcId()) {
+      return false;
+    }
+    lockOwnerProcedure = null;
+    return true;
+  }
+
+  /**
+   * Appends a procedure to the end of the waiting queue.
+   *
+   * @param procedure the procedure that has to wait
+   */
+  public void waitProcedure(Procedure<?> procedure) {
+    deque.addLast(procedure);
+  }
+
+  /**
+   * Hands every waiting procedure to the scheduler and leaves the waiting queue empty.
+   *
+   * <p>NOTE(review): procedures are passed to {@code addFront} starting with the one that waited
+   * longest. If {@code addFront} prepends to the run queue, the longest waiter ends up behind the
+   * later ones. Confirm whether that order is intended.
+   *
+   * @param procedureScheduler the scheduler that receives the waiting procedures
+   * @return the number of procedures handed to the scheduler
+   */
+  public int wakeWaitingProcedures(ProcedureScheduler procedureScheduler) {
+    int count = deque.size();
+    while (!deque.isEmpty()) {
+      procedureScheduler.addFront(deque.pollFirst());
+    }
+    return count;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
new file mode 100644
index 0000000000..0c83cd2411
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state;
+
+/**
+ * States of a region transition.
+ *
+ * <p>NOTE(review): presumably these are the states of {@code RegionMigrateProcedure} and are
+ * mapped to ids by ordinal like the other procedure state enums; if so, reordering or inserting
+ * constants changes the ids. Confirm against the procedure before editing this list.
+ */
+public enum RegionTransitionState {
+  REGION_MIGRATE_PREPARE,
+  ADD_NEW_NODE_TO_REGION_CONSENSUS_GROUP,
+  MIGRATE_REGION,
+  WAIT_FOR_REGION_MIGRATE_FINISHED,
+  UPDATE_REGION_LOCATION_CACHE
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveDataNodeState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveDataNodeState.java
new file mode 100644
index 0000000000..30d2476056
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveDataNodeState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state;
+
+/**
+ * States of {@code RemoveDataNodeProcedure}, listed in the order the procedure walks through
+ * them.
+ *
+ * <p>The procedure converts states to ids with {@code ordinal()} and back with {@code
+ * values()[id]}, so reordering or inserting constants changes the ids.
+ */
+public enum RemoveDataNodeState {
+  // Initial state: asks the remove handler for the region ids of the DataNode.
+  REMOVE_DATA_NODE_PREPARE,
+  // Calls broadcastDisableDataNode on the remove handler.
+  BROADCAST_DISABLE_DATA_NODE,
+  // Adds one RegionMigrateProcedure child per region that has a destination DataNode.
+  SUBMIT_REGION_MIGRATE,
+  // Final state: calls stopDataNode and then removeDataNodePersistence on the remove handler.
+  STOP_DATA_NODE
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 1b5398b77e..dd28aebfdb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.confignode.procedure.store;
 import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.impl.AddConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
+import org.apache.iotdb.confignode.procedure.impl.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.impl.RemoveConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.RemoveDataNodeProcedure;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -47,6 +49,12 @@ public class ProcedureFactory implements IProcedureFactory {
       case REMOVE_CONFIG_NODE_PROCEDURE:
         procedure = new RemoveConfigNodeProcedure();
         break;
+      case REMOVE_DATA_NODE_PROCEDURE:
+        procedure = new RemoveDataNodeProcedure();
+        break;
+      case REGION_MIGRATE_PROCEDURE:
+        procedure = new RegionMigrateProcedure();
+        break;
       default:
         throw new IOException("unknown Procedure type: " + typeNum);
     }
@@ -61,6 +69,10 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.ADD_CONFIG_NODE_PROCEDURE;
     } else if (procedure instanceof RemoveConfigNodeProcedure) {
       return ProcedureType.REMOVE_CONFIG_NODE_PROCEDURE;
+    } else if (procedure instanceof RemoveDataNodeProcedure) {
+      return ProcedureType.REMOVE_DATA_NODE_PROCEDURE;
+    } else if (procedure instanceof RegionMigrateProcedure) {
+      return ProcedureType.REGION_MIGRATE_PROCEDURE;
     }
     return null;
   }
@@ -69,6 +81,8 @@ public class ProcedureFactory implements IProcedureFactory {
     DELETE_STORAGE_GROUP_PROCEDURE,
     ADD_CONFIG_NODE_PROCEDURE,
     REMOVE_CONFIG_NODE_PROCEDURE,
+    REMOVE_DATA_NODE_PROCEDURE,
+    REGION_MIGRATE_PROCEDURE
   }
 
   private static class ProcedureFactoryHolder {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index b818bade24..a8b6648055 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -21,20 +21,17 @@ package org.apache.iotdb.confignode.service;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.sync.confignode.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.conf.ConfigNodeRemoveCheck;
 import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
@@ -161,7 +158,6 @@ public class ConfigNode implements ConfigNodeMBean {
     // Setup MetricsService
     registerManager.register(MetricsService.getInstance());
     MetricsService.getInstance().startAllReporter();
-    configManager.getDataNodeRemoveManager().start();
 
     LOGGER.info("Successfully setup internal services.");
   }
@@ -231,29 +227,6 @@ public class ConfigNode implements ConfigNodeMBean {
     LOGGER.info("{} is deactivated.", ConfigNodeConstant.GLOBAL_NAME);
   }
 
-  public void doRemoveNode(String[] args) throws IOException {
-    LOGGER.info("Starting to remove {}...", ConfigNodeConstant.GLOBAL_NAME);
-    if (args.length != 3) {
-      LOGGER.info("Usage: -r <ip>:<rpcPort>");
-      return;
-    }
-
-    try {
-      TEndPoint endPoint = NodeUrlUtils.parseTEndPointUrl(args[2]);
-      TConfigNodeLocation removeConfigNodeLocation =
-          ConfigNodeRemoveCheck.getInstance().removeCheck(endPoint);
-      if (removeConfigNodeLocation == null) {
-        LOGGER.error("The ConfigNode not in the Cluster.");
-        return;
-      }
-
-      ConfigNodeRemoveCheck.getInstance().removeConfigNode(removeConfigNodeLocation);
-    } catch (BadNodeUrlException e) {
-      LOGGER.warn("No ConfigNodes need to be removed.", e);
-    }
-    LOGGER.info("{} is removed.", ConfigNodeConstant.GLOBAL_NAME);
-  }
-
   private static class ConfigNodeHolder {
 
     private static final ConfigNode INSTANCE = new ConfigNode();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
index 421ea4b961..124b765449 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
@@ -18,10 +18,16 @@
  */
 package org.apache.iotdb.confignode.service;
 
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.ServerCommandLine;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.StartupChecks;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
+import org.apache.iotdb.confignode.conf.ConfigNodeRemoveCheck;
 import org.apache.iotdb.confignode.conf.ConfigNodeStartupCheck;
 
 import org.slf4j.Logger;
@@ -76,7 +82,7 @@ public class ConfigNodeCommandLine extends ServerCommandLine {
     } else if (MODE_REMOVE.equals(mode)) {
       // remove node
       try {
-        ConfigNode.getInstance().doRemoveNode(args);
+        doRemoveNode(args);
       } catch (IOException e) {
         LOGGER.error("Meet error when doing remove", e);
         return -1;
@@ -85,4 +91,27 @@ public class ConfigNodeCommandLine extends ServerCommandLine {
 
     return 0;
   }
+
+  /**
+   * Removes the ConfigNode whose endpoint is given in {@code args[2]}.
+   *
+   * <p>Nothing is removed when the argument count is not three, when the endpoint cannot be parsed
+   * or when the remove check returns no matching ConfigNode. Each of these cases is logged and the
+   * method returns normally. The "is removed" message is logged only after {@code
+   * removeConfigNode} has returned.
+   *
+   * @param args command line arguments; {@code args[2]} is expected in the form {@code
+   *     <ip>:<rpcPort>}
+   * @throws IOException declared for the remove call chain
+   */
+  private void doRemoveNode(String[] args) throws IOException {
+    LOGGER.info("Starting to remove {}...", ConfigNodeConstant.GLOBAL_NAME);
+    if (args.length != 3) {
+      LOGGER.info("Usage: -r <ip>:<rpcPort>");
+      return;
+    }
+
+    try {
+      TEndPoint endPoint = NodeUrlUtils.parseTEndPointUrl(args[2]);
+      TConfigNodeLocation removeConfigNodeLocation =
+          ConfigNodeRemoveCheck.getInstance().removeCheck(endPoint);
+      if (removeConfigNodeLocation == null) {
+        LOGGER.error("The ConfigNode not in the Cluster.");
+        return;
+      }
+
+      ConfigNodeRemoveCheck.getInstance().removeConfigNode(removeConfigNodeLocation);
+      // Report success only here, so a bad URL is not followed by an "is removed" message.
+      LOGGER.info("{} is removed.", ConfigNodeConstant.GLOBAL_NAME);
+    } catch (BadNodeUrlException e) {
+      LOGGER.warn("No ConfigNodes need to be removed.", e);
+    }
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 295e01737d..1d104f23f7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -176,7 +176,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
 
   @Override
   public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) throws TException {
-    configManager.getDataNodeRemoveManager().reportRegionMigrateResult(req);
+    configManager.getProcedureManager().reportRegionMigrateResult(req);
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlanTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlanTest.java
index bb4977a1f7..85e026d18f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlanTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/write/RemoveDataNodePlanTest.java
@@ -153,7 +153,6 @@ public class RemoveDataNodePlanTest {
   private RemoveDataNodePlan runPlanSerializeAndDeSerialize(boolean update) throws IOException {
     try (DataOutputStream outputStream =
         new DataOutputStream(Files.newOutputStream(Paths.get(serializedFileName)))) {
-      req1.setUpdate(update);
       req1.serializeImpl(outputStream);
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 54ff85eef6..49305421cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TNodeResource;
 import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.JMXService;
@@ -35,18 +34,14 @@ import org.apache.iotdb.commons.service.StartupChecks;
 import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.client.ConfigNodeClient;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.IoTDBStartCheck;
-import org.apache.iotdb.db.conf.IoTDBStopCheck;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
@@ -78,10 +73,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 
 public class DataNode implements DataNodeMBean {
   private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
@@ -144,103 +136,6 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  /**
-   * remove datanodes from cluster
-   *
-   * @param args IPs for removed datanodes, split with ','
-   */
-  protected void doRemoveNode(String[] args) {
-    try {
-      removePrepare(args);
-      removeNodesFromCluster(args);
-      removeTail();
-    } catch (Exception e) {
-      logger.error("remove Data Nodes error", e);
-    }
-  }
-
-  private void removePrepare(String[] args) throws BadNodeUrlException {
-    ConfigNodeInfo.getInstance()
-        .updateConfigNodeList(IoTDBDescriptor.getInstance().getConfig().getTargetConfigNodeList());
-    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
-      TDataNodeConfigurationResp resp = configNodeClient.getDataNodeConfiguration(-1);
-      // 1. online Data Node size - removed Data Node size < replication,NOT ALLOW remove
-      //   But replication size is set in Config Node's configuration, so check it in remote Config
-      // Node
-
-      // 2. removed Data Node IP not contained in below map, CAN NOT remove.
-      Map<Integer, TDataNodeConfiguration> nodeIdToNodeConfiguration =
-          resp.getDataNodeConfigurationMap();
-      List<String> removedDataNodeIps = Arrays.asList(args[1].split(","));
-      List<String> onlineDataNodeIps =
-          nodeIdToNodeConfiguration.values().stream()
-              .map(TDataNodeConfiguration::getLocation)
-              .map(TDataNodeLocation::getInternalEndPoint)
-              .map(TEndPoint::getIp)
-              .collect(Collectors.toList());
-      IoTDBStopCheck.getInstance().checkDuplicateIp(removedDataNodeIps);
-      IoTDBStopCheck.getInstance().checkIpInCluster(removedDataNodeIps, onlineDataNodeIps);
-    } catch (TException e) {
-      logger.error("remove Data Nodes check failed", e);
-    }
-  }
-
-  private void removeNodesFromCluster(String[] args) {
-    logger.info("start to remove DataNode from cluster");
-    List<TDataNodeLocation> dataNodeLocations = buildDataNodeLocations(args[1]);
-    if (dataNodeLocations.isEmpty()) {
-      logger.error("data nodes location is empty");
-      // throw Exception OR return?
-      return;
-    }
-    logger.info(
-        "there has data nodes location will be removed. size is: {}, detail: {}",
-        dataNodeLocations.size(),
-        dataNodeLocations);
-    TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations);
-    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
-      TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq);
-      logger.info("Remove result {} ", removeResp.toString());
-    } catch (TException e) {
-      logger.error("send remove Data Node request failed!", e);
-    }
-  }
-
-  /**
-   * fetch all datanode info from ConfigNode, then compare with input 'ips'
-   *
-   * @param ips data node ip, split with ','
-   * @return TDataNodeLocation list
-   */
-  private List<TDataNodeLocation> buildDataNodeLocations(String ips) {
-    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
-    if (ips == null || ips.trim().isEmpty()) {
-      return dataNodeLocations;
-    }
-
-    List<String> dataNodeIps = Arrays.asList(ips.split(","));
-    try (ConfigNodeClient client = new ConfigNodeClient()) {
-      dataNodeLocations =
-          client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
-              .map(TDataNodeConfiguration::getLocation)
-              .filter(location -> dataNodeIps.contains(location.getInternalEndPoint().getIp()))
-              .collect(Collectors.toList());
-    } catch (TException e) {
-      logger.error("get data node locations failed", e);
-    }
-
-    if (dataNodeIps.size() != dataNodeLocations.size()) {
-      logger.error(
-          "build DataNode locations error, "
-              + "because number of input ip NOT EQUALS the number of fetched DataNodeLocations, will return empty locations");
-      return dataNodeLocations;
-    }
-
-    return dataNodeLocations;
-  }
-
-  private void removeTail() {}
-
   /** initialize the current node and its services */
   public boolean initLocalEngines() {
     IoTDB.setClusterMode();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
index 3900236413..debe51b5ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
@@ -18,13 +18,30 @@
  */
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.ServerCommandLine;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.exception.ConfigurationException;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.IoTDBStopCheck;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 public class DataNodeServerCommandLine extends ServerCommandLine {
 
@@ -75,10 +92,111 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
     if (MODE_START.equals(mode)) {
       dataNode.doAddNode(args);
     } else if (MODE_REMOVE.equals(mode)) {
-      dataNode.doRemoveNode(args);
+      doRemoveNode(args);
     } else {
       logger.error("Unrecognized mode {}", mode);
     }
     return 0;
   }
+
+  /**
+   * Runs the removal steps in order (prepare, send request, tail); any exception is logged here.
+   *
+   * @param args command-line arguments; args[1] is read as the "ip:port" list, split with ','
+   */
+  private void doRemoveNode(String[] args) {
+    try {
+      removePrepare(args);
+      removeNodesFromCluster(args);
+      removeTail();
+    } catch (Exception removeFailure) {
+      logger.error("remove Data Nodes error", removeFailure);
+    }
+  }
+
+  /**
+   * Pre-removal check: hands the configured target ConfigNode list to ConfigNodeInfo, fetches
+   * the DataNode configurations, then passes the IPs parsed from args[1] and the fetched
+   * internal-endpoint IPs to IoTDBStopCheck. NOTE(review): a TException is logged, not rethrown.
+   */
+  private void removePrepare(String[] args) throws BadNodeUrlException {
+    ConfigNodeInfo.getInstance()
+        .updateConfigNodeList(IoTDBDescriptor.getInstance().getConfig().getTargetConfigNodeList());
+    try (ConfigNodeClient client = new ConfigNodeClient()) {
+      TDataNodeConfigurationResp configurationResp = client.getDataNodeConfiguration(-1);
+      // The replica-count constraint is not checked here; per the original note it is left to the
+      // remote ConfigNode, whose configuration holds the replication size.
+      Map<Integer, TDataNodeConfiguration> configurationById =
+          configurationResp.getDataNodeConfigurationMap();
+      List<String> requestedIps =
+          NodeUrlUtils.parseTEndPointUrls(args[1]).stream()
+              .map(TEndPoint::getIp)
+              .collect(Collectors.toList());
+      List<String> clusterIps =
+          configurationById.values().stream()
+              .map(node -> node.getLocation().getInternalEndPoint().getIp())
+              .collect(Collectors.toList());
+      IoTDBStopCheck.getInstance().checkIpInCluster(requestedIps, clusterIps);
+    } catch (TException checkFailure) {
+      logger.error("remove Data Nodes check failed", checkFailure);
+    }
+  }
+
+  /** Resolves args[1] to DataNode locations and submits them in a single remove request. */
+  private void removeNodesFromCluster(String[] args) throws BadNodeUrlException {
+    logger.info("start to remove DataNode from cluster");
+    List<TDataNodeLocation> targets = buildDataNodeLocations(args[1]);
+    if (targets.isEmpty()) {
+      // Nothing resolved, so no request is sent; whether to throw instead is an open question.
+      logger.error("data nodes location is empty");
+      return;
+    }
+    String notice = "there has data nodes location will be removed. size is: {}, detail: {}";
+    logger.info(notice, targets.size(), targets);
+    TDataNodeRemoveReq request = new TDataNodeRemoveReq(targets);
+    try (ConfigNodeClient client = new ConfigNodeClient()) {
+      TDataNodeRemoveResp response = client.removeDataNode(request);
+      logger.info("Remove result {} ", response.toString());
+    } catch (TException sendFailure) {
+      // A failed send is only logged; nothing is propagated to the caller.
+      logger.error("send remove Data Node request failed!", sendFailure);
+    }
+  }
+
+  /**
+   * Resolves the input endpoints to DataNode locations fetched from the ConfigNode; a location
+   * matches when its client RPC endpoint equals one of the parsed inputs. All-or-nothing: when
+   * the match count differs from the input count (unknown endpoint, or a fetch failure that was
+   * logged), an empty list is returned so the caller never removes only part of the request.
+   *
+   * @param endPorts DataNode endpoints as "ip:port", split with ','; may be null or blank
+   * @return the matched locations, or an empty list if the input is blank or not fully matched
+   */
+  private List<TDataNodeLocation> buildDataNodeLocations(String endPorts)
+      throws BadNodeUrlException {
+    if (endPorts == null || endPorts.trim().isEmpty()) {
+      return new ArrayList<>();
+    }
+    List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(endPorts);
+    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    try (ConfigNodeClient client = new ConfigNodeClient()) {
+      dataNodeLocations =
+          client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
+              .map(TDataNodeConfiguration::getLocation)
+              .filter(location -> endPoints.contains(location.getClientRpcEndPoint()))
+              .collect(Collectors.toList());
+    } catch (TException e) {
+      logger.error("get data node locations failed", e);
+    }
+    if (endPoints.size() != dataNodeLocations.size()) {
+      logger.error(
+          "build DataNode locations error, "
+              + "because number of input ip NOT EQUALS the number of fetched DataNodeLocations, will return empty locations");
+      // Honour the message above: a partial match must not be handed back as a removable subset.
+      return new ArrayList<>();
+    }
+    return dataNodeLocations;
+  }
+
+  private void removeTail() {} // no-op; invoked by doRemoveNode as its final step
 }