You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/02/10 09:56:09 UTC
[incubator-iotdb] branch cluster_node_deletion updated: add unit tests
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_node_deletion
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_node_deletion by this push:
new ad5be9f add unit tests
ad5be9f is described below
commit ad5be9ff3b8794cbcbd4c1391615bc393ee92d58
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Feb 10 17:55:54 2020 +0800
add unit tests
---
cluster/partitions.tmp | Bin 0 -> 511 bytes
cluster/src/assembly/resources/conf/logback.xml | 1 +
.../java/org/apache/iotdb/cluster/ClusterMain.java | 2 +-
.../apache/iotdb/cluster/config/ClusterConfig.java | 2 +-
.../java/org/apache/iotdb/cluster/log/Log.java | 1 +
.../org/apache/iotdb/cluster/log/LogParser.java | 6 +
.../iotdb/cluster/log/logtypes/RemoveNodeLog.java | 9 +
.../cluster/log/snapshot/PullSnapshotTask.java | 9 +-
.../cluster/partition/SlotPartitionTable.java | 3 +-
.../cluster/query/manage/ClusterQueryManager.java | 11 ++
.../iotdb/cluster/server/DataClusterServer.java | 16 +-
.../iotdb/cluster/server/MetaClusterServer.java | 7 +-
.../apache/iotdb/cluster/server/NodeReport.java | 111 ++++++++++++
.../handlers/caller/AppendGroupEntryHandler.java | 5 +-
.../server/handlers/caller/HeartBeatHandler.java | 4 +-
.../cluster/server/heartbeat/HeartBeatThread.java | 1 +
.../server/heartbeat/MetaHeartBeatThread.java | 1 +
.../cluster/server/member/DataGroupMember.java | 52 ++++--
.../cluster/server/member/MetaGroupMember.java | 70 +++++++-
.../iotdb/cluster/server/member/RaftMember.java | 28 +--
.../apache/iotdb/cluster/log/LogParserTest.java | 15 ++
.../log/log/snapshot/PullSnapshotTaskTest.java | 5 +-
.../cluster/log/logtypes/SerializeLogTest.java | 26 +++
.../log/partition/SlotPartitionTableTest.java | 32 +++-
.../cluster/log/snapshot/PullSnapshotTaskTest.java | 97 ----------
.../log/snapshot/SnapshotSerializeTest.java | 195 ---------------------
.../cluster/server/member/DataGroupMemberTest.java | 84 ++++++++-
.../iotdb/cluster/server/member/MemberTest.java | 12 +-
.../cluster/server/member/MetaGroupMemberTest.java | 174 +++++++++++++++++-
service-rpc/src/main/thrift/cluster.thrift | 11 ++
30 files changed, 627 insertions(+), 363 deletions(-)
diff --git a/cluster/partitions.tmp b/cluster/partitions.tmp
new file mode 100644
index 0000000..569a578
Binary files /dev/null and b/cluster/partitions.tmp differ
diff --git a/cluster/src/assembly/resources/conf/logback.xml b/cluster/src/assembly/resources/conf/logback.xml
index 4401a8b..37e8d32 100644
--- a/cluster/src/assembly/resources/conf/logback.xml
+++ b/cluster/src/assembly/resources/conf/logback.xml
@@ -159,4 +159,5 @@
<logger level="debug" name="org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter">
<appender-ref ref="FILE_DYNAMIC_PARAMETER"/>
</logger>
+ <logger level="info" name="org.apache.iotdb.cluster.server.heartbeat"/>
</configuration>
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 8e76928..bcd6690 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -115,7 +115,7 @@ public class ClusterMain {
logger.error("Cluster size is too small, cannot remove any node");
return;
} else if (response == Response.RESPONSE_REJECT) {
- logger.error("Node {} is not found in the cluster, please check", node);
+ logger.error("Node {} is not found in the cluster, please check", nodeToRemove);
return;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 576e373..e3e7537 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -40,7 +40,7 @@ public class ClusterConfig {
private int maxConcurrentClientNum = 1024;
@ClusterConsistent
- private int replicationNum = 2;
+ private int replicationNum = 3;
private int connectionTimeoutInMS = 20 * 1000;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 354d322..d84f315 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -41,6 +41,7 @@ public abstract class Log {
+ /**
+  * Types of raft logs. The ordinal of a type is written as the leading byte of a serialized log
+  * (RemoveNodeLog, for instance, writes Types.REMOVE_NODE.ordinal() first), so reordering the
+  * constants changes the serialized format.
+  */
public enum Types {
// TODO-Cluster#348 support more logs
+ // Every type must also get a case in LogParser; an unhandled type makes parsing throw
+ // IllegalArgumentException.
ADD_NODE, PHYSICAL_PLAN, CLOSE_FILE, REMOVE_NODE
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index 4dc708c..1bda735 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.cluster.log.Log.Types;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +75,11 @@ public class LogParser {
closeFileLog.deserialize(buffer);
log = closeFileLog;
break;
+ case REMOVE_NODE:
+ RemoveNodeLog removeNodeLog = new RemoveNodeLog();
+ removeNodeLog.deserialize(buffer);
+ log = removeNodeLog;
+ break;
default:
throw new IllegalArgumentException(type.toString());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index 204e675..6a12af0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -18,6 +18,10 @@ public class RemoveNodeLog extends Log {
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
try {
dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
+ dataOutputStream.writeLong(getPreviousLogIndex());
+ dataOutputStream.writeLong(getPreviousLogTerm());
+ dataOutputStream.writeLong(getCurrLogIndex());
+ dataOutputStream.writeLong(getCurrLogTerm());
} catch (IOException e) {
// ignored
}
@@ -27,6 +31,11 @@ public class RemoveNodeLog extends Log {
+ /**
+  * Restores this log from {@code buffer}: four longs (previous log index, previous log term,
+  * current log index, current log term) followed by the removed node.
+  * NOTE(review): the leading Types byte written on serialization is presumably consumed by
+  * LogParser before this method is called — confirm.
+  */
@Override
public void deserialize(ByteBuffer buffer) {
+ // must mirror the order in which the serializing code writes these four longs
+ setPreviousLogIndex(buffer.getLong());
+ setPreviousLogTerm(buffer.getLong());
+ setCurrLogIndex(buffer.getLong());
+ setCurrLogTerm(buffer.getLong());
+
removedNode = new Node();
SerializeUtils.deserialize(removedNode, buffer);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 3c00dd5..333c710 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -53,17 +53,23 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Intege
private List<Node> previousHolders;
// the header of the old members
private Node header;
+ // set to true if the previous holder has been removed from the cluster.
+ // This will make the previous holder read-only so that different new
+ // replicas can pull the same snapshot.
+ private boolean requireReadOnly;
private PullSnapshotRequest request;
private SnapshotFactory snapshotFactory;
+ /**
+  * Creates a task that pulls the snapshots of {@code slots} from the previous holders.
+  *
+  * @param header the header of the old members that held the slots
+  * @param slots the slots whose snapshots are requested
+  * @param newMember the local member that pulls the snapshots
+  * @param previousHolders the nodes that previously held the slots and may serve the request
+  * @param snapshotFactory used to create snapshot instances of type T
+  * @param requireReadOnly forwarded in the PullSnapshotRequest; set to true when the previous
+  *     holder has been removed from the cluster, so that the holder turns read-only and
+  *     different new replicas can pull the same snapshot
+  */
public PullSnapshotTask(Node header, List<Integer> slots,
- DataGroupMember newMember, List<Node> previousHolders, SnapshotFactory snapshotFactory) {
+ DataGroupMember newMember, List<Node> previousHolders, SnapshotFactory snapshotFactory,
+ boolean requireReadOnly) {
this.header = header;
this.slots = slots;
this.newMember = newMember;
this.previousHolders = previousHolders;
this.snapshotFactory = snapshotFactory;
+ this.requireReadOnly = requireReadOnly;
}
private boolean pullSnapshot(AtomicReference<Map<Integer, T>> snapshotRef, int nodeIndex)
@@ -103,6 +109,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Map<Intege
request = new PullSnapshotRequest();
request.setHeader(header);
request.setRequiredSlots(slots);
+ request.setRequireReadOnly(requireReadOnly);
AtomicReference<Map<Integer, T>> snapshotRef = new AtomicReference<>();
boolean finished = false;
int nodeIndex = -1;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
index b1d7e31..03c9593 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/SlotPartitionTable.java
@@ -317,7 +317,6 @@ public class SlotPartitionTable implements PartitionTable {
return previousNodeMap.get(node);
}
-
@Override
public List<Integer> getNodeSlots(Node header) {
return nodeSlotMap.get(header);
@@ -377,7 +376,6 @@ public class SlotPartitionTable implements PartitionTable {
} else {
PartitionGroup newGrp = getHeaderGroup(header);
localGroups.set(i, newGrp);
- result.setNewGroup(newGrp);
}
}
if (removedGroupIdx != -1) {
@@ -391,6 +389,7 @@ public class SlotPartitionTable implements PartitionTable {
Node header = nodeRing.get(headerNodeIdx);
PartitionGroup newGrp = getHeaderGroup(header);
localGroups.add(newGrp);
+ result.setNewGroup(newGrp);
}
// the slots movement is only done logically, the new node itself will pull data from the
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManager.java
index 3ec5863..7f7f2fb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/ClusterQueryManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.query.manage;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -94,4 +95,14 @@ public class ClusterQueryManager {
public IReaderByTimestamp getReaderByTimestamp(long readerId) {
return seriesReaderByTimestampMap.get(readerId);
}
+
+  /**
+   * Ends every remote query still registered in this manager and drops all cached readers and
+   * query contexts. Called when the owning data group member stops.
+   *
+   * @throws StorageEngineException if QueryResourceManager fails to end a query; the local
+   *     caches are cleared even in that case, but the queries after the failing one are not ended
+   */
+  public void endAllQueries() throws StorageEngineException {
+    try {
+      for (Map<Long, RemoteQueryContext> contextMap : queryContextMap.values()) {
+        for (RemoteQueryContext context : contextMap.values()) {
+          QueryResourceManager.getInstance().endQuery(context.getQueryId());
+        }
+      }
+    } finally {
+      // without clearing the contexts, already-ended queries would stay registered and could
+      // be ended a second time; the readers are useless once their queries are gone
+      queryContextMap.clear();
+      seriesReaderByTimestampMap.clear();
+      seriesReaderMap.clear();
+    }
+  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 3a89b66..27deddf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.server;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -46,6 +47,7 @@ import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncProcessor;
+import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -69,6 +71,10 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
}
+ /**
+  * Registers {@code dataGroupMember} under its header. A member previously registered for the
+  * same header is removed and stopped before the new one is put in place.
+  */
public void addDataGroupMember(DataGroupMember dataGroupMember) {
+ // NOTE(review): if headerGroupMap is read concurrently, a lookup between remove() and put()
+ // finds no member for this header; using the value returned by put() would avoid that gap.
+ DataGroupMember removedMember = headerGroupMap.remove(dataGroupMember.getHeader());
+ if (removedMember != null) {
+ removedMember.stop();
+ }
headerGroupMap.put(dataGroupMember.getHeader(), dataGroupMember);
}
@@ -319,6 +325,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
List<Integer> nodeSlots = partitionTable.getNodeSlots(dataGroupMember.getHeader());
dataGroupMember.removeLocalData(nodeSlots);
entryIterator.remove();
+ dataGroupMember.stop();
} else {
// the group should be updated and pull new slots from the removed node
dataGroupMember.removeNode(node, removalResult);
@@ -327,6 +334,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
}
PartitionGroup newGroup = removalResult.getNewGroup();
if (newGroup != null) {
+ logger.info("{} should join a new group {}", thisNode, newGroup);
try {
createNewMember(newGroup.getHeader());
} catch (NotInSameGroupException | TTransportException e) {
@@ -360,5 +368,11 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
dataGroupMember.pullNodeAdditionSnapshots(slots, thisNode);
}
-
+  /**
+   * Collects a runtime report from every data group member hosted by this server.
+   *
+   * @return a new list holding one report per member, in headerGroupMap's iteration order
+   */
+  public List<DataMemberReport> genMemberReports() {
+    List<DataMemberReport> reports = new ArrayList<>();
+    headerGroupMap.values().forEach(member -> reports.add(member.genReport()));
+    return reports;
+  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index 61b1042..c3957b1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -178,7 +178,12 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
}
+ /**
+  * Handles a request to remove {@code node} from the cluster by delegating it to the meta
+  * group member, which is expected to complete {@code resultHandler}.
+  */
@Override
- public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) throws TException {
+ public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
member.removeNode(node, resultHandler);
}
+
+ /**
+  * Handles the notification, sent by the meta leader to a node that has been removed from the
+  * cluster, that this node is no longer a member; delegated to the meta group member.
+  */
+ @Override
+ public void exile(AsyncMethodCallback<Void> resultHandler) {
+ member.exile(resultHandler);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
new file mode 100644
index 0000000..7dacae6
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+
+/**
+ * A node report collects the current runtime information of the local node, which contains:
+ * 1. The MetaMemberReport of the meta member.
+ * 2. The DataMemberReports of each data member.
+ *
+ * <p>The report is rendered through {@link #toString()} for logging.
+ */
+public class NodeReport {
+
+  private final Node thisNode;
+  private MetaMemberReport metaMemberReport;
+  private List<DataMemberReport> dataMemberReportList;
+
+  /** @param thisNode the node this report describes */
+  public NodeReport(Node thisNode) {
+    this.thisNode = thisNode;
+    this.dataMemberReportList = new ArrayList<>();
+  }
+
+  public void setMetaMemberReport(MetaMemberReport metaMemberReport) {
+    this.metaMemberReport = metaMemberReport;
+  }
+
+  /**
+   * Replaces the data member reports with a copy of {@code dataMemberReportList}, so later
+   * changes to the caller's list do not affect this report. A null list is treated as empty, so
+   * {@link #toString()} never fails on it.
+   */
+  public void setDataMemberReportList(List<DataMemberReport> dataMemberReportList) {
+    this.dataMemberReportList =
+        dataMemberReportList == null ? new ArrayList<>() : new ArrayList<>(dataMemberReportList);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append("Report of ").append(thisNode).append(System.lineSeparator());
+    stringBuilder.append(metaMemberReport).append(System.lineSeparator());
+    for (DataMemberReport dataMemberReport : dataMemberReportList) {
+      stringBuilder.append(dataMemberReport).append(System.lineSeparator());
+    }
+    return stringBuilder.toString();
+  }
+
+  /**
+   * A RaftMemberReport contains the character, leader, term, last log term/index and read-only
+   * flag of a raft member. Instances are immutable.
+   */
+  public static class RaftMemberReport {
+    final NodeCharacter character;
+    final Node leader;
+    final long term;
+    final long lastLogTerm;
+    final long lastLogIndex;
+    final boolean isReadOnly;
+
+    public RaftMemberReport(NodeCharacter character, Node leader, long term, long lastLogTerm,
+        long lastLogIndex, boolean isReadOnly) {
+      this.character = character;
+      this.leader = leader;
+      this.term = term;
+      this.lastLogTerm = lastLogTerm;
+      this.lastLogIndex = lastLogIndex;
+      this.isReadOnly = isReadOnly;
+    }
+
+    /** Renders the fields shared by every member report, in a fixed order. */
+    String fieldsToString() {
+      return "character=" + character
+          + ", Leader=" + leader
+          + ", term=" + term
+          + ", lastLogTerm=" + lastLogTerm
+          + ", lastLogIndex=" + lastLogIndex
+          + ", readOnly=" + isReadOnly;
+    }
+
+    @Override
+    public String toString() {
+      return "RaftMemberReport{" + fieldsToString() + '}';
+    }
+  }
+
+  /** The report of the meta group member. */
+  public static class MetaMemberReport extends RaftMemberReport {
+
+    public MetaMemberReport(NodeCharacter character, Node leader, long term, long lastLogTerm,
+        long lastLogIndex, boolean isReadOnly) {
+      super(character, leader, term, lastLogTerm, lastLogIndex, isReadOnly);
+    }
+
+    @Override
+    public String toString() {
+      return "MetaMemberReport{" + fieldsToString() + '}';
+    }
+  }
+
+  /**
+   * A DataMemberReport additionally contains the header of the data group.
+   */
+  public static class DataMemberReport extends RaftMemberReport {
+    final Node header;
+
+    public DataMemberReport(NodeCharacter character, Node leader, long term, long lastLogTerm,
+        long lastLogIndex, Node header, boolean isReadOnly) {
+      super(character, leader, term, lastLogTerm, lastLogIndex, isReadOnly);
+      this.header = header;
+    }
+
+    @Override
+    public String toString() {
+      return "DataMemberReport{" + "header=" + header + ", " + fieldsToString() + '}';
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
index 893bec5..07b8c76 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
@@ -119,9 +119,6 @@ public class AppendGroupEntryHandler implements AsyncMethodCallback<Long> {
@Override
public void onError(Exception exception) {
- synchronized (groupReceivedCounter) {
- logger.error("Cannot send the add node request to node {}", receiverNode, exception);
- groupReceivedCounter.notifyAll();
- }
+ // NOTE(review): a failure no longer notifies the threads waiting on groupReceivedCounter, so
+ // they presumably wake only on another response or their own wait timeout — confirm intended.
+ logger.error("Cannot send the add node request to node {}", receiverNode, exception);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartBeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartBeatHandler.java
index 7834e4f..cf225db 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartBeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartBeatHandler.java
@@ -49,7 +49,7 @@ public class HeartBeatHandler implements AsyncMethodCallback<HeartBeatResponse>
@Override
public void onComplete(HeartBeatResponse resp) {
- logger.debug("{}: Received a heartbeat response", memberName);
+ logger.trace("{}: Received a heartbeat response", memberName);
long followerTerm = resp.getTerm();
if (followerTerm == RESPONSE_AGREE) {
// current leadership is still valid
@@ -60,7 +60,7 @@ public class HeartBeatHandler implements AsyncMethodCallback<HeartBeatResponse>
long lastLogTerm = resp.getLastLogTerm();
long localLastLogIdx = localMember.getLogManager().getLastLogIndex();
long localLastLogTerm = localMember.getLogManager().getLastLogTerm();
- logger.debug("{}: Node {} is still alive, log index: {}/{}, log term: {}/{}",
+ logger.trace("{}: Node {} is still alive, log index: {}/{}, log term: {}/{}",
memberName, follower, lastLogIdx
,localLastLogIdx, lastLogTerm, localLastLogTerm);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartBeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartBeatThread.java
index f96815d..6df5c4a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartBeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartBeatThread.java
@@ -178,6 +178,7 @@ public class HeartBeatThread implements Runnable {
requestVote(localMember.getAllNodes(), electionRequest, nextTerm, quorum,
electionTerminated, electionValid);
+ electionRequest.unsetLastLogIndex();
try {
logger.info("{}: Wait for {}ms until election time out", memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThread.java
index 735e0db..e969bbc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartBeatThread.java
@@ -58,5 +58,6 @@ public class MetaHeartBeatThread extends HeartBeatThread {
}
super.sendHeartbeat(node, client);
+ request.unsetPartitionTableBytes();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 305158a..cb01ec5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
import org.apache.iotdb.cluster.server.NodeCharacter;
+import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
@@ -144,6 +145,11 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
super.stop();
pullSnapshotService.shutdownNow();
pullSnapshotService = null;
+ try {
+ queryManager.endAllQueries();
+ } catch (StorageEngineException e) {
+ logger.error("Cannot release queries of {}", name, e);
+ }
}
/**
@@ -452,29 +458,26 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
}
return;
}
+ if (request.isRequireReadOnly()) {
+ setReadOnly();
+ }
// this synchronized should work with the one in AppendEntry when a log is going to commit,
// which may prevent the newly arrived data from being invisible to the new header.
synchronized (logManager) {
List<Integer> requiredSlots = request.getRequiredSlots();
logger.debug("{}: {} slots are requested", name, requiredSlots.size());
- // check whether this slot is held by the node
- List<Integer> heldSlots = metaGroupMember.getPartitionTable().getNodeSlots(getHeader());
PullSnapshotResp resp = new PullSnapshotResp();
Map<Integer, ByteBuffer> resultMap = new HashMap<>();
logManager.takeSnapshot();
+ PartitionedSnapshot allSnapshot = (PartitionedSnapshot) logManager.getSnapshot();
for (int requiredSlot : requiredSlots) {
- if (!heldSlots.contains(requiredSlot)) {
- // logger.debug("{}: the required slot {} is not held by the node", name,
- // requiredSlot);
- continue;
- }
-
- PartitionedSnapshot allSnapshot = (PartitionedSnapshot) logManager.getSnapshot();
Snapshot snapshot = allSnapshot.getSnapshot(requiredSlot);
- resultMap.put(requiredSlot, snapshot.serialize());
+ if (snapshot != null) {
+ resultMap.put(requiredSlot, snapshot.serialize());
+ }
}
resp.setSnapshotBytes(resultMap);
logger.debug("{}: Sending {} snapshots to the requester", name, resultMap.size());
@@ -507,15 +510,23 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
for (Entry<Node, List<Integer>> entry : holderSlotsMap.entrySet()) {
Node node = entry.getKey();
List<Integer> nodeSlots = entry.getValue();
- pullFileSnapshot(metaGroupMember.getPartitionTable().getHeaderGroup(node), nodeSlots);
+ pullFileSnapshot(metaGroupMember.getPartitionTable().getHeaderGroup(node), nodeSlots, false);
}
}
}
- private void pullFileSnapshot(PartitionGroup prevHolders, List<Integer> nodeSlots) {
+ /**
+ *
+ * @param prevHolders
+ * @param nodeSlots
+ * @param requireReadOnly set to true if the previous holder has been removed from the cluster.
+ * This will make the previous holder read-only so that different new
+ * replicas can pull the same snapshot.
+ */
+ private void pullFileSnapshot(PartitionGroup prevHolders, List<Integer> nodeSlots, boolean requireReadOnly) {
Future<Map<Integer, FileSnapshot>> snapshotFuture =
pullSnapshotService.submit(new PullSnapshotTask(prevHolders.getHeader(), nodeSlots, this,
- prevHolders, FileSnapshot::new));
+ prevHolders, FileSnapshot::new, requireReadOnly));
for (int slot : nodeSlots) {
logManager.setSnapshot(new RemoteFileSnapshot(snapshotFuture), slot);
}
@@ -759,7 +770,7 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
}
/**
- * When a node is removed and it is not the header of the group, the member should take over
+ * When a node is removed and IT IS NOT THE HEADER of the group, the member should take over
* some slots from the removed group, and add a new node to the group the removed node was in the
* group.
*/
@@ -768,13 +779,24 @@ public class DataGroupMember extends RaftMember implements TSDataService.AsyncIf
if (allNodes.contains(removedNode)) {
// update the group if the deleted node was in it
allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+ if (removedNode.equals(leader)) {
+ synchronized (term) {
+ setCharacter(NodeCharacter.ELECTOR);
+ setLastHeartBeatReceivedTime(Long.MIN_VALUE);
+ }
+ }
}
List<Integer> slotsToPull = removalResult.getNewSlotOwners().get(getHeader());
if (slotsToPull != null) {
// pull the slots that should be taken over
- pullFileSnapshot(removalResult.getRemovedGroup(), slotsToPull);
+ pullFileSnapshot(removalResult.getRemovedGroup(), slotsToPull, true);
}
}
}
+
+ public DataMemberReport genReport() {
+ return new DataMemberReport(character, leader, term.get(),
+ logManager.getLastLogTerm(), logManager.getLastLogIndex(), getHeader(), readOnly);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 5cdf481..9069a95 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -35,6 +35,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -42,6 +43,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -80,6 +84,8 @@ import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient;
import org.apache.iotdb.cluster.server.ClientServer;
import org.apache.iotdb.cluster.server.DataClusterServer;
import org.apache.iotdb.cluster.server.NodeCharacter;
+import org.apache.iotdb.cluster.server.NodeReport;
+import org.apache.iotdb.cluster.server.NodeReport.MetaMemberReport;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.AppendGroupEntryHandler;
@@ -128,6 +134,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
private static final Logger logger = LoggerFactory.getLogger(MetaGroupMember.class);
private static final int DEFAULT_JOIN_RETRY = 10;
+ private static final int REPORT_INTERVAL_SEC = 10;
public static final int REPLICATION_NUM =
ClusterDescriptor.getINSTANCE().getConfig().getReplicationNum();
@@ -147,6 +154,8 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
private ClientPool dataClientPool;
+ private ScheduledExecutorService reportThread;
+
@TestOnly
public MetaGroupMember() {
}
@@ -220,6 +229,10 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
queryProcessor = new ClusterQueryParser(this);
QueryCoordinator.getINSTANCE().setMetaGroupMember(this);
StorageEngine.getInstance().setFileFlushPolicy(new ClusterFileFlushPolicy(this));
+ reportThread = Executors.newSingleThreadScheduledExecutor(n -> new Thread(n,
+ "NodeReportThread"));
+ reportThread.scheduleAtFixedRate(()-> logger.info(genNodeReport().toString()),
+ REPORT_INTERVAL_SEC, REPORT_INTERVAL_SEC, TimeUnit.SECONDS);
}
@Override
@@ -229,6 +242,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
getDataClusterServer().stop();
clientServer.stop();
}
+ reportThread.shutdownNow();
}
private void initSubServers() throws TTransportException, StartupException {
@@ -634,9 +648,7 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
// each node will form a group
int nodeSize = nodeRing.size();
int[] groupRemainings = new int[nodeSize];
- for (int i = 0; i < groupRemainings.length; i++) {
- groupRemainings[i] = groupQuorum;
- }
+ Arrays.fill(groupRemainings, groupQuorum);
AtomicLong newLeaderTerm = new AtomicLong(term.get());
AtomicBoolean leaderShipStale = new AtomicBoolean(false);
@@ -1296,12 +1308,12 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
switch (result) {
case OK:
- logger.info("Join request of {} is accepted", target);
+ logger.info("Removal request of {} is accepted", target);
logManager.commitLog(removeNodeLog.getCurrLogIndex());
resultHandler.onComplete(Response.RESPONSE_AGREE);
return true;
case TIME_OUT:
- logger.info("Join request of {} timed out", target);
+ logger.info("Removal request of {} timed out", target);
resultHandler.onError(new RequestTimeOutException(removeNodeLog));
logManager.removeLastLog();
return true;
@@ -1326,11 +1338,57 @@ public class MetaGroupMember extends RaftMember implements TSMetaService.AsyncIf
NodeRemovalResult result = partitionTable.removeNode(oldNode);
getDataClusterServer().removeNode(oldNode, result);
- if (oldNode == leader) {
+ if (oldNode.equals(leader)) {
setCharacter(NodeCharacter.ELECTOR);
lastHeartBeatReceivedTime = Long.MIN_VALUE;
}
+
+ if (oldNode.equals(thisNode)) {
+ // use super.stop() so that the data server will not be closed
+ super.stop();
+ if (clientServer != null) {
+ clientServer.stop();
+ }
+ } else if (thisNode.equals(leader)) {
+ // as the old node is removed, it cannot know this by heartbeat, so it should be
+ // directly kicked out of the cluster
+ MetaClient metaClient = (MetaClient) connectNode(oldNode);
+ try {
+ metaClient.exile(new GenericHandler<>(oldNode, null));
+ } catch (TException e) {
+ logger.warn("Cannot inform {} its removal", oldNode, e);
+ }
+ }
+
+ savePartitionTable();
}
}
}
+
+ /**
+  * Invoked by the meta leader on a node that has been removed from the cluster: applies the
+  * removal of this node locally and then acknowledges the request.
+  * NOTE(review): if applyRemoveNode throws, resultHandler is never completed.
+  */
+ @Override
+ public void exile(AsyncMethodCallback<Void> resultHandler) {
+ applyRemoveNode(thisNode);
+ resultHandler.onComplete(null);
+ }
+
+  /**
+   * Captures the current raft state of this meta group member.
+   *
+   * @return a report holding the character, leader, term, last log term/index and read-only flag
+   */
+  private MetaMemberReport genMemberReport() {
+    NodeCharacter currentCharacter = character;
+    Node currentLeader = leader;
+    long currentTerm = term.get();
+    long logTerm = logManager.getLastLogTerm();
+    long logIndex = logManager.getLastLogIndex();
+    return new MetaMemberReport(currentCharacter, currentLeader, currentTerm, logTerm, logIndex,
+        readOnly);
+  }
+
+  /**
+   * Builds the report of this node: the state of the meta member plus one entry per data member
+   * hosted by the data cluster server. It is logged periodically by the report thread.
+   *
+   * @return the assembled report; its data member list stays empty when no data cluster server
+   *     is available
+   */
+  private NodeReport genNodeReport() {
+    NodeReport report = new NodeReport(thisNode);
+    report.setMetaMemberReport(genMemberReport());
+    // Go through the getter, as the rest of this class does. Tolerate a missing server: an
+    // exception thrown here would silently cancel the fixed-rate report task.
+    DataClusterServer server = getDataClusterServer();
+    if (server != null) {
+      report.setDataMemberReportList(server.genMemberReports());
+    }
+    return report;
+  }
+
+ @Override
+ public void setAllNodes(List<Node> allNodes) {
+ super.setAllNodes(allNodes);
+ idNodeMap = new HashMap<>();
+ for (Node node : allNodes) {
+ idNodeMap.put(node.getNodeIdentifier(), node);
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index f4789bf..ea021b2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -25,10 +25,7 @@ import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -108,7 +105,7 @@ public abstract class RaftMember implements RaftService.AsyncIface {
// when the header of the group is removed from the cluster, the members of the group should no
// longer accept writes, but they still can be read candidates for weak consistency reads and
// provide snapshots for the new holders
- private volatile boolean readOnly = false;
+ volatile boolean readOnly = false;
public RaftMember() {
}
@@ -145,11 +142,12 @@ public abstract class RaftMember implements RaftService.AsyncIface {
catchUpService.shutdownNow();
catchUpService = null;
heartBeatService = null;
+ logger.info("{} stopped", name);
}
@Override
public void sendHeartBeat(HeartBeatRequest request, AsyncMethodCallback resultHandler) {
- logger.debug("{} received a heartbeat", name);
+ logger.trace("{} received a heartbeat", name);
synchronized (term) {
long thisTerm = term.get();
long leaderTerm = request.getTerm();
@@ -158,8 +156,8 @@ public abstract class RaftMember implements RaftService.AsyncIface {
if (leaderTerm < thisTerm) {
// the leader is invalid
response.setTerm(thisTerm);
- if (logger.isDebugEnabled()) {
- logger.debug("{} received a heartbeat from a stale leader {}", name, request.getLeader());
+ if (logger.isTraceEnabled()) {
+ logger.trace("{} received a heartbeat from a stale leader {}", name, request.getLeader());
}
} else {
processValidHeartbeatReq(request, response);
@@ -184,8 +182,8 @@ public abstract class RaftMember implements RaftService.AsyncIface {
setCharacter(NodeCharacter.FOLLOWER);
}
setLastHeartBeatReceivedTime(System.currentTimeMillis());
- if (logger.isDebugEnabled()) {
- logger.debug("{} received heartbeat from a valid leader {}", name, request.getLeader());
+ if (logger.isTraceEnabled()) {
+ logger.trace("{} received heartbeat from a valid leader {}", name, request.getLeader());
}
}
resultHandler.onComplete(response);
@@ -445,8 +443,10 @@ public abstract class RaftMember implements RaftService.AsyncIface {
}
public void setLeader(Node leader) {
- logger.info("{} has become a {}", leader, character);
- this.leader = leader;
+ // only log and assign on an actual change to avoid flooding the log on every heartbeat
+ if (!Objects.equals(leader, this.leader)) {
+ // the new leader may be this node itself, so a "follower of" message would be misleading;
+ // log the old (still in this.leader) and the new leader instead
+ logger.info("{} has changed its leader from {} to {}", getName(), this.leader, leader);
+ this.leader = leader;
+ }
}
public Node getThisNode() {
@@ -813,4 +813,8 @@ public abstract class RaftMember implements RaftService.AsyncIface {
readOnly = true;
}
}
+
+ /**
+ * Replaces the list of all nodes known to this member. The list is stored as given (not
+ * copied), so later modifications by the caller are visible to this member.
+ */
+ public void setAllNodes(List<Node> allNodes) {
+ this.allNodes = allNodes;
+ }
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index de2622e..cba67cc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.tsfile.read.common.Path;
import org.junit.Test;
@@ -77,4 +78,18 @@ public class LogParserTest {
Log serialized = logParser.parse(buffer);
assertEquals(log, serialized);
}
+
+ /**
+ * A RemoveNodeLog with its removed node and previous/current log index and term set must be
+ * parsed by logParser back into an equal log.
+ */
+ @Test
+ public void testRemoveNodeLog() throws UnknownLogTypeException {
+ RemoveNodeLog log = new RemoveNodeLog();
+ log.setRemovedNode(TestUtils.getNode(0));
+ log.setCurrLogIndex(8);
+ log.setCurrLogTerm(8);
+ log.setPreviousLogIndex(7);
+ log.setPreviousLogTerm(7);
+
+ ByteBuffer buffer = log.serialize();
+ Log serialized = logParser.parse(buffer);
+ assertEquals(log, serialized);
+ }
}
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/PullSnapshotTaskTest.java
index 16f1bab..55196a3 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/PullSnapshotTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/log/snapshot/PullSnapshotTaskTest.java
@@ -90,8 +90,9 @@ public class PullSnapshotTaskTest {
slots.add(i);
previousHolders.add(TestUtils.getNode(i + 1));
}
- PullSnapshotTask<TestSnapshot> task = new PullSnapshotTask<>(TestUtils.getNode(0), slots,
- newMember, previousHolders, TestSnapshot::new);
+
+ PullSnapshotTask<TestSnapshot> task = new PullSnapshotTask<TestSnapshot>(TestUtils.getNode(0), slots,
+ newMember, previousHolders, TestSnapshot::new, false);
Map<Integer, TestSnapshot> result = task.call();
assertEquals(snapshotMap, result);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
index 0e4ac6f..e3f0fee 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
+import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogParser;
@@ -38,6 +39,10 @@ public class SerializeLogTest {
@Test
public void testPhysicalPlanLog() throws UnknownLogTypeException {
PhysicalPlanLog log = new PhysicalPlanLog();
+ log.setPreviousLogIndex(1);
+ log.setPreviousLogTerm(1);
+ log.setCurrLogIndex(2);
+ log.setCurrLogTerm(2);
InsertPlan plan = new InsertPlan();
plan.setDeviceId("root.d1");
plan.setMeasurements(new String[]{"s1,s2,s3"});
@@ -59,6 +64,10 @@ public class SerializeLogTest {
@Test
public void testAddNodeLog() throws UnknownLogTypeException {
AddNodeLog log = new AddNodeLog();
+ log.setPreviousLogIndex(1);
+ log.setPreviousLogTerm(1);
+ log.setCurrLogIndex(2);
+ log.setCurrLogTerm(2);
log.setNewNode(new Node("apache.iotdb.com", 1234, 1, 4321));
ByteBuffer byteBuffer = log.serialize();
Log logPrime = LogParser.getINSTANCE().parse(byteBuffer);
@@ -68,6 +77,10 @@ public class SerializeLogTest {
@Test
public void testCloseFileLog() throws UnknownLogTypeException {
CloseFileLog log = new CloseFileLog("root.sg1", true);
+ log.setPreviousLogIndex(1);
+ log.setPreviousLogTerm(1);
+ log.setCurrLogIndex(2);
+ log.setCurrLogTerm(2);
ByteBuffer byteBuffer = log.serialize();
CloseFileLog logPrime = (CloseFileLog) LogParser.getINSTANCE().parse(byteBuffer);
assertTrue(logPrime.isSeq());
@@ -75,4 +88,17 @@ public class SerializeLogTest {
assertEquals(log, logPrime);
}
+ /**
+ * A RemoveNodeLog must survive a serialize / LogParser.parse round trip unchanged, including
+ * its removed node and log index/term fields.
+ */
+ @Test
+ public void testRemoveNodeLog() throws UnknownLogTypeException {
+ RemoveNodeLog log = new RemoveNodeLog();
+ log.setPreviousLogIndex(1);
+ log.setPreviousLogTerm(1);
+ log.setCurrLogIndex(2);
+ log.setCurrLogTerm(2);
+ log.setRemovedNode(TestUtils.getNode(0));
+ ByteBuffer byteBuffer = log.serialize();
+ RemoveNodeLog logPrime = (RemoveNodeLog) LogParser.getINSTANCE().parse(byteBuffer);
+ assertEquals(log, logPrime);
+ }
+
}
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/partition/SlotPartitionTableTest.java
index 1ab526e..9bc08cf 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/partition/SlotPartitionTableTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/partition/SlotPartitionTableTest.java
@@ -19,12 +19,16 @@
package org.apache.iotdb.cluster.log.partition;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.stream.IntStream;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -40,11 +44,15 @@ public class SlotPartitionTableTest {
@Before
public void setUp() {
List<Node> nodes = new ArrayList<>();
- IntStream.range(0, 20).forEach(i -> nodes.add(new Node("localhost", 30000 + i, i, 40000 + i)));
+ IntStream.range(0, 20).forEach(i -> nodes.add(getNode(i)));
ClusterDescriptor.getINSTANCE().getConfig().setReplicationNum(replica_size);
table = new SlotPartitionTable(nodes, nodes.get(3));
}
+ // Builds the i-th test node: new Node("localhost", 30000 + i, i, 40000 + i).
+ // setUp() and testRemoveNode() both call it, so the membership assertions rely on two
+ // nodes built for the same i comparing equal.
+ private Node getNode(int i) {
+ return new Node("localhost", 30000 + i, i, 40000 + i);
+ }
+
@After
public void tearDown() {
ClusterDescriptor.getINSTANCE().getConfig().setReplicationNum(3);
@@ -105,4 +113,26 @@ public class SlotPartitionTableTest {
@Test
public void getAllNodeSlots() {
}
+
+ /**
+ * Removing node 0 must drop it from the table, report the old and new partition groups, and
+ * hand every slot it owned to some other node.
+ */
+ @Test
+ public void testRemoveNode() {
+ // copy the slots: the returned list may be the table's internal state, which removeNode()
+ // could alter and which this test must not mutate through removeAll() below
+ List<Integer> nodeSlots = new ArrayList<>(table.getNodeSlots(getNode(0)));
+ NodeRemovalResult nodeRemovalResult = table.removeNode(getNode(0));
+ assertFalse(table.getAllNodes().contains(getNode(0)));
+ PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup();
+ for (int i = 0; i < 5; i++) {
+ assertTrue(removedGroup.contains(getNode(i)));
+ }
+ PartitionGroup newGroup = nodeRemovalResult.getNewGroup();
+ for (int i : new int[] {18, 19, 1, 2, 3}) {
+ assertTrue(newGroup.contains(getNode(i)));
+ }
+ // the slots owned by the removed one should be redistributed to other nodes
+ Map<Node, List<Integer>> newSlotOwners = nodeRemovalResult.getNewSlotOwners();
+ for (List<Integer> slots : newSlotOwners.values()) {
+ assertTrue(nodeSlots.containsAll(slots));
+ nodeSlots.removeAll(slots);
+ }
+ assertTrue(nodeSlots.isEmpty());
+ }
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
deleted file mode 100644
index 73c2cee..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.snapshot;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.iotdb.cluster.client.ClientPool;
-import org.apache.iotdb.cluster.client.DataClient;
-import org.apache.iotdb.cluster.common.TestSnapshot;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.Snapshot;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
-import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.server.member.DataGroupMember;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.junit.Before;
-import org.junit.Test;
-
-public class PullSnapshotTaskTest {
-
- private ClientPool clientPool = new ClientPool(null);
- private Map<Integer, Snapshot> snapshotMap = new HashMap<>();
- private DataGroupMember newMember = new DataGroupMember() {
- @Override
- public AsyncClient connectNode(Node node) {
- try {
- return new DataClient(new TBinaryProtocol.Factory(), new TAsyncClientManager(),
- node, clientPool) {
- @Override
- public void pullSnapshot(PullSnapshotRequest request,
- AsyncMethodCallback<PullSnapshotResp> resultHandler) {
- new Thread(() -> {
- PullSnapshotResp resp = new PullSnapshotResp();
- Map<Integer, ByteBuffer> snapshotBytes = new HashMap<>();
- for (Entry<Integer, Snapshot> entry : snapshotMap.entrySet()) {
- snapshotBytes.put(entry.getKey(), entry.getValue().serialize());
- }
- resp.setSnapshotBytes(snapshotBytes);
- resultHandler.onComplete(resp);
- }).start();
- }
- };
- } catch (IOException e) {
- return null;
- }
- }
- };
-
- @Before
- public void setUp() {
- for (int i = 0; i < 10; i++) {
- snapshotMap.put(i, new TestSnapshot(i));
- }
- }
-
- @Test
- public void test() {
- List<Integer> slots = new ArrayList<>();
- List<Node> previousHolders = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- slots.add(i);
- previousHolders.add(TestUtils.getNode(i + 1));
- }
- PullSnapshotTask<TestSnapshot> task = new PullSnapshotTask<>(TestUtils.getNode(0), slots,
- newMember, previousHolders, TestSnapshot::new);
- Map<Integer, TestSnapshot> result = task.call();
- assertEquals(snapshotMap, result);
- }
-}
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/SnapshotSerializeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/SnapshotSerializeTest.java
deleted file mode 100644
index c1eee48..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/SnapshotSerializeTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.log.snapshot;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.iotdb.cluster.common.TestSnapshot;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.log.Snapshot;
-import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.Test;
-
-public class SnapshotSerializeTest {
-
- @Test
- public void testFileSnapshot() {
- Set<MeasurementSchema> measurementSchemaList = new HashSet<>();
- FileSnapshot snapshot = new FileSnapshot();
- for (int i = 0; i < 10; i++) {
- measurementSchemaList.add(TestUtils.getTestSchema(i, i));
- TsFileResource tsFileResource = new TsFileResource(new File("TsFile" + i));
- tsFileResource.setHistoricalVersions(Collections.singleton((long) i));
- snapshot.addFile(tsFileResource, TestUtils.getNode(i));
- }
- snapshot.setTimeseriesSchemas(measurementSchemaList);
-
- ByteBuffer byteBuffer = snapshot.serialize();
- FileSnapshot deserializedSnapshot = new FileSnapshot();
- deserializedSnapshot.deserialize(byteBuffer);
- assertEquals(snapshot, deserializedSnapshot);
- }
-
- @Test
- public void testMetaSimpleSnapshot() {
- List<Log> logs = new ArrayList<>();
- List<String> sgs = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- MeasurementSchema schema = TestUtils.getTestSchema(i, 0);
- CreateTimeSeriesPlan createTimeSeriesPlan =
- new CreateTimeSeriesPlan(new Path(schema.getMeasurementId()), schema.getType(),
- schema.getEncodingType(), schema.getCompressor(), schema.getProps());
- PhysicalPlanLog log = new PhysicalPlanLog();
- log.setPlan(createTimeSeriesPlan);
- logs.add(log);
- log.setPreviousLogTerm(i - 1);
- log.setPreviousLogIndex(i - 1);
- log.setCurrLogIndex(i);
- log.setCurrLogTerm(i);
- sgs.add(TestUtils.getTestSg(i));
- }
- MetaSimpleSnapshot snapshot = new MetaSimpleSnapshot(logs, sgs);
- snapshot.setLastLogId(10);
- snapshot.setLastLogTerm(10);
-
- ByteBuffer byteBuffer = snapshot.serialize();
- MetaSimpleSnapshot deserializedSnapshot = new MetaSimpleSnapshot();
- deserializedSnapshot.deserialize(byteBuffer);
- assertEquals(snapshot, deserializedSnapshot);
- }
-
- @Test
- public void testPartitionedSnapshot() {
- PartitionedSnapshot<TestSnapshot> snapshot = new PartitionedSnapshot<>(TestSnapshot::new);
- snapshot.setLastLogTerm(10);
- snapshot.setLastLogId(10);
-
- Map<Integer, Snapshot> slotSnapshots = new HashMap<>();
- List<Integer> slots = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- TestSnapshot s = new TestSnapshot();
- slotSnapshots.put(i, s);
- snapshot.putSnapshot(i, s);
- slots.add(i);
- }
-
- for (Integer slot : slots) {
- TestSnapshot s = (TestSnapshot) snapshot.getSnapshot(slot);
- assertTrue(slotSnapshots.values().contains(s));
- }
-
- PartitionedSnapshot subSnapshots = snapshot.getSubSnapshots(slots);
- for (Integer slot : slots) {
- TestSnapshot s = (TestSnapshot) subSnapshots.getSnapshot(slot);
- assertTrue(slotSnapshots.values().contains(s));
- }
-
- ByteBuffer byteBuffer = snapshot.serialize();
- PartitionedSnapshot deserializedSnapshot = new PartitionedSnapshot(TestSnapshot::new);
- deserializedSnapshot.deserialize(byteBuffer);
- assertEquals(snapshot, deserializedSnapshot);
- }
-
- @Test
- public void testRemoteFileSnapshot() {
- AtomicBoolean remoteSnapshotGet = new AtomicBoolean(false);
- RemoteFileSnapshot snapshot = new RemoteFileSnapshot(
- new Future<Void>() {
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return false;
- }
-
- @Override
- public Void get() {
- remoteSnapshotGet.set(true);
- return null;
- }
-
- @Override
- public Void get(long timeout, TimeUnit unit) {
- remoteSnapshotGet.set(true);
- return null;
- }
- });
-
- assertNull(snapshot.serialize());
- snapshot.getRemoteSnapshot();
- assertTrue(remoteSnapshotGet.get());
- }
-
- @Test
- public void testSimpleSnapshot() {
- List<Log> logs = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- MeasurementSchema schema = TestUtils.getTestSchema(i, 0);
- CreateTimeSeriesPlan createTimeSeriesPlan =
- new CreateTimeSeriesPlan(new Path(schema.getMeasurementId()), schema.getType(),
- schema.getEncodingType(), schema.getCompressor(), schema.getProps());
- PhysicalPlanLog log = new PhysicalPlanLog();
- log.setPlan(createTimeSeriesPlan);
- logs.add(log);
- log.setPreviousLogTerm(i - 1);
- log.setPreviousLogIndex(i - 1);
- log.setCurrLogIndex(i);
- log.setCurrLogTerm(i);
- }
- SimpleSnapshot snapshot = new SimpleSnapshot(new ArrayList<>(logs.subList(0, 5)));
- snapshot.setLastLogId(10);
- snapshot.setLastLogTerm(10);
- for (Log log : logs.subList(5, logs.size())) {
- snapshot.add(log);
- }
-
- ByteBuffer byteBuffer = snapshot.serialize();
- SimpleSnapshot deserializedSnapshot = new SimpleSnapshot();
- deserializedSnapshot.deserialize(byteBuffer);
- assertEquals(snapshot, deserializedSnapshot);
- }
-}
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 3caf096..52472c8 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -33,8 +33,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.cluster.RemoteTsFileResource;
@@ -49,6 +51,7 @@ import org.apache.iotdb.cluster.log.manage.PartitionedSnapshotLogManager;
import org.apache.iotdb.cluster.log.snapshot.FileSnapshot;
import org.apache.iotdb.cluster.log.snapshot.PartitionedSnapshot;
import org.apache.iotdb.cluster.log.snapshot.RemoteFileSnapshot;
+import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -100,11 +103,12 @@ import org.junit.Test;
public class DataGroupMemberTest extends MemberTest {
private DataGroupMember dataGroupMember;
- private DataGroupMember.Factory factory;
private Map<Integer, FileSnapshot> snapshotMap;
private Map<Integer, RemoteFileSnapshot> receivedSnapshots;
+ private Set<Integer> pulledSnapshots;
private boolean hasInitialSnapshots;
private boolean enableSyncLeader;
+ private int numSlotsToPull = 2;
@Before
public void setUp() throws Exception {
@@ -117,9 +121,10 @@ public class DataGroupMemberTest extends MemberTest {
snapshotMap.put(i, fileSnapshot);
}
receivedSnapshots = new HashMap<>();
+ pulledSnapshots = new HashSet<>();
}
- private PartitionedSnapshotLogManager getLogManager() {
+ private PartitionedSnapshotLogManager getLogManager(PartitionGroup partitionGroup) {
return new TestPartitionedLogManager(new DataLogApplier(testMetaMember),
testMetaMember.getPartitionTable(), partitionGroup.getHeader(), FileSnapshot::new) {
@Override
@@ -141,8 +146,8 @@ public class DataGroupMemberTest extends MemberTest {
}
private DataGroupMember getDataGroupMember(Node node) throws IOException {
- return new DataGroupMember(new TCompactProtocol.Factory(), new PartitionGroup(partitionGroup)
- , node, getLogManager(),
+ PartitionGroup nodes = partitionTable.getHeaderGroup(node);
+ return new DataGroupMember(new TCompactProtocol.Factory(), nodes, node, getLogManager(nodes),
testMetaMember, new TAsyncClientManager()) {
@Override
public AsyncClient connectNode(Node node) {
@@ -173,7 +178,16 @@ public class DataGroupMemberTest extends MemberTest {
PullSnapshotResp resp = new PullSnapshotResp();
Map<Integer, ByteBuffer> snapshotBufferMap = new HashMap<>();
for (Integer requiredSlot : request.getRequiredSlots()) {
- snapshotBufferMap.put(requiredSlot, snapshotMap.get(requiredSlot).serialize());
+ FileSnapshot fileSnapshot = snapshotMap.get(requiredSlot);
+ if (fileSnapshot != null) {
+ snapshotBufferMap.put(requiredSlot, fileSnapshot.serialize());
+ }
+ synchronized (dataGroupMember) {
+ pulledSnapshots.add(requiredSlot);
+ if (pulledSnapshots.size() == numSlotsToPull) {
+ dataGroupMember.notifyAll();
+ }
+ }
}
resp.setSnapshotBytes(snapshotBufferMap);
resultHandler.onComplete(resp);
@@ -266,8 +280,8 @@ public class DataGroupMemberTest extends MemberTest {
assertTrue(lastMember.addNode(newNodeInGroup));
Node newNodeAfterGroup = TestUtils.getNode(101);
- assertFalse(firstMember.addNode(newNodeInGroup));
- assertFalse(midMember.addNode(newNodeInGroup));
+ assertFalse(firstMember.addNode(newNodeAfterGroup));
+ assertFalse(midMember.addNode(newNodeAfterGroup));
}
@Test
@@ -827,4 +841,60 @@ public class DataGroupMemberTest extends MemberTest {
return resource;
}
+ /**
+ * Removing the current leader of the group must turn this member into an ELECTOR, reset its
+ * heartbeat time, update its node list and pull snapshots for every slot reassigned to it.
+ */
+ @Test
+ public void testRemoveLeader() throws TTransportException, InterruptedException {
+ Node nodeToRemove = TestUtils.getNode(10);
+ NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable()
+ .removeNode(nodeToRemove);
+ dataGroupMember.setLeader(nodeToRemove);
+ dataGroupMember.start();
+ // the mocked pullSnapshot notifies once numSlotsToPull slots have been requested, so the
+ // threshold must be the number of slots actually reassigned to this node (node 0)
+ List<Integer> newSlots = nodeRemovalResult.getNewSlotOwners().get(TestUtils.getNode(0));
+ numSlotsToPull = newSlots.size();
+
+ try {
+ synchronized (dataGroupMember) {
+ dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult);
+ // wait in a loop to tolerate spurious wake-ups; still give up after 500ms
+ long deadline = System.currentTimeMillis() + 500;
+ long remaining = 500;
+ while (pulledSnapshots.size() < numSlotsToPull && remaining > 0) {
+ dataGroupMember.wait(remaining);
+ remaining = deadline - System.currentTimeMillis();
+ }
+ }
+
+ assertEquals(NodeCharacter.ELECTOR, dataGroupMember.getCharacter());
+ assertEquals(Long.MIN_VALUE, dataGroupMember.getLastHeartBeatReceivedTime());
+ assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30)));
+ assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove));
+ assertEquals(newSlots.size(), pulledSnapshots.size());
+ for (Integer newSlot : newSlots) {
+ assertTrue(pulledSnapshots.contains(newSlot));
+ }
+ } finally {
+ dataGroupMember.stop();
+ }
+ }
+
+ /**
+ * Removing a non-leader member must keep the heartbeat time untouched, update the node list
+ * and pull snapshots for every slot reassigned to this member.
+ */
+ @Test
+ public void testRemoveNonLeader() throws TTransportException, InterruptedException {
+ Node nodeToRemove = TestUtils.getNode(10);
+ NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable()
+ .removeNode(nodeToRemove);
+ dataGroupMember.setLeader(TestUtils.getNode(20));
+ dataGroupMember.start();
+ // the mocked pullSnapshot notifies once numSlotsToPull slots have been requested, so the
+ // threshold must be the number of slots actually reassigned to this node (node 0)
+ List<Integer> newSlots = nodeRemovalResult.getNewSlotOwners().get(TestUtils.getNode(0));
+ numSlotsToPull = newSlots.size();
+
+ try {
+ synchronized (dataGroupMember) {
+ dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult);
+ // wait in a loop to tolerate spurious wake-ups; still give up after 500ms
+ long deadline = System.currentTimeMillis() + 500;
+ long remaining = 500;
+ while (pulledSnapshots.size() < numSlotsToPull && remaining > 0) {
+ dataGroupMember.wait(remaining);
+ remaining = deadline - System.currentTimeMillis();
+ }
+ }
+
+ assertEquals(0, dataGroupMember.getLastHeartBeatReceivedTime());
+ assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30)));
+ assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove));
+ assertEquals(newSlots.size(), pulledSnapshots.size());
+ for (Integer newSlot : newSlots) {
+ assertTrue(pulledSnapshots.contains(newSlot));
+ }
+ } finally {
+ dataGroupMember.stop();
+ }
+ }
}
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 9fcb55e..392c60f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -43,11 +43,10 @@ public class MemberTest {
MetaGroupMember testMetaMember;
LogManager metaLogManager;
PartitionTable partitionTable;
- PartitionGroup partitionGroup;
QueryProcessExecutor queryProcessExecutor;
private List<String> prevUrls;
- private List<Node> allNodes;
+ PartitionGroup allNodes;
@Before
public void setUp() throws Exception {
@@ -59,11 +58,6 @@ public class MemberTest {
}
ClusterDescriptor.getINSTANCE().getConfig().setSeedNodeUrls(testUrls);
- partitionGroup = new PartitionGroup();
- for (int i = 0; i < 100; i += 10) {
- partitionGroup.add(TestUtils.getNode(i));
- }
-
metaLogManager = new TestLogManager();
testMetaMember = new TestMetaGroupMember() {
@Override
@@ -72,8 +66,8 @@ public class MemberTest {
}
};
- allNodes = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
+ allNodes = new PartitionGroup();
+ for (int i = 0; i < 100; i += 10) {
allNodes.add(TestUtils.getNode(i));
}
partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0), 100);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 60e0f73..0e2bd42 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -37,9 +37,8 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.cluster.client.ClientPool;
@@ -48,6 +47,7 @@ import org.apache.iotdb.cluster.common.TestMetaClient;
import org.apache.iotdb.cluster.common.TestPartitionedLogManager;
import org.apache.iotdb.cluster.common.TestSnapshot;
import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
@@ -103,6 +103,7 @@ import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TCompactProtocol.Factory;
import org.apache.thrift.transport.TTransportException;
@@ -117,14 +118,16 @@ public class MetaGroupMemberTest extends MemberTest {
private DataClusterServer dataClusterServer;
private AtomicLong dummyResponse;
private boolean mockDataClusterServer;
+ private Node exiledNode;
@Before
public void setUp() throws Exception {
super.setUp();
dummyResponse = new AtomicLong(Response.RESPONSE_AGREE);
metaGroupMember = getMetaGroupMember(TestUtils.getNode(0));
+ metaGroupMember.setAllNodes(allNodes);
// a faked data member to respond requests
- dataGroupMember = getDataGroupMember(partitionGroup, TestUtils.getNode(0));
+ dataGroupMember = getDataGroupMember(allNodes, TestUtils.getNode(0));
dataGroupMember.setCharacter(LEADER);
dataClusterServer = new DataClusterServer(TestUtils.getNode(0),
new DataGroupMember.Factory(null, metaGroupMember, null, null) {
@@ -139,11 +142,12 @@ public class MetaGroupMemberTest extends MemberTest {
metaGroupMember.getThisNode().setNodeIdentifier(0);
mockDataClusterServer = false;
QueryCoordinator.getINSTANCE().setMetaGroupMember(metaGroupMember);
+ exiledNode = null;
}
private DataGroupMember getDataGroupMember(PartitionGroup group, Node node) {
return new DataGroupMember(null, group, node, new TestPartitionedLogManager(null,
- partitionTable, partitionGroup.getHeader(), TestSnapshot::new),
+ partitionTable, group.getHeader(), TestSnapshot::new),
metaGroupMember, null) {
@Override
public boolean syncLeader() {
@@ -250,8 +254,6 @@ public class MetaGroupMemberTest extends MemberTest {
@Override
public AsyncClient getClient(Node node) throws IOException {
return new TestDataClient(node) {
- private AtomicLong readerId = new AtomicLong();
- private Map<Long, IReaderByTimestamp> readerByTimestampMap = new HashMap<>();
@Override
public void querySingleSeries(SingleSeriesQueryRequest request,
@@ -378,6 +380,19 @@ public class MetaGroupMemberTest extends MemberTest {
public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
new Thread(() -> resultHandler.onComplete(new TNodeStatus())).start();
}
+
+ /**
+  * Mocked exile: records which node was exiled and then acknowledges the call.
+  * The assignment is done synchronously (not on a detached thread) so that it has already
+  * happened by the time the removal that triggered it reports completion to the test.
+  */
+ @Override
+ public void exile(AsyncMethodCallback<Void> resultHandler) {
+   exiledNode = node;
+   if (resultHandler != null) {
+     // acknowledge asynchronously, like the other mocked RPCs in this client
+     new Thread(() -> resultHandler.onComplete(null)).start();
+   }
+ }
+
+ /**
+  * Mocked removal RPC: on a background thread, applies the removal locally to
+  * metaGroupMember and then answers RESPONSE_AGREE.
+  */
+ @Override
+ public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
+   Runnable applyThenAgree = () -> {
+     metaGroupMember.applyRemoveNode(node);
+     resultHandler.onComplete(Response.RESPONSE_AGREE);
+   };
+   new Thread(applyThenAgree).start();
+ }
};
} catch (IOException e) {
return null;
@@ -920,4 +935,151 @@ public class MetaGroupMemberTest extends MemberTest {
MetaGroupMember metaGroupMember = getMetaGroupMember(new Node());
assertEquals(100, metaGroupMember.getThisNode().getNodeIdentifier());
}
+
+ /**
+  * Removing a node while no partition table is set must fail with
+  * PartitionTableUnavailableException.
+  */
+ @Test
+ public void testRemoveNodeWithoutPartitionTable() throws InterruptedException {
+   metaGroupMember.setPartitionTable(null);
+   AtomicBoolean passed = new AtomicBoolean(false);
+   // set by whichever callback fires, so the wait below is guarded against notifications
+   // delivered before it starts and against spurious wake-ups
+   AtomicBoolean responded = new AtomicBoolean(false);
+   synchronized (metaGroupMember) {
+     metaGroupMember.removeNode(TestUtils.getNode(0), new AsyncMethodCallback<Long>() {
+       @Override
+       public void onComplete(Long aLong) {
+         synchronized (metaGroupMember) {
+           responded.set(true);
+           metaGroupMember.notifyAll();
+         }
+       }
+
+       @Override
+       public void onError(Exception e) {
+         new Thread(() -> {
+           synchronized (metaGroupMember) {
+             passed.set(e instanceof PartitionTableUnavailableException);
+             responded.set(true);
+             metaGroupMember.notifyAll();
+           }
+         }).start();
+       }
+     });
+     // wait at most 500 ms in total, re-checking the condition after every wake-up
+     long deadline = System.currentTimeMillis() + 500;
+     long remaining = 500;
+     while (!responded.get() && remaining > 0) {
+       metaGroupMember.wait(remaining);
+       remaining = deadline - System.currentTimeMillis();
+     }
+   }
+
+   assertTrue(passed.get());
+ }
+
+ /** A leader removing itself is accepted and disappears from the node list. */
+ @Test
+ public void testRemoveThisNode() throws InterruptedException {
+   metaGroupMember.setLeader(metaGroupMember.getThisNode());
+   metaGroupMember.setCharacter(LEADER);
+   AtomicReference<Long> removalResult = new AtomicReference<>();
+   doRemoveNode(removalResult, metaGroupMember.getThisNode());
+
+   long response = removalResult.get();
+   assertEquals(Response.RESPONSE_AGREE, response);
+   boolean stillPresent = metaGroupMember.getAllNodes().contains(metaGroupMember.getThisNode());
+   assertFalse(stillPresent);
+ }
+
+ /**
+  * A follower that sees its leader (node 40) removed is expected to become an ELECTOR
+  * with its last-heartbeat time reset to Long.MIN_VALUE.
+  */
+ @Test
+ public void testRemoveLeader() throws InterruptedException {
+   metaGroupMember.setLeader(TestUtils.getNode(40));
+   metaGroupMember.setCharacter(FOLLOWER);
+   AtomicReference<Long> removalResult = new AtomicReference<>();
+   doRemoveNode(removalResult, TestUtils.getNode(40));
+
+   long response = removalResult.get();
+   assertEquals(Response.RESPONSE_AGREE, response);
+   boolean stillPresent = metaGroupMember.getAllNodes().contains(TestUtils.getNode(40));
+   assertFalse(stillPresent);
+   assertEquals(ELECTOR, metaGroupMember.getCharacter());
+   assertEquals(Long.MIN_VALUE, metaGroupMember.getLastHeartBeatReceivedTime());
+ }
+
+ /**
+  * A follower that sees a non-leader node (20) removed drops it from the node list and
+  * its last-heartbeat time is expected to stay 0.
+  */
+ @Test
+ public void testRemoveNonLeader() throws InterruptedException {
+   metaGroupMember.setLeader(TestUtils.getNode(40));
+   metaGroupMember.setCharacter(FOLLOWER);
+   AtomicReference<Long> removalResult = new AtomicReference<>();
+   doRemoveNode(removalResult, TestUtils.getNode(20));
+
+   long response = removalResult.get();
+   assertEquals(Response.RESPONSE_AGREE, response);
+   boolean stillPresent = metaGroupMember.getAllNodes().contains(TestUtils.getNode(20));
+   assertFalse(stillPresent);
+   assertEquals(0, metaGroupMember.getLastHeartBeatReceivedTime());
+ }
+
+ /** When the leader removes node 20, that node is also sent an exile request. */
+ @Test
+ public void testRemoveNodeAsLeader() throws InterruptedException {
+   metaGroupMember.setLeader(metaGroupMember.getThisNode());
+   metaGroupMember.setCharacter(LEADER);
+   AtomicReference<Long> removalResult = new AtomicReference<>();
+   doRemoveNode(removalResult, TestUtils.getNode(20));
+
+   long response = removalResult.get();
+   assertEquals(Response.RESPONSE_AGREE, response);
+   boolean stillPresent = metaGroupMember.getAllNodes().contains(TestUtils.getNode(20));
+   assertFalse(stillPresent);
+   assertEquals(TestUtils.getNode(20), exiledNode);
+ }
+
+ /** Asking the leader to remove a node that is not in the cluster (120) is rejected. */
+ @Test
+ public void testRemoveNonExistNode() throws InterruptedException {
+   AtomicBoolean passed = new AtomicBoolean(false);
+   // set by whichever callback fires; onComplete may run before the wait below starts,
+   // so the wait must be guarded by this flag instead of relying on the notification alone
+   AtomicBoolean responded = new AtomicBoolean(false);
+   metaGroupMember.setCharacter(LEADER);
+   metaGroupMember.setLeader(metaGroupMember.getThisNode());
+   synchronized (metaGroupMember) {
+     metaGroupMember.removeNode(TestUtils.getNode(120), new AsyncMethodCallback<Long>() {
+       @Override
+       public void onComplete(Long aLong) {
+         synchronized (metaGroupMember) {
+           passed.set(aLong.equals(Response.RESPONSE_REJECT));
+           responded.set(true);
+           metaGroupMember.notifyAll();
+         }
+       }
+
+       @Override
+       public void onError(Exception e) {
+         new Thread(() -> {
+           synchronized (metaGroupMember) {
+             e.printStackTrace();
+             responded.set(true);
+             metaGroupMember.notifyAll();
+           }
+         }).start();
+       }
+     });
+     // wait at most 500 ms in total, re-checking the condition after every wake-up
+     long deadline = System.currentTimeMillis() + 500;
+     long remaining = 500;
+     while (!responded.get() && remaining > 0) {
+       metaGroupMember.wait(remaining);
+       remaining = deadline - System.currentTimeMillis();
+     }
+   }
+
+   assertTrue(passed.get());
+ }
+
+ /**
+  * Nodes 90, 80, ..., 30 can be removed one after another; a further removal (node 20)
+  * is answered with RESPONSE_CLUSTER_TOO_SMALL and leaves the node in place.
+  */
+ @Test
+ public void testRemoveTooManyNodes() throws InterruptedException {
+   for (int nodeId = 90; nodeId >= 30; nodeId -= 10) {
+     AtomicReference<Long> removalResult = new AtomicReference<>();
+     metaGroupMember.setCharacter(LEADER);
+     doRemoveNode(removalResult, TestUtils.getNode(nodeId));
+     long response = removalResult.get();
+     assertEquals(Response.RESPONSE_AGREE, response);
+     assertFalse(metaGroupMember.getAllNodes().contains(TestUtils.getNode(nodeId)));
+   }
+
+   AtomicReference<Long> lastResult = new AtomicReference<>();
+   metaGroupMember.setCharacter(LEADER);
+   doRemoveNode(lastResult, TestUtils.getNode(20));
+   long lastResponse = lastResult.get();
+   assertEquals(Response.RESPONSE_CLUSTER_TOO_SMALL, lastResponse);
+   assertTrue(metaGroupMember.getAllNodes().contains(TestUtils.getNode(20)));
+ }
+
+ /**
+  * Asks metaGroupMember to remove {@code nodeToRemove} and blocks for up to 500 ms until
+  * the callback fires. On success the response code is stored into {@code resultRef};
+  * on error the stack trace is printed and {@code resultRef} is left untouched.
+  */
+ private void doRemoveNode(AtomicReference<Long> resultRef, Node nodeToRemove) throws InterruptedException {
+   // set by either callback so the wait can be guarded against spurious wake-ups and can
+   // stop early on the error path, where resultRef is never filled
+   AtomicBoolean responded = new AtomicBoolean(false);
+   synchronized (resultRef) {
+     metaGroupMember.removeNode(nodeToRemove, new AsyncMethodCallback<Long>() {
+       @Override
+       public void onComplete(Long o) {
+         new Thread(() -> {
+           synchronized (resultRef) {
+             resultRef.set(o);
+             responded.set(true);
+             resultRef.notifyAll();
+           }
+         }).start();
+       }
+
+       @Override
+       public void onError(Exception e) {
+         new Thread(() -> {
+           synchronized (resultRef) {
+             e.printStackTrace();
+             responded.set(true);
+             resultRef.notifyAll();
+           }
+         }).start();
+       }
+     });
+     // wait at most 500 ms in total, re-checking the condition after every wake-up
+     long deadline = System.currentTimeMillis() + 500;
+     long remaining = 500;
+     while (!responded.get() && remaining > 0) {
+       resultRef.wait(remaining);
+       remaining = deadline - System.currentTimeMillis();
+     }
+   }
+ }
}
\ No newline at end of file
diff --git a/service-rpc/src/main/thrift/cluster.thrift b/service-rpc/src/main/thrift/cluster.thrift
index f3b4b3d..4195289 100644
--- a/service-rpc/src/main/thrift/cluster.thrift
+++ b/service-rpc/src/main/thrift/cluster.thrift
@@ -113,6 +113,10 @@ struct PullSnapshotRequest {
1: required list<int> requiredSlots
// for data group
2: optional Node header
+ // set to true if the previous holder has been removed from the cluster.
+ // This will make the previous holder read-only so that different new
+ // replicas can pull the same snapshot.
+ 3: required bool requireReadOnly
}
struct PullSnapshotResp {
@@ -278,6 +282,13 @@ service TSMetaService extends RaftService {
**/
long removeNode(1: Node node)
+ /**
+ * When a node is removed from the cluster, if it is not the meta leader, it cannot receive
+ * the commit command by heartbeat since it has been removed, so the leader should tell it
+ * directly that it is no longer in the cluster.
+ **/
+ void exile()
+
TNodeStatus queryNodeStatus()
Node checkAlive()