You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/02/24 14:07:18 UTC

[iotdb] 01/02: This PR makes the following changes: 1. Fix the LogPlan serialization and deserialization bug 2. Add shell scripts to remove nodes 3. Enrich the functionality of the node tool 4. Fix some bugs in adding new nodes

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4609bc00ff6d808dd0dbac64fb7edba81a07d4c1
Author: lta <li...@163.com>
AuthorDate: Tue Feb 23 20:32:01 2021 +0800

    This PR makes the following changes:
    1. Fix the LogPlan serialization and deserialization bug
    2. Add shell scripts to remove nodes
    3. Enrich the functionality of the node tool
    4. Fix some bugs in adding new nodes
---
 cluster/src/assembly/resources/sbin/add-node.bat   |   2 +-
 cluster/src/assembly/resources/sbin/add-node.sh    |   2 +-
 .../sbin/{start-node.bat => remove-node.bat}       |  16 +--
 .../resources/sbin/{add-node.sh => remove-node.sh} |  28 +++--
 cluster/src/assembly/resources/sbin/start-node.bat |   2 +-
 .../java/org/apache/iotdb/cluster/ClusterMain.java |  11 +-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   3 +
 .../iotdb/cluster/partition/PartitionGroup.java    |   4 +
 .../partition/balancer/DefaultSlotBalancer.java    |   2 +
 .../iotdb/cluster/query/ClusterPlanRouter.java     |   2 +-
 .../org/apache/iotdb/cluster/server/Response.java  |   2 +
 .../server/handlers/caller/NodeStatusHandler.java  |   9 +-
 .../cluster/server/member/MetaGroupMember.java     | 139 +++++++++++++--------
 .../iotdb/cluster/server/member/RaftMember.java    |   1 -
 .../cluster/utils/nodetool/ClusterMonitor.java     |   2 +-
 .../utils/nodetool/ClusterMonitorMBean.java        |   4 +-
 .../cluster/utils/nodetool/function/Host.java      |   2 +-
 .../cluster/utils/nodetool/function/Status.java    |  24 +++-
 .../apache/iotdb/cluster/log/LogParserTest.java    |  14 +++
 .../apache/iotdb/db/qp/physical/sys/LogPlan.java   |  11 +-
 20 files changed, 179 insertions(+), 101 deletions(-)

diff --git a/cluster/src/assembly/resources/sbin/add-node.bat b/cluster/src/assembly/resources/sbin/add-node.bat
index 958f16f..452f1c3 100755
--- a/cluster/src/assembly/resources/sbin/add-node.bat
+++ b/cluster/src/assembly/resources/sbin/add-node.bat
@@ -19,7 +19,7 @@
 
 @echo off
 echo ````````````````````````
-echo Starting IoTDB
+echo Starting IoTDB (Cluster Mode)
 echo ````````````````````````
 
 PATH %PATH%;%JAVA_HOME%\bin\
diff --git a/cluster/src/assembly/resources/sbin/add-node.sh b/cluster/src/assembly/resources/sbin/add-node.sh
index 807175b..935abde 100755
--- a/cluster/src/assembly/resources/sbin/add-node.sh
+++ b/cluster/src/assembly/resources/sbin/add-node.sh
@@ -20,7 +20,7 @@
 
 
 echo ---------------------
-echo Starting IoTDB
+echo "Starting IoTDB (Cluster Mode)"
 echo ---------------------
 
 if [ -z "${IOTDB_HOME}" ]; then
diff --git a/cluster/src/assembly/resources/sbin/start-node.bat b/cluster/src/assembly/resources/sbin/remove-node.bat
similarity index 91%
copy from cluster/src/assembly/resources/sbin/start-node.bat
copy to cluster/src/assembly/resources/sbin/remove-node.bat
index f9a7d1f..a2b0564 100755
--- a/cluster/src/assembly/resources/sbin/start-node.bat
+++ b/cluster/src/assembly/resources/sbin/remove-node.bat
@@ -19,7 +19,7 @@
 
 @echo off
 echo ````````````````````````
-echo Starting IoTDB
+echo Starting to remove a node (Cluster Mode)
 echo ````````````````````````
 
 PATH %PATH%;%JAVA_HOME%\bin\
@@ -57,20 +57,8 @@ popd
 set IOTDB_CONF=%IOTDB_HOME%\conf
 set IOTDB_LOGS=%IOTDB_HOME%\logs
 
-
-IF EXIST "%IOTDB_CONF%\iotdb-env.bat" (
-    IF "%1" == "printgc" (
-      CALL "%IOTDB_CONF%\iotdb-env.bat" printgc
-      SHIFT
-    ) ELSE (
-      CALL "%IOTDB_CONF%\iotdb-env.bat"
-    )
-) ELSE (
-    echo "can't find %IOTDB_CONF%\iotdb-env.bat"
-)
-
 @setlocal ENABLEDELAYEDEXPANSION ENABLEEXTENSIONS
-set CONF_PARAMS=-s
+set CONF_PARAMS=-r
 set is_conf_path=false
 for %%i in (%*) do (
 	IF "%%i" == "-c" (
diff --git a/cluster/src/assembly/resources/sbin/add-node.sh b/cluster/src/assembly/resources/sbin/remove-node.sh
similarity index 82%
copy from cluster/src/assembly/resources/sbin/add-node.sh
copy to cluster/src/assembly/resources/sbin/remove-node.sh
index 807175b..65ee58b 100755
--- a/cluster/src/assembly/resources/sbin/add-node.sh
+++ b/cluster/src/assembly/resources/sbin/remove-node.sh
@@ -20,7 +20,7 @@
 
 
 echo ---------------------
-echo Starting IoTDB
+echo "Starting to remove a node(Cluster Mode)"
 echo ---------------------
 
 if [ -z "${IOTDB_HOME}" ]; then
@@ -28,13 +28,24 @@ if [ -z "${IOTDB_HOME}" ]; then
 fi
 
 IOTDB_CONF=${IOTDB_HOME}/conf
-# IOTDB_LOGS=${IOTDB_HOME}/logs
 
-if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then
-    . "$IOTDB_CONF/iotdb-env.sh"
-else
-    echo "can't find $IOTDB_CONF/iotdb-env.sh"
-fi
+is_conf_path=false
+for arg do
+    shift
+    if [ "$arg" == "-c" ]; then
+        is_conf_path=true
+        continue
+    fi
+
+    if [ $is_conf_path == true ]; then
+        IOTDB_CONF=$arg
+        is_conf_path=false
+        continue
+    fi
+    set -- "$@" "$arg"
+done
+
+CONF_PARAMS="-r "$*
 
 if [ -n "$JAVA_HOME" ]; then
     for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
@@ -65,8 +76,9 @@ launch_service()
 	iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
 	iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
 	iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -DCLUSTER_CONF=${IOTDB_CONF}"
 	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
-	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" -a
+	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS
 	return $?
 }
 
diff --git a/cluster/src/assembly/resources/sbin/start-node.bat b/cluster/src/assembly/resources/sbin/start-node.bat
index f9a7d1f..161cc2a 100755
--- a/cluster/src/assembly/resources/sbin/start-node.bat
+++ b/cluster/src/assembly/resources/sbin/start-node.bat
@@ -19,7 +19,7 @@
 
 @echo off
 echo ````````````````````````
-echo Starting IoTDB
+echo Starting IoTDB (Cluster Mode)
 echo ````````````````````````
 
 PATH %PATH%;%JAVA_HOME%\bin\
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 33d8a5d..48f8813 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -68,8 +68,8 @@ public class ClusterMain {
           + "[-internal_data_port <internal data port>] "
           + "[-cluster_rpc_port <cluster rpc port>] "
           + "[-seed_nodes <node1:meta_port:data_port:cluster_rpc_port,"
-          +               "node2:meta_port:data_port:cluster_rpc_port,"
-          +           "...,noden:meta_port:data_port:cluster_rpc_port>] "
+          + "node2:meta_port:data_port:cluster_rpc_port,"
+          + "...,noden:meta_port:data_port:cluster_rpc_port>] "
           + "[-sc] "
           + "[-rpc_port <rpc port>]");
       return;
@@ -276,6 +276,9 @@ public class ClusterMain {
       logger.error("Cluster size is too small, cannot remove any node");
     } else if (response == Response.RESPONSE_REJECT) {
       logger.error("Node {} is not found in the cluster, please check", nodeToRemove);
+    } else if (response == Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT) {
+      logger.warn(
+          "The cluster is performing other change membership operations. Change membership operations should be performed one by one. Please try again later");
     } else {
       logger.error("Unexpected response {}", response);
     }
@@ -302,7 +305,7 @@ public class ClusterMain {
       public int calculateSlotByTime(String storageGroupName, long timestamp, int maxSlotNum) {
         int sgSerialNum = extractSerialNumInSGName(storageGroupName) % k;
         if (sgSerialNum >= 0) {
-          return (int)(maxSlotNum / k * (sgSerialNum + 0.5));
+          return (int) (maxSlotNum / k * (sgSerialNum + 0.5));
         } else {
           return defaultStrategy.calculateSlotByTime(storageGroupName, timestamp, maxSlotNum);
         }
@@ -313,7 +316,7 @@ public class ClusterMain {
           int maxSlotNum) {
         int sgSerialNum = extractSerialNumInSGName(storageGroupName) % k;
         if (sgSerialNum >= 0) {
-          return (int)(maxSlotNum / k * (sgSerialNum + 0.5));
+          return (int) (maxSlotNum / k * (sgSerialNum + 0.5));
         } else {
           return defaultStrategy
               .calculateSlotByPartitionNum(storageGroupName, partitionId, maxSlotNum);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index f774567..b08d327 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -641,6 +641,9 @@ public abstract class RaftLogManager {
    */
   void applyEntries(List<Log> entries) {
     for (Log entry : entries) {
+      if (entry.isApplied()) {
+        continue;
+      }
       if (blockAppliedCommitIndex > 0 && entry.getCurrLogIndex() > blockAppliedCommitIndex) {
         blockedUnappliedLogList.add(entry);
         continue;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index b35cc10..e7d039c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -98,4 +98,8 @@ public class PartitionGroup extends ArrayList<Node> {
     return id;
   }
 
+  @Override
+  public String toString() {
+    return String.format("PartitionGroup{id=%d, header=%s}", id, get(0));
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
index eb1825f..ad90d65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
@@ -79,6 +79,7 @@ public class DefaultSlotBalancer implements SlotBalancer {
           previousNodeMap.get(curNode).put(slot, table.getHeaderGroup(entry.getKey(), oldRing));
           slotNodes[slot] = curNode;
         }
+        slotsToMove.clear();
         transferNum -= numToMove;
         if (transferNum > 0) {
           curNode = new RaftNode(newNode, ++raftId);
@@ -89,6 +90,7 @@ public class DefaultSlotBalancer implements SlotBalancer {
             previousNodeMap.get(curNode).put(slot, table.getHeaderGroup(entry.getKey(), oldRing));
             slotNodes[slot] = curNode;
           }
+          slotsToMove.clear();
         }
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index 65f66dc..8fbc772 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -161,7 +161,7 @@ public class ClusterPlanRouter {
       throw new UnsupportedPlanException(plan);
     }
     for (PartitionGroup partitionGroup: partitionTable.calculateGlobalGroups(oldRing)) {
-      result.put(plan, partitionGroup);
+      result.put(new LogPlan(plan), partitionGroup);
     }
     return result;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
index 373d535..8a9b710 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
@@ -49,6 +49,8 @@ public class Response {
   // the new node, which tries to join the cluster, contains conflicted parameters with the
   // cluster, so the operation is rejected.
   public static final long RESPONSE_NEW_NODE_PARAMETER_CONFLICT = -10;
+  // add/remove node operations should be performed one by one
+  public static final long RESPONSE_CHANGE_MEMBERSHIP_CONFLICT = -11;
   // the request is not executed locally anc should be forwarded
   public static final long RESPONSE_NULL = Long.MIN_VALUE;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java
index f17bc6e..4a0c43f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java
@@ -27,11 +27,11 @@ import org.apache.thrift.async.AsyncMethodCallback;
 public class NodeStatusHandler implements AsyncMethodCallback<Node> {
 
 
-  private Map<Node, Boolean> nodeStatusMap;
+  private Map<Node, Integer> nodeStatusMap;
 
   private AtomicInteger countResponse;
 
-  public NodeStatusHandler(Map<Node, Boolean> nodeStatusMap) {
+  public NodeStatusHandler(Map<Node, Integer> nodeStatusMap) {
     this.nodeStatusMap = nodeStatusMap;
     this.countResponse = new AtomicInteger();
   }
@@ -39,7 +39,10 @@ public class NodeStatusHandler implements AsyncMethodCallback<Node> {
   @Override
   public void onComplete(Node response) {
     synchronized (nodeStatusMap) {
-      nodeStatusMap.put(response, true);
+      if (response == null) {
+        return;
+      }
+      nodeStatusMap.put(response, 0);
       // except for this node itself
       if(countResponse.incrementAndGet() == nodeStatusMap.size() - 1){
         nodeStatusMap.notifyAll();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 9dc5d67..dd5b419 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -19,7 +19,40 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.cluster.client.DataClientProvider;
 import org.apache.iotdb.cluster.client.async.AsyncClientPool;
 import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -117,40 +150,6 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC;
-import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
-
 @SuppressWarnings("java:S1135")
 public class MetaGroupMember extends RaftMember {
 
@@ -472,6 +471,7 @@ public class MetaGroupMember extends RaftMember {
         partitionTable = new SlotPartitionTable(allNodes, thisNode);
         logger.info("Partition table is set up");
       }
+      initIdNodeMap();
       router = new ClusterPlanRouter(partitionTable);
       this.coordinator.setRouter(router);
       startSubServers();
@@ -604,6 +604,9 @@ public class MetaGroupMember extends RaftMember {
       setNodeIdentifier(genNodeIdentifier());
     } else if (resp.getRespNum() == Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT) {
       handleConfigInconsistency(resp);
+    } else if (resp.getRespNum() == Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT) {
+      logger.warn(
+          "The cluster is performing other change membership operations. Change membership operations should be performed one by one. Please try again later");
     } else {
       logger
           .warn("Joining the cluster is rejected by {} for response {}", node, resp.getRespNum());
@@ -867,18 +870,27 @@ public class MetaGroupMember extends RaftMember {
    * immediately. If the identifier of "node" conflicts with an existing node, the request will be
    * turned down.
    *
-   * @param node          cannot be the local node
+   * @param newNode          cannot be the local node
    * @param startUpStatus the start up status of the new node
    * @param response      the response that will be sent to "node"
    * @return true if the process is over, false if the request should be forwarded
    */
-  private boolean processAddNodeLocally(Node node, StartUpStatus startUpStatus,
+  private boolean processAddNodeLocally(Node newNode, StartUpStatus startUpStatus,
       AddNodeResponse response) throws LogExecutionException {
     if (character != NodeCharacter.LEADER) {
       return false;
     }
-    if (allNodes.contains(node)) {
-      logger.debug("Node {} is already in the cluster", node);
+    boolean nodeExistInPartitionTable = false;
+    for (Node node : partitionTable.getAllNodes()) {
+      if (node.ip.equals(newNode.ip) && newNode.dataPort == node.dataPort
+          && newNode.metaPort == node.metaPort && newNode.clientPort == node.clientPort) {
+        newNode.nodeIdentifier = node.nodeIdentifier;
+        nodeExistInPartitionTable = true;
+        break;
+      }
+    }
+    if (allNodes.contains(newNode)) {
+      logger.debug("Node {} is already in the cluster", newNode);
       response.setRespNum((int) Response.RESPONSE_AGREE);
       synchronized (partitionTable) {
         response.setPartitionTableBytes(partitionTable.serialize());
@@ -886,9 +898,14 @@ public class MetaGroupMember extends RaftMember {
       return true;
     }
 
-    Node idConflictNode = idNodeMap.get(node.getNodeIdentifier());
+    if (!nodeExistInPartitionTable && partitionTable.getAllNodes().size() != allNodes.size()) {
+      response.setRespNum((int) Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT);
+      return true;
+    }
+
+    Node idConflictNode = idNodeMap.get(newNode.getNodeIdentifier());
     if (idConflictNode != null) {
-      logger.debug("{}'s id conflicts with {}", node, idConflictNode);
+      logger.debug("{}'s id conflicts with {}", newNode, idConflictNode);
       response.setRespNum((int) Response.RESPONSE_IDENTIFIER_CONFLICT);
       return true;
     }
@@ -901,7 +918,7 @@ public class MetaGroupMember extends RaftMember {
     // node adding is serialized to reduce potential concurrency problem
     synchronized (logManager) {
       // update partition table
-      partitionTable.addNode(node);
+      partitionTable.addNode(newNode);
       ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
 
       AddNodeLog addNodeLog = new AddNodeLog();
@@ -910,28 +927,28 @@ public class MetaGroupMember extends RaftMember {
       addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
       addNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
 
-      addNodeLog.setNewNode(node);
+      addNodeLog.setNewNode(newNode);
 
       logManager.append(addNodeLog);
 
       int retryTime = 1;
       while (true) {
         logger
-            .info("Send the join request of {} to other nodes, retry time: {}", node, retryTime);
+            .info("Send the join request of {} to other nodes, retry time: {}", newNode, retryTime);
         AppendLogResult result = sendLogToFollowers(addNodeLog);
         switch (result) {
           case OK:
             commitLog(addNodeLog);
-            logger.info("Join request of {} is accepted", node);
+            logger.info("Join request of {} is accepted", newNode);
 
             synchronized (partitionTable) {
               response.setPartitionTableBytes(partitionTable.serialize());
             }
             response.setRespNum((int) Response.RESPONSE_AGREE);
-            logger.info("Sending join response of {}", node);
+            logger.info("Sending join response of {}", newNode);
             return true;
           case TIME_OUT:
-            logger.info("Join request of {} timed out", node);
+            logger.info("Join request of {} timed out", newNode);
             retryTime++;
             continue;
           case LEADERSHIP_STALE:
@@ -1606,6 +1623,7 @@ public class MetaGroupMember extends RaftMember {
         allRedirect = false;
       }
       if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logger.error("Fail to send log {} to data group {}", entry.getKey(), entry.getValue());
         // execution failed, record the error message
         errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
             tmpStatus.getCode(), entry.getValue().getHeader(),
@@ -1715,7 +1733,7 @@ public class MetaGroupMember extends RaftMember {
 
   private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header)
       throws IOException {
-    Client client = null;
+    Client client;
     try {
       client = getClientProvider().getSyncDataClient(receiver,
           RaftServer.getWriteOperationTimeoutMS());
@@ -1726,8 +1744,6 @@ public class MetaGroupMember extends RaftMember {
   }
 
   /**
-=======
->>>>>>> master
    * Get the data groups that should be queried when querying "path" with "filter". First, the time
    * interval qualified by the filter will be extracted. If any side of the interval is open, query
    * all groups. Otherwise compute all involved groups w.r.t. the time partitioning.
@@ -1776,14 +1792,14 @@ public class MetaGroupMember extends RaftMember {
   }
 
   @SuppressWarnings("java:S2274")
-  public Map<Node, Boolean> getAllNodeStatus() {
+  public Map<Node, Integer> getAllNodeStatus() {
     if (getPartitionTable() == null) {
       // the cluster is being built.
       return null;
     }
-    Map<Node, Boolean> nodeStatus = new HashMap<>();
+    Map<Node, Integer> nodeStatus = new HashMap<>();
     for (Node node : allNodes) {
-      nodeStatus.put(node, thisNode.equals(node));
+      nodeStatus.put(node, thisNode.equals(node) ? 0 : 1);
     }
 
     try {
@@ -1798,11 +1814,20 @@ public class MetaGroupMember extends RaftMember {
       Thread.currentThread().interrupt();
       logger.warn("Cannot get the status of all nodes", e);
     }
+
+    for (Node node: partitionTable.getAllNodes()) {
+      nodeStatus.putIfAbsent(node, 2);
+    }
+    for (Node node : allNodes) {
+      if (!partitionTable.getAllNodes().contains(node)) {
+        nodeStatus.put(node, 3);
+      }
+    }
     return nodeStatus;
   }
 
   @SuppressWarnings({"java:S2445", "java:S2274"})
-  private void getNodeStatusAsync(Map<Node, Boolean> nodeStatus)
+  private void getNodeStatusAsync(Map<Node, Integer> nodeStatus)
       throws TException, InterruptedException {
     NodeStatusHandler nodeStatusHandler = new NodeStatusHandler(nodeStatus);
     synchronized (nodeStatus) {
@@ -1816,7 +1841,7 @@ public class MetaGroupMember extends RaftMember {
     }
   }
 
-  private void getNodeStatusSync(Map<Node, Boolean> nodeStatus) {
+  private void getNodeStatusSync(Map<Node, Integer> nodeStatus) {
     NodeStatusHandler nodeStatusHandler = new NodeStatusHandler(nodeStatus);
     for (Node node : allNodes) {
       SyncMetaClient client = (SyncMetaClient) getSyncClient(node);
@@ -1902,10 +1927,14 @@ public class MetaGroupMember extends RaftMember {
       return Response.RESPONSE_REJECT;
     }
 
+    if (partitionTable.getAllNodes().contains(target) && partitionTable.getAllNodes().size() != allNodes.size()) {
+      return Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT;
+    }
+
     // node removal must be serialized to reduce potential concurrency problem
     synchronized (logManager) {
       // update partition table
-      partitionTable.addNode(node);
+      partitionTable.removeNode(node);
       ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
 
       RemoveNodeLog removeNodeLog = new RemoveNodeLog();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d05ddd9..2cf5f77 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -907,7 +907,6 @@ public abstract class RaftMember {
 
     try {
       if (appendLogInGroup(log)) {
-        TSStatus res = StatusUtils.OK;
         return StatusUtils.OK;
       }
     } catch (LogExecutionException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 890b402..f1f46d2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -134,7 +134,7 @@ public class ClusterMonitor implements ClusterMonitorMBean, IService {
   }
 
   @Override
-  public Map<Node, Boolean> getAllNodeStatus() {
+  public Map<Node, Integer> getAllNodeStatus() {
     MetaGroupMember metaGroupMember = getMetaGroupMember();
     if (metaGroupMember == null) {
       return null;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
index ea52c28..cc3e7b7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
@@ -66,9 +66,9 @@ public interface ClusterMonitorMBean {
   /**
    * Get status of all nodes
    *
-   * @return key: node, value: live or not
+   * @return key: node, value: 0(live), 1(offline), 2(joining), 3(leaving)
    */
-  Map<Node, Boolean> getAllNodeStatus();
+  Map<Node, Integer> getAllNodeStatus();
 
   /**
    *
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java
index da4305d..d32b94d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java
@@ -63,7 +63,7 @@ public class Host extends NodeToolCmd {
       for (int i = 1; i < raftGroup.size(); i++) {
         builder.append(", ").append(nodeToString(raftGroup.get(i)));
       }
-      builder.append(')');
+      builder.append("),id=").append(raftGroup.getId());
       msgPrintln(String.format("%-50s->%20s", builder.toString(), slotNum));
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java
index 700990a..fe8eb0f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java
@@ -22,6 +22,7 @@ import static org.apache.iotdb.cluster.utils.nodetool.Printer.msgPrintln;
 
 import io.airlift.airline.Command;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitorMBean;
 
@@ -30,15 +31,26 @@ public class Status extends NodeToolCmd {
 
   @Override
   public void execute(ClusterMonitorMBean proxy) {
-    Map<Node, Boolean> statusMap = proxy.getAllNodeStatus();
-    if(statusMap == null){
+    Map<Node, Integer> statusMap = proxy.getAllNodeStatus();
+    if (statusMap == null) {
       msgPrintln(BUILDING_CLUSTER_INFO);
       return;
     }
     msgPrintln(String.format("%-30s  %10s", "Node", "Status"));
-    statusMap.forEach(
-        (node, status) -> msgPrintln(String.format("%-30s->%10s", nodeToString(node),
-            (Boolean.TRUE.equals(status) ?
-            "on" : "off"))));
+    for (Entry<Node, Integer> entry : statusMap.entrySet()) {
+      Node node = entry.getKey();
+      Integer statusNum = entry.getValue();
+      String status;
+      if (statusNum == 0) {
+        status = "on";
+      } else if (statusNum == 1) {
+        status = "off";
+      } else if (statusNum == 2) {
+        status = "joining";
+      } else {
+        status = "leaving";
+      }
+      msgPrintln(String.format("%-30s->%10s", nodeToString(node), status));
+    }
   }
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 76efe5f..59874b4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.log;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
@@ -29,8 +30,12 @@ import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.junit.Test;
 
@@ -98,4 +103,13 @@ public class LogParserTest {
     Log serialized = logParser.parse(byteBuffer);
     assertEquals(log, serialized);
   }
+
+  @Test
+  public void testLogPlan() throws IOException, IllegalPathException, UnknownLogTypeException {
+    AddNodeLog log = new AddNodeLog(TestUtils.seralizePartitionTable, TestUtils.getNode(0));
+    LogPlan logPlan = new LogPlan(log.serialize());
+    ByteBuffer buffer = ByteBuffer.wrap(PlanSerializer.getInstance().serialize(logPlan));
+    PhysicalPlan plan = PhysicalPlan.Factory.create(buffer);
+    LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+  }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
index bdc19c5..d0118db 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
@@ -43,6 +43,11 @@ public class LogPlan extends PhysicalPlan {
     this.log = log;
   }
 
+  public LogPlan(LogPlan plan) {
+    super(false);
+    this.log = plan.log;
+  }
+
   public ByteBuffer getLog() {
     log.clear();
     return log;
@@ -65,8 +70,10 @@ public class LogPlan extends PhysicalPlan {
   }
 
   @Override
-  public void serialize(ByteBuffer buffer) {
+  public void deserialize(ByteBuffer buffer) {
     int len = buffer.getInt();
-    log = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+    byte[] data = new byte[len];
+    System.arraycopy(buffer.array(), buffer.position(), data, 0, len);
+    log = ByteBuffer.wrap(data);
   }
 }