You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/02/05 04:05:01 UTC
[incubator-iotdb] 01/01: first implementation
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 2a842f0c582946cf45289885fce208efa3bd46bd
Author: jt2594838 <jt...@163.com>
AuthorDate: Wed Feb 5 11:44:13 2020 +0800
first implementation
---
.../java/org/apache/iotdb/cluster/ClusterMain.java | 87 +++++--
.../apache/iotdb/cluster/client/MetaClient.java | 4 +-
.../cluster/exception/MemberReadOnlyException.java | 11 +
.../java/org/apache/iotdb/cluster/log/Log.java | 2 +-
.../iotdb/cluster/log/applier/MetaLogApplier.java | 6 +-
.../iotdb/cluster/log/logtypes/RemoveNodeLog.java | 41 ++++
.../manage/FilePartitionedSnapshotLogManager.java | 2 +-
.../log/manage/PartitionedSnapshotLogManager.java | 1 +
.../iotdb/cluster/partition/NodeRemovalResult.java | 41 ++++
.../iotdb/cluster/partition/PartitionTable.java | 6 +
.../cluster/partition/SlotPartitionTable.java | 66 +++++-
.../iotdb/cluster/server/DataClusterServer.java | 44 +++-
.../iotdb/cluster/server/MetaClusterServer.java | 5 +
.../org/apache/iotdb/cluster/server/Response.java | 1 +
.../cluster/server/member/DataGroupMember.java | 48 +++-
.../cluster/server/member/MetaGroupMember.java | 151 +++++++++---
.../iotdb/cluster/server/member/RaftMember.java | 15 ++
.../apache/iotdb/cluster/utils/StatusUtils.java | 4 +
.../iotdb/cluster/common/TestPartitionTable.java | 6 +
.../server/heartbeat/MetaHeartBeatThreadTest.java | 6 +
.../cluster/server/member/DataGroupMemberTest.java | 2 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +-
service-rpc/src/main/thrift/cluster.thrift | 7 +
session/pom.xml | 256 ++++++++++-----------
24 files changed, 618 insertions(+), 197 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index e9bb374..8e76928 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -19,8 +19,21 @@
package org.apache.iotdb.cluster;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.iotdb.cluster.client.MetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.MetaClusterServer;
+import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.db.exception.StartupException;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +46,8 @@ public class ClusterMain {
private static final String MODE_START = "-s";
// join an established cluster
private static final String MODE_ADD = "-a";
+ // remove a node from an established cluster
+ private static final String MODE_REMOVE = "-r";
public static MetaClusterServer metaServer;
@@ -43,24 +58,70 @@ public class ClusterMain {
}
String mode = args[0];
+ logger.info("Running mode {}", mode);
try {
- metaServer = new MetaClusterServer();
- metaServer.start();
- } catch (TTransportException | IOException | StartupException e) {
- logger.error("Cannot set up service", e);
+ if (MODE_START.equals(mode)) {
+ metaServer = new MetaClusterServer();
+ metaServer.start();
+ metaServer.buildCluster();
+ } else if (MODE_ADD.equals(mode)) {
+ metaServer = new MetaClusterServer();
+ metaServer.start();
+ if (!metaServer.joinCluster()) {
+ metaServer.stop();
+ }
+ } else if (MODE_REMOVE.equals(mode)) {
+ doRemoveNode(args);
+ } else {
+ logger.error("Unrecognized mode {}", mode);
+ }
+ } catch (IOException | TTransportException | StartupException e) {
+ logger.error("Fail to start meta server", e);
+ }
+ }
+
+  /**
+   * Ask the cluster to remove a node. args[1] and args[2] are the ip and meta port of the node to
+   * be removed. The request is sent to the configured seed nodes one by one until one of them
+   * gives a definite answer: removed, cluster too small, or node not found.
+   *
+   * @param args the command line arguments: "-r", ip, metaPort
+   * @throws IOException if the Thrift async client manager (or a client) cannot be created
+   */
+  private static void doRemoveNode(String[] args) throws IOException {
+    if (args.length != 3) {
+      logger.error("Usage: -r <ip> <metaPort>");
return;
}
+    String ip = args[1];
+    int metaPort = Integer.parseInt(args[2]);
+    ClusterConfig config = ClusterDescriptor.getINSTANCE().getConfig();
+    TProtocolFactory factory = config.isRpcThriftCompressionEnabled()
+        ? new TCompactProtocol.Factory() : new Factory();
+    Node nodeToRemove = new Node();
+    nodeToRemove.setIp(ip).setMetaPort(metaPort);
+
+    // All attempts share a single client manager (and its selector thread), which is stopped
+    // when done. Previously one was created, and leaked, per seed node.
+    TAsyncClientManager clientManager = new TAsyncClientManager();
+    try {
+      // try sending the request to each seed node
+      for (String url : config.getSeedNodeUrls()) {
+        String[] splits = url.split(":");
+        Node node = new Node();
+        node.setIp(splits[0]).setMetaPort(Integer.parseInt(splits[1]));
+        // a one-off client that does not belong to any ClientPool
+        MetaClient client = new MetaClient(factory, clientManager, node, null);
- logger.info("Running mode {}", mode);
- if (MODE_START.equals(mode)) {
- metaServer.buildCluster();
- } else if (MODE_ADD.equals(mode)) {
- if (!metaServer.joinCluster()) {
- metaServer.stop();
+        AtomicReference<Long> responseRef = new AtomicReference<>();
+        GenericHandler handler = new GenericHandler(node, responseRef);
+        try {
+          synchronized (responseRef) {
+            client.removeNode(nodeToRemove, handler);
+            responseRef.wait(RaftServer.connectionTimeoutInMS);
+          }
+          Long response = responseRef.get();
+          if (response == null) {
+            logger.warn("No response from {} for removing {}, try next node", node, nodeToRemove);
+          } else if (response == Response.RESPONSE_AGREE) {
+            logger.info("Node {} is successfully removed", nodeToRemove);
+            return;
+          } else if (response == Response.RESPONSE_CLUSTER_TOO_SMALL) {
+            logger.error("Cluster size is too small, cannot remove any node");
+            return;
+          } else if (response == Response.RESPONSE_REJECT) {
+            // report the node that was not found, not the seed node that answered
+            logger.error("Node {} is not found in the cluster, please check", nodeToRemove);
+            return;
+          } else {
+            logger.warn("Unexpected response {} from {}, try next node", response, node);
+          }
+        } catch (TException e) {
+          logger.warn("Cannot send remove node request through {}, try next node", node, e);
+        } catch (InterruptedException e) {
+          // restore the interrupt flag and give up instead of silently moving on
+          Thread.currentThread().interrupt();
+          logger.warn("Interrupted while removing node {}", nodeToRemove);
+          return;
}
- } else {
- logger.error("Unrecognized mode {}", mode);
}
-
+      logger.error("Cannot remove node {}: no seed node gave a definite response", nodeToRemove);
+    } finally {
+      clientManager.stop();
+    }
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java
index 28c558d..625567d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/MetaClient.java
@@ -46,7 +46,9 @@ public class MetaClient extends AsyncClient {
public void onComplete() {
super.onComplete();
// return itself to the pool if the job is done
- pool.putClient(node, this);
+ // pool is null for clients created without a ClientPool (e.g. the one-off client that
+ // ClusterMain uses to send a remove-node request); such clients are simply not returned
+ if (pool != null) {
+ pool.putClient(node, this);
+ }
}
public static class Factory implements ClientFactory {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/MemberReadOnlyException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/MemberReadOnlyException.java
new file mode 100644
index 0000000..8fc3ed4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/MemberReadOnlyException.java
@@ -0,0 +1,11 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.exception;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+/**
+ * Signals that the given node has been set read-only for its partitions, so the request should be
+ * retried on another node.
+ */
+public class MemberReadOnlyException extends Exception {
+
+  /**
+   * @param node the node that has been set read-only
+   */
+  public MemberReadOnlyException(Node node) {
+    super(String.format("The node %s has been set readonly for the partitions, please retry to find "
+        + "a new node", node));
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 8bfe5b4..354d322 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -41,7 +41,7 @@ public abstract class Log {
public enum Types {
// TODO-Cluster#348 support more logs
- ADD_NODE, PHYSICAL_PLAN, CLOSE_FILE
+ // The ordinal of a type is written as the first byte of a serialized log (e.g. in
+ // RemoveNodeLog.serialize). Reordering would change the encoding, so append new types at the end.
+ ADD_NODE, PHYSICAL_PLAN, CLOSE_FILE, REMOVE_NODE
}
public long getPreviousLogIndex() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index b9f2a62..ffd236f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -23,12 +23,12 @@ import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +39,6 @@ public class MetaLogApplier extends BaseApplier {
private static final Logger logger = LoggerFactory.getLogger(MetaLogApplier.class);
private MetaGroupMember member;
- private QueryProcessExecutor queryExecutor;
public MetaLogApplier(MetaGroupMember member) {
super(member);
@@ -64,6 +63,9 @@ public class MetaLogApplier extends BaseApplier {
logger.error("Cannot close {} file in {}", closeFileLog.isSeq() ? "seq" : "unseq",
closeFileLog.getStorageGroupName());
}
+ } else if (log instanceof RemoveNodeLog) {
+ RemoveNodeLog removeNodeLog = ((RemoveNodeLog) log);
+ member.applyRemoveNode(removeNodeLog.getRemovedNode());
} else {
// TODO-Cluster#348 support more types of logs
logger.error("Unsupported log: {}", log);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
new file mode 100644
index 0000000..204e675
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.logtypes;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.utils.SerializeUtils;
+
+/**
+ * RemoveNodeLog records that a node is removed from the cluster. When a meta log applier applies
+ * it, the removed node is passed to MetaGroupMember.applyRemoveNode.
+ */
+public class RemoveNodeLog extends Log {
+
+  private Node removedNode;
+
+  /**
+   * Serialize this log as one byte holding the ordinal of {@link Types#REMOVE_NODE}, followed by
+   * the removed node as written by SerializeUtils.
+   */
+  @Override
+  public ByteBuffer serialize() {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+    try {
+      dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
+    } catch (IOException e) {
+      // ignored: the stream only writes to an in-memory ByteArrayOutputStream
+    }
+    // NOTE(review): only the type and the node are written; confirm the log's index/term do not
+    // need to be serialized as well
+    SerializeUtils.serialize(removedNode, dataOutputStream);
+    return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
+  }
+
+  /**
+   * Read the removed node from the buffer. The leading type byte is not read here. Presumably the
+   * caller that dispatches on the log type has already consumed it; TODO confirm.
+   */
+  @Override
+  public void deserialize(ByteBuffer buffer) {
+    removedNode = new Node();
+    SerializeUtils.deserialize(removedNode, buffer);
+  }
+
+  public Node getRemovedNode() {
+    return removedNode;
+  }
+
+  public void setRemovedNode(Node removedNode) {
+    this.removedNode = removedNode;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 28e310e..6eed911 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
/**
* Different from PartitionedSnapshotLogManager, FilePartitionedSnapshotLogManager does not store
- * the committed in memory, it considers the logs are contained in the TsFiles so it will record
+ * the committed in memory after snapshots, it considers the logs are contained in the TsFiles so it will record
* every TsFiles in the slot instead.
*/
public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogManager<FileSnapshot> {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index e740c86..5b0c432 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -96,6 +96,7 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
+ /**
+ * Set the snapshot of a slot, e.g. a RemoteFileSnapshot whose data is still being pulled. The
+ * snapshot is currently only kept in memory (see the TODO below).
+ */
public void setSnapshot(T snapshot, int slot) {
synchronized (slotSnapshots) {
+ // TODO-Cluster#451: persist the remote snapshot so the pulling can be recovered in restart
slotSnapshots.put(slot, snapshot);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
new file mode 100644
index 0000000..df5228b
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.partition;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+/**
+ * NodeRemovalResult stores the removed partition group and who will take over its slots.
+ */
+public class NodeRemovalResult {
+
+  // the partition group headed by the removed node
+  private PartitionGroup removedGroup;
+  // if the removed group contains the local node, the local node should join a new group to
+  // preserve the replication number
+  private PartitionGroup newGroup;
+  // new holder -> the slots of the removed node that it takes over
+  private Map<Node, List<Integer>> newSlotOwners;
+
+  /** @return the partition group headed by the removed node */
+  public PartitionGroup getRemovedGroup() {
+    return removedGroup;
+  }
+
+  public void setRemovedGroup(PartitionGroup group) {
+    this.removedGroup = group;
+  }
+
+  /** @return a map from each new holder to the slots of the removed node it takes over */
+  public Map<Node, List<Integer>> getNewSlotOwners() {
+    return newSlotOwners;
+  }
+
+  public void setNewSlotOwners(
+      Map<Node, List<Integer>> newSlotOwners) {
+    this.newSlotOwners = newSlotOwners;
+  }
+
+  /** @return the group the local node joins because of the removal, or null if not set */
+  public PartitionGroup getNewGroup() {
+    return newGroup;
+  }
+
+  public void setNewGroup(PartitionGroup newGroup) {
+    this.newGroup = newGroup;
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 130f48b..5810f34 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -68,6 +68,12 @@ public interface PartitionTable {
PartitionGroup addNode(Node node);
/**
+ * Remove a node and update the partition table.
+ *
+ * @param node the node to be removed
+ * @return a NodeRemovalResult describing the partition group headed by the removed node, the
+ * group the local node joins instead (if any), and the new owners of the removed node's slots.
+ * Implementations may return null if the node is not in the table.
+ */
+ NodeRemovalResult removeNode(Node node);
+
+ /**
*
* @return All data groups where all VNodes of this node is the header. The first index
* indicates the VNode and the second index indicates the data group of one VNode.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
index d2d3016..b1d7e31 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
@@ -91,8 +91,7 @@ public class SlotPartitionTable implements PartitionTable {
int nodeNum = nodeRing.size();
int slotsPerNode = slotNum / nodeNum;
for (Node node : nodeRing) {
- List<Integer> nodeSlots = new ArrayList<>();
- nodeSlotMap.put(node, nodeSlots);
+ nodeSlotMap.put(node, new ArrayList<>());
}
for (int i = 0; i < slotNum; i++) {
@@ -150,10 +149,6 @@ public class SlotPartitionTable implements PartitionTable {
return ret;
}
- private int nextIndex(int currIndex) {
- return currIndex == nodeRing.size() - 1 ? 0 : currIndex + 1;
- }
-
@Override
public PartitionGroup route(String storageGroupName, long timestamp) {
synchronized (nodeRing) {
@@ -359,4 +354,63 @@ public class SlotPartitionTable implements PartitionTable {
public int hashCode() {
return 0;
}
+
+  /**
+   * Remove target from the node ring and update the partition table. The local groups are
+   * recomputed, and the group headed by target is dropped; this node joins a replacement group if
+   * it was a member of that group. Target's slots are reassigned to the remaining nodes.
+   *
+   * @param target the node to be removed
+   * @return the changes caused by the removal, or null if target is not in the node ring
+   */
+  @Override
+  public NodeRemovalResult removeNode(Node target) {
+    synchronized (nodeRing) {
+      if (!nodeRing.contains(target)) {
+        return null;
+      }
+
+      NodeRemovalResult result = new NodeRemovalResult();
+      // computed before the removal so it still describes the group headed by target
+      result.setRemovedGroup(getHeaderGroup(target));
+      nodeRing.remove(target);
+
+      // If this node belongs to a group headed by target, that group should be removed. The other
+      // local groups, which may have contained target, are recomputed.
+      int removedGroupIdx = -1;
+      for (int i = 0; i < localGroups.size(); i++) {
+        Node header = localGroups.get(i).getHeader();
+        if (header.equals(target)) {
+          removedGroupIdx = i;
+        } else {
+          localGroups.set(i, getHeaderGroup(header));
+        }
+      }
+      if (removedGroupIdx != -1) {
+        localGroups.remove(removedGroupIdx);
+        // a removed node does not join any new group
+        if (!target.equals(thisNode)) {
+          // Each node joins exactly replicationNum groups, so when one of them is removed this
+          // node joins a new one, in which it is the last node.
+          int thisNodeIdx = nodeRing.indexOf(thisNode);
+          // floorMod keeps the index on the ring even if it has fewer nodes than replicationNum
+          int headerNodeIdx = Math.floorMod(thisNodeIdx - (replicationNum - 1), nodeRing.size());
+          PartitionGroup newGrp = getHeaderGroup(nodeRing.get(headerNodeIdx));
+          localGroups.add(newGrp);
+          // Only the group this node newly joins is the "new group". The recomputed existing
+          // groups above must not be reported, or a second member would be created for them.
+          result.setNewGroup(newGrp);
+        }
+      }
+
+      // The slot movement is only done logically; the new holders will pull the data from the
+      // old ones.
+      Map<Node, List<Integer>> nodeListMap = retrieveSlots(target);
+      result.setNewSlotOwners(nodeListMap);
+      return result;
+    }
+  }
+
+ /**
+ * Reassign the slots of target to the nodes remaining in nodeRing in round-robin order: the i-th
+ * slot goes to nodeRing.get(i % nodeRing.size()). Both slotNodeMap and nodeSlotMap are updated.
+ * NOTE(review): assumes target still has an entry in nodeSlotMap and nodeRing is not empty;
+ * otherwise this throws. Target should already be removed from nodeRing, or it may be picked as a
+ * new holder after its own entry has been removed from nodeSlotMap.
+ * @param target the removed node
+ * @return new holder -> the slots it takes over
+ */
+ private Map<Node, List<Integer>> retrieveSlots(Node target) {
+ Map<Node, List<Integer>> newHolderSlotMap = new HashMap<>();
+ List<Integer> slots = nodeSlotMap.remove(target);
+ for (int i = 0; i < slots.size(); i++) {
+ int slot = slots.get(i);
+ Node newHolder = nodeRing.get(i % nodeRing.size());
+ slotNodeMap.put(slot, newHolder);
+ nodeSlotMap.get(newHolder).add(slot);
+ newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
+ }
+ return newHolderSlotMap;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 2ca212f..3a89b66 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.cluster.exception.NoHeaderNodeException;
import org.apache.iotdb.cluster.exception.NotInSameGroupException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -88,7 +89,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
if (partitionTable != null) {
try {
member = createNewMember(header);
- } catch (NotInSameGroupException e) {
+ } catch (NotInSameGroupException | TTransportException e) {
ex = e;
}
} else {
@@ -103,7 +104,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
}
}
- private DataGroupMember createNewMember(Node header) throws NotInSameGroupException {
+ private DataGroupMember createNewMember(Node header)
+ throws NotInSameGroupException, TTransportException {
DataGroupMember member;
synchronized (partitionTable) {
// it may be that the header and this node are in the same group, but it is the first time
@@ -114,6 +116,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
member = dataMemberFactory.create(partitionGroup, thisNode);
headerGroupMap.put(header, member);
logger.info("Created a member for header {}", header);
+ member.start();
} else {
logger.info("This node {} does not belong to the group {}", thisNode, partitionGroup);
throw new NotInSameGroupException(partitionTable.getHeaderGroup(header),
@@ -298,6 +301,41 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
}
}
+  /**
+   * Update the data group members of this node after node has been removed from the cluster.
+   * - The member of the group headed by the removed node is set read-only. It is kept alive so
+   *   other nodes can still pull snapshots from it.
+   * - If this node itself is removed, its other members are asked to remove their local data and
+   *   are discarded.
+   * - Otherwise, the other members update their groups and pull the slots they take over.
+   * Finally, a member is created for the new group this node joins, if any.
+   *
+   * @param node the removed node
+   * @param removalResult the partition table changes caused by the removal
+   */
+  public void removeNode(Node node, NodeRemovalResult removalResult) {
+    synchronized (headerGroupMap) {
+      // the iterator is created under the lock so that the whole traversal is covered by it
+      Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator();
+      while (entryIterator.hasNext()) {
+        Entry<Node, DataGroupMember> entry = entryIterator.next();
+        DataGroupMember dataGroupMember = entry.getValue();
+        if (dataGroupMember.getHeader().equals(node)) {
+          // The group is removed along with the node, so new writes should be rejected because
+          // they belong to the new holder. The member is kept alive for other nodes to pull
+          // snapshots from.
+          dataGroupMember.setReadOnly();
+          //TODO-Cluster: when to call removeLocalData?
+        } else if (node.equals(thisNode)) {
+          // this node is removed, so it is no longer a replica of other groups
+          List<Integer> nodeSlots = partitionTable.getNodeSlots(dataGroupMember.getHeader());
+          dataGroupMember.removeLocalData(nodeSlots);
+          // NOTE(review): the discarded member is not stopped here; confirm its threads are
+          // released elsewhere
+          entryIterator.remove();
+        } else {
+          // the group should be updated and pull new slots from the removed node
+          dataGroupMember.removeNode(node, removalResult);
+        }
+      }
+      PartitionGroup newGroup = removalResult.getNewGroup();
+      if (newGroup != null) {
+        try {
+          createNewMember(newGroup.getHeader());
+        } catch (NotInSameGroupException | TTransportException e) {
+          logger.error("Cannot create a data member for the new group {} after removing {}",
+              newGroup, node, e);
+        }
+      }
+    }
+  }
+
@Override
public void pullTimeSeriesSchema(PullSchemaRequest request,
AsyncMethodCallback<PullSchemaResp> resultHandler) {
@@ -319,7 +357,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
+ /**
+ * Pull the snapshots of the slots held by this node from their previous holders. The pull goes
+ * through the data group member headed by this node and is used after this node joins the
+ * cluster.
+ */
public void pullSnapshots() {
List<Integer> slots = partitionTable.getNodeSlots(thisNode);
DataGroupMember dataGroupMember = headerGroupMap.get(thisNode);
- dataGroupMember.pullSnapshots(slots, thisNode);
+ dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index 9ec26cd..61b1042 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.RegisterManager;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;
@@ -176,4 +177,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
return member;
}
+ /**
+ * Handle a request to remove node from the cluster by delegating it, together with
+ * resultHandler, to the local meta group member.
+ */
+ @Override
+ public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) throws TException {
+ member.removeNode(node, resultHandler);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
index 7064c73..e0fab54 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
@@ -34,6 +34,7 @@ public class Response {
public static final long RESPONSE_NO_CONNECTION = -6;
public static final long RESPONSE_META_LOG_STALE = -7;
public static final long RESPONSE_LEADER_STILL_ONLINE = -8;
+ public static final long RESPONSE_CLUSTER_TOO_SMALL = -9;
private Response() {
// enum class
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 5bcb8de..3e901a5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
import org.apache.iotdb.cluster.log.snapshot.PullSnapshotTask;
import org.apache.iotdb.cluster.log.snapshot.RemoteFileSnapshot;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.query.ClusterQueryParser;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
@@ -229,7 +230,6 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
long thisMetaLastLogIndex = metaGroupMember.getLogManager().getLastLogIndex();
long thisMetaLastLogTerm = metaGroupMember.getLogManager().getLastLogTerm();
- long thisMetaTerm = metaGroupMember.getTerm().get();
// term of the electors's MetaGroupMember is not verified, so 0 and 1 are used to make sure
// the verification does not fail
@@ -479,7 +479,12 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
}
}
- public void pullSnapshots(List<Integer> slots, Node newNode) {
+ /**
+ * Pull snapshots from the previous holders after newNode joins the cluster.
+ * @param slots
+ * @param newNode
+ */
+ public void pullNodeAdditionSnapshots(List<Integer> slots, Node newNode) {
synchronized (logManager) {
logger.info("{} pulling {} slots from remote", name, slots.size());
PartitionedSnapshot snapshot = (PartitionedSnapshot) logManager.getSnapshot();
@@ -499,17 +504,14 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
for (Entry<Node, List<Integer>> entry : holderSlotsMap.entrySet()) {
Node node = entry.getKey();
List<Integer> nodeSlots = entry.getValue();
- pullFileSnapshot(node, nodeSlots);
+ pullFileSnapshot(metaGroupMember.getPartitionTable().getHeaderGroup(node), nodeSlots);
}
}
}
- private void pullFileSnapshot(Node node, List<Integer> nodeSlots) {
- PartitionGroup prevHolders =
- new PartitionGroup(metaGroupMember.getPartitionTable().getHeaderGroup(node));
-
+ private void pullFileSnapshot(PartitionGroup prevHolders, List<Integer> nodeSlots) {
Future<Map<Integer, FileSnapshot>> snapshotFuture =
- pullSnapshotService.submit(new PullSnapshotTask(node, nodeSlots, this,
+ pullSnapshotService.submit(new PullSnapshotTask(prevHolders.getHeader(), nodeSlots, this,
prevHolders, FileSnapshot::new));
for (int slot : nodeSlots) {
logManager.setSnapshot(new RemoteFileSnapshot(snapshotFuture), slot);
@@ -718,7 +720,6 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
}
try {
if (reader.hasNextBatch()) {
- int count = 0;
BatchData batchData = reader.nextBatch();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@@ -743,5 +744,34 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
resultHandler.onError(e);
}
}
+
+ /**
+ * When this node is no longer a member of a group, the local data of the given slots should be
+ * removed.
+ * NOTE: not implemented yet. The loop body is only a TODO, so no data is currently removed.
+ * @param slots the slots whose local data should be removed
+ */
+ public void removeLocalData(List<Integer> slots) {
+ for (Integer slot : slots) {
+ //TODO-Cluster: remove the data in the slot
+ }
+ }
+
+ /**
+ * Update this member after removedNode, which is not the header of this group, has been removed
+ * from the cluster.
+ * - If removedNode was a member of this group, the group is recomputed from the partition table.
+ * - If the header of this group takes over some slots of the removed node, the data of those
+ *   slots are pulled from the removed group recorded in removalResult.
+ *
+ * @param removedNode the node removed from the cluster
+ * @param removalResult the partition table changes caused by the removal
+ */
+ public void removeNode(Node removedNode, NodeRemovalResult removalResult) {
+ // NOTE(review): allNodes is reassigned inside this block while its old object is the lock, so
+ // threads that later synchronize on the new list are not excluded; consider a dedicated lock
+ synchronized (allNodes) {
+ if (allNodes.contains(removedNode)) {
+ // update the group if the deleted node was in it
+ allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+ }
+ List<Integer> slotsToPull = removalResult.getNewSlotOwners().get(getHeader());
+ if (slotsToPull != null) {
+ // pull the slots that should be taken over
+ pullFileSnapshot(removalResult.getRemovedGroup(), slotsToPull);
+ }
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 19c520c..5cdf481 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.AddSelfException;
import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.exception.NotInSameGroupException;
+import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.exception.RequestTimeOutException;
import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
import org.apache.iotdb.cluster.log.Log;
@@ -62,8 +63,10 @@ import org.apache.iotdb.cluster.log.applier.DataLogApplier;
import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.SlotPartitionTable;
@@ -72,17 +75,7 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.query.reader.RemoteSeriesReaderByTimestamp;
import org.apache.iotdb.cluster.query.reader.RemoteSimpleSeriesReader;
-import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
-import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
-import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
-import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
-import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
-import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
-import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
-import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
+import org.apache.iotdb.cluster.rpc.thrift.*;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient;
import org.apache.iotdb.cluster.server.ClientServer;
import org.apache.iotdb.cluster.server.DataClusterServer;
@@ -138,8 +131,6 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
public static final int REPLICATION_NUM =
ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum();
- private TProtocolFactory protocolFactory;
-
// blind nodes are nodes that does not know the nodes in the cluster
private Set<Node> blindNodes = new HashSet<>();
private Set<Node> idConflictNodes = new HashSet<>();
@@ -150,7 +141,6 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
private ClientServer clientServer;
private LogApplier metaLogApplier = new MetaLogApplier(this);
- private LogApplier dataLogApplier = new DataLogApplier(this);
private DataGroupMember.Factory dataMemberFactory;
private MetaSingleSnapshotLogManager logManager;
@@ -165,8 +155,8 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
throws IOException {
super("Meta", new ClientPool(new MetaClient.Factory(new TAsyncClientManager(), factory)));
allNodes = new ArrayList<>();
- this.protocolFactory = factory;
- dataMemberFactory = new Factory(protocolFactory, this, dataLogApplier,
+ LogApplier dataLogApplier = new DataLogApplier(this);
+ dataMemberFactory = new Factory(factory, this, dataLogApplier,
new TAsyncClientManager());
dataClientPool =
new ClientPool(new DataClient.Factory(new TAsyncClientManager(), factory));
@@ -292,7 +282,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode);
getDataClusterServer().addDataGroupMember(dataGroupMember);
dataGroupMember.start();
- dataGroupMember.pullSnapshots(partitionTable.getNodeSlots(newNode), newNode);
+ dataGroupMember.pullNodeAdditionSnapshots(partitionTable.getNodeSlots(newNode), newNode);
} catch (TTransportException e) {
logger.error("Fail to create data newMember for new header {}", newNode, e);
}
@@ -610,20 +600,12 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
switch (result) {
case OK:
logger.info("Join request of {} is accepted", node);
- // add node is instantly applied to update the partition table
- try {
- logManager.getApplier().apply(addNodeLog);
- } catch (QueryProcessException e) {
- logManager.removeLastLog();
- resultHandler.onError(e);
- return true;
- }
+ logManager.commitLog(addNodeLog.getCurrLogIndex());
synchronized (partitionTable) {
response.setPartitionTableBytes(partitionTable.serialize());
}
response.setRespNum((int) Response.RESPONSE_AGREE);
resultHandler.onComplete(response);
- logManager.commitLog(logManager.getLastLogIndex());
return true;
case TIME_OUT:
logger.info("Join request of {} timed out", node);
@@ -1220,10 +1202,6 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
return nodeStatus;
}
- ClientServer getClientServer() {
- return clientServer;
- }
-
@Override
public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
resultHandler.onComplete(new TNodeStatus());
@@ -1242,4 +1220,158 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
dataClusterServer.setPartitionTable(partitionTable);
}
}
+
+ /**
+ * Remove a node from the cluster. If this node is the leader the request is processed locally;
+ * if it is a follower that knows its leader the request is forwarded there; otherwise the
+ * request fails with a LeaderUnknownException. The request also fails with a
+ * PartitionTableUnavailableException while the partition table is not set.
+ *
+ * @param node the node to be removed, matched against the members by ip and meta port
+ * @param resultHandler receives RESPONSE_AGREE, RESPONSE_REJECT or RESPONSE_CLUSTER_TOO_SMALL,
+ * or an error if the request cannot be completed
+ */
+ @Override
+ public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
+ if (partitionTable == null) {
+ logger.info("Cannot remove node now because the partition table is not set");
+ resultHandler.onError(new PartitionTableUnavailableException(thisNode));
+ return;
+ }
+
+ // try to process the request locally, if it cannot be processed locally, forward it
+ if (processRemoveNodeLocally(node, resultHandler)) {
+ return;
+ }
+
+ if (character == NodeCharacter.FOLLOWER && leader != null) {
+ logger.info("Forward the node removal request of {} to leader {}", node, leader);
+ if (forwardRemoveNode(node, resultHandler)) {
+ return;
+ }
+ }
+ resultHandler.onError(new LeaderUnknownException(getAllNodes()));
+ }
+
+ /**
+ * Forward a node removal request to the leader this node currently knows.
+ *
+ * @return true if the request was handed to a client of the leader, false if no client could
+ * be obtained or sending the request failed
+ */
+ private boolean forwardRemoveNode(Node node, AsyncMethodCallback resultHandler) {
+ // read the leader once so that the node connected to and the node logged are the same
+ Node currentLeader = leader;
+ TSMetaService.AsyncClient client = (TSMetaService.AsyncClient) connectNode(currentLeader);
+ if (client != null) {
+ try {
+ client.removeNode(node, new GenericForwardHandler(resultHandler));
+ return true;
+ } catch (TException e) {
+ // the connection problem concerns the leader, not the node being removed
+ logger.warn("Cannot connect to node {}", currentLeader, e);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Process a node removal request if this node is the leader.
+ *
+ * @return true if the request has been answered through resultHandler; false if this node is
+ * not the leader or the removal log was rejected as stale, so the caller should try forwarding
+ */
+ private boolean processRemoveNodeLocally(Node node, AsyncMethodCallback resultHandler) {
+ if (character != NodeCharacter.LEADER) {
+ return false;
+ }
+
+ Node target = null;
+ boolean clusterTooSmall;
+ // read allNodes only while holding its lock, but answer the caller outside of it
+ synchronized (allNodes) {
+ // removing a node must not leave fewer nodes than the replication number
+ clusterTooSmall =
+ allNodes.size() <= ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum();
+ if (!clusterTooSmall) {
+ // match by ip and meta port only; the member instance from allNodes is used from here on
+ for (Node n : allNodes) {
+ if (n.ip.equals(node.ip) && n.metaPort == node.metaPort) {
+ target = n;
+ break;
+ }
+ }
+ }
+ }
+
+ if (clusterTooSmall) {
+ resultHandler.onComplete(Response.RESPONSE_CLUSTER_TOO_SMALL);
+ return true;
+ }
+ if (target == null) {
+ logger.debug("Node {} is not in the cluster", node);
+ resultHandler.onComplete(Response.RESPONSE_REJECT);
+ return true;
+ }
+
+ // node removal must be serialized
+ synchronized (logManager) {
+ RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+ removeNodeLog.setCurrLogTerm(getTerm().get());
+ removeNodeLog.setPreviousLogIndex(logManager.getLastLogIndex());
+ removeNodeLog.setPreviousLogTerm(logManager.getLastLogTerm());
+ removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+ removeNodeLog.setRemovedNode(target);
+
+ logManager.appendLog(removeNodeLog);
+
+ logger.info("Send the node removal request of {} to other nodes", target);
+ AppendLogResult result = sendLogToAllGroups(removeNodeLog);
+
+ switch (result) {
+ case OK:
+ logger.info("Removal request of {} is accepted", target);
+ logManager.commitLog(removeNodeLog.getCurrLogIndex());
+ resultHandler.onComplete(Response.RESPONSE_AGREE);
+ return true;
+ case TIME_OUT:
+ logger.info("Removal request of {} timed out", target);
+ resultHandler.onError(new RequestTimeOutException(removeNodeLog));
+ logManager.removeLastLog();
+ return true;
+ case LEADERSHIP_STALE:
+ default:
+ // the log was not accepted: discard it and let the caller forward the request
+ logManager.removeLastLog();
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Remove oldNode from the member list and the id map, update the partition table and the data
+ * groups accordingly, and become an elector if oldNode was the leader known by this node.
+ * Nothing happens if oldNode is not a member of the cluster.
+ */
+ public void applyRemoveNode(Node oldNode) {
+ synchronized (allNodes) {
+ if (allNodes.contains(oldNode)) {
+ logger.debug("Removing a node {} from {}", oldNode, allNodes);
+ allNodes.remove(oldNode);
+ idNodeMap.remove(oldNode.nodeIdentifier);
+
+ // update the partition table
+ NodeRemovalResult result = partitionTable.removeNode(oldNode);
+
+ getDataClusterServer().removeNode(oldNode, result);
+ // oldNode may be a different instance than leader (e.g. deserialized from a log), so compare
+ // by value instead of by reference
+ if (oldNode.equals(leader)) {
+ setCharacter(NodeCharacter.ELECTOR);
+ // presumably lets the heartbeat check time out immediately so a new election starts soon
+ lastHeartBeatReceivedTime = Long.MIN_VALUE;
+ }
+ }
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 6deb034..f4789bf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -105,6 +105,11 @@ public abstract class RaftMember implements RaftService.AsyncIface {
// know if this node is synchronized with the leader
private Object syncLock = new Object();
+ // when the header of the group is removed from the cluster, the members of the group should no
+ // longer accept writes, but they can still serve weak-consistency reads and provide snapshots
+ // to the new holders
+ private volatile boolean readOnly = false;
+
public RaftMember() {
}
@@ -652,6 +657,10 @@ public abstract class RaftMember implements RaftService.AsyncIface {
TSStatus processPlanLocally(PhysicalPlan plan) {
logger.debug("{}: Processing plan {}", name, plan);
synchronized (logManager) {
+ if (readOnly) {
+ return StatusUtils.NODE_READ_ONLY;
+ }
+
PhysicalPlanLog log = new PhysicalPlanLog();
log.setCurrLogTerm(getTerm().get());
log.setPreviousLogIndex(logManager.getLastLogIndex());
@@ -798,4 +807,10 @@ public abstract class RaftMember implements RaftService.AsyncIface {
resultHandler.onError(e);
}
}
+
+ public void setReadOnly() { // afterwards processPlanLocally() answers NODE_READ_ONLY
+ synchronized (logManager) { // processPlanLocally() checks the flag under this same lock
+ this.readOnly = true;
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
index 99fdd48..718a7e0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
@@ -39,6 +39,7 @@ public class StatusUtils {
public static final TSStatus EXECUTE_STATEMENT_ERROR =
getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
public static final TSStatus NO_STORAGE_GROUP = getStatus(TSStatusCode.STORAGE_GROUP_ERROR);
+ public static final TSStatus NODE_READ_ONLY = getStatus(TSStatusCode.NODE_READ_ONLY);
private static TSStatus getStatus(TSStatusCode statusCode) {
TSStatusType tsStatusType = new TSStatusType();
@@ -55,6 +56,9 @@ public class StatusUtils {
case PARTITION_NOT_READY:
tsStatusType.setMessage("Partition table is not ready");
break;
+ case NODE_READ_ONLY:
+ tsStatusType.setMessage("Current node is read-only, please retry to find another available node");
+ break;
default:
tsStatusType.setMessage("");
break;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java
index ef14b1d..aeda017 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionTable.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.common;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -39,6 +40,11 @@ public class TestPartitionTable implements PartitionTable {
}
@Override
+ public NodeRemovalResult removeNode(Node node) {
+ return null; // test stub: node removal is not simulated, so no removal result is produced
+ }
+
+ @Override
public List<PartitionGroup> getLocalGroups() {
return null;
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java
index 457fffe..45d372e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThreadTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.common.TestClient;
import org.apache.iotdb.cluster.common.TestMetaGroupMember;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.log.LogManager;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -61,6 +62,11 @@ public class MetaHeartBeatThreadTest extends HeartBeatThreadTest {
}
@Override
+ public NodeRemovalResult removeNode(Node node) {
+ return null; // test stub: node removal is not simulated, so no removal result is produced
+ }
+
+ @Override
public List<PartitionGroup> getLocalGroups() {
return null;
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 9ea7028..3caf096 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -427,7 +427,7 @@ public class DataGroupMemberTest extends MemberTest {
hasInitialSnapshots = false;
partitionTable.addNode(TestUtils.getNode(10));
List<Integer> requiredSlots = Arrays.asList(19, 39, 59, 79, 99);
- dataGroupMember.pullSnapshots(requiredSlots, TestUtils.getNode(10));
+ dataGroupMember.pullNodeAdditionSnapshots(requiredSlots, TestUtils.getNode(10));
assertEquals(requiredSlots.size(), receivedSnapshots.size());
for (Integer requiredSlot : requiredSlots) {
receivedSnapshots.get(requiredSlot).getRemoteSnapshot();
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index fd9019a..87a4171 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -65,7 +65,8 @@ public enum TSStatusCode {
PARTITION_NOT_READY(700),
TIME_OUT(701),
NO_LEADER(702),
- UNSUPPORTED_OPERATION(703);
+ UNSUPPORTED_OPERATION(703),
+ NODE_READ_ONLY(704);
private int statusCode;
diff --git a/service-rpc/src/main/thrift/cluster.thrift b/service-rpc/src/main/thrift/cluster.thrift
index eb734cd..f3b4b3d 100644
--- a/service-rpc/src/main/thrift/cluster.thrift
+++ b/service-rpc/src/main/thrift/cluster.thrift
@@ -271,6 +271,13 @@ service TSMetaService extends RaftService {
**/
AddNodeResponse addNode(1: Node node)
+ /**
+ * Remove a node from the cluster. If the node is not in the cluster or the cluster size would
+ * become smaller than the replication number, the request will be rejected.
+ * return -1(RESPONSE_AGREE) or -3(RESPONSE_REJECT) or -9(RESPONSE_CLUSTER_TOO_SMALL)
+ **/
+ long removeNode(1: Node node)
+
TNodeStatus queryNodeStatus()
Node checkAlive()
diff --git a/session/pom.xml b/session/pom.xml
index 8df9ae6..dab30aa 100644
--- a/session/pom.xml
+++ b/session/pom.xml
@@ -19,134 +19,132 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>iotdb-parent</artifactId>
- <groupId>org.apache.iotdb</groupId>
- <version>0.10.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>iotdb-session</artifactId>
- <name>IoTDB Session</name>
- <properties>
- <session.test.skip>false</session.test.skip>
- <session.it.skip>${session.test.skip}</session.it.skip>
- <session.ut.skip>${session.test.skip}</session.ut.skip>
- </properties>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.1.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <!-- this is used for inheritance merges -->
- <phase>package</phase>
- <!-- bind to the packaging phase -->
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <!--using `mvn test` to run UT, `mvn verify` to run ITs
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>iotdb-parent</artifactId>
+ <groupId>org.apache.iotdb</groupId>
+ <version>0.10.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>iotdb-session</artifactId>
+ <name>IoTDB Session</name>
+ <properties>
+ <session.test.skip>false</session.test.skip>
+ <session.it.skip>${session.test.skip}</session.it.skip>
+ <session.ut.skip>${session.test.skip}</session.ut.skip>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.1.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <!-- this is used for inheritance merges -->
+ <phase>package</phase>
+ <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!--using `mvn test` to run UT, `mvn verify` to run ITs
Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/-->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skipTests>${session.ut.skip}</skipTests>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <executions>
- <execution>
- <id>run-integration-tests</id>
- <phase>integration-test</phase>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <skipTests>${session.test.skip}</skipTests>
- <skipITs>${session.it.skip}</skipITs>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>service-rpc</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>tsfile</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-jdbc</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </dependency>
- </dependencies>
- <profiles>
- <profile>
- <id>skipSessionTests</id>
- <activation>
- <property>
- <name>skipTests</name>
- <value>true</value>
- </property>
- </activation>
- <properties>
- <session.test.skip>true</session.test.skip>
- <session.ut.skip>true</session.ut.skip>
- <session.it.skip>true</session.it.skip>
- </properties>
- </profile>
- <profile>
- <id>skipUT_SessionTests</id>
- <activation>
- <property>
- <name>skipUTs</name>
- <value>true</value>
- </property>
- </activation>
- <properties>
- <session.ut.skip>true</session.ut.skip>
- </properties>
- </profile>
- </profiles>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>${session.ut.skip}</skipTests>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>run-integration-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <skipTests>${session.test.skip}</skipTests>
+ <skipITs>${session.it.skip}</skipITs>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>service-rpc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>tsfile</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-jdbc</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ </dependencies>
+ <profiles>
+ <profile>
+ <id>skipSessionTests</id>
+ <activation>
+ <property>
+ <name>skipTests</name>
+ <value>true</value>
+ </property>
+ </activation>
+ <properties>
+ <session.test.skip>true</session.test.skip>
+ <session.ut.skip>true</session.ut.skip>
+ <session.it.skip>true</session.it.skip>
+ </properties>
+ </profile>
+ <profile>
+ <id>skipUT_SessionTests</id>
+ <activation>
+ <property>
+ <name>skipUTs</name>
+ <value>true</value>
+ </property>
+ </activation>
+ <properties>
+ <session.ut.skip>true</session.ut.skip>
+ </properties>
+ </profile>
+ </profiles>
</project>