You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/01/14 08:09:42 UTC
[iotdb] 02/04: fix bugs of wrong previous groups,
pulling snapshots from self, and wrongly removing local data
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 325e4f556cb3d520b6a365a89317e997291979c2
Author: lta <li...@163.com>
AuthorDate: Mon Jan 11 11:26:00 2021 +0800
fix bugs of wrong previous groups, pulling snapshots from self, and wrongly removing local data
---
.../iotdb/cluster/log/snapshot/FileSnapshot.java | 6 +--
.../cluster/log/snapshot/PullSnapshotTask.java | 56 ++++++++++---------
.../iotdb/cluster/partition/NodeRemovalResult.java | 5 +-
.../iotdb/cluster/partition/PartitionTable.java | 6 +++
.../cluster/partition/slot/SlotPartitionTable.java | 59 ++++++++++++++------
.../iotdb/cluster/server/DataClusterServer.java | 30 ++++-------
.../cluster/server/member/DataGroupMember.java | 62 +++++++++++++++++-----
.../cluster/server/member/MetaGroupMember.java | 1 -
.../iotdb/cluster/server/member/RaftMember.java | 35 ++++++++++++
.../server/heartbeat/MetaHeartbeatThreadTest.java | 5 ++
thrift/src/main/thrift/cluster.thrift | 1 -
11 files changed, 185 insertions(+), 81 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index 9f1b562..b559879 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -227,12 +227,10 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
throw new SnapshotInstallationException(e);
}
- for (FileSnapshot value : snapshotMap.values()) {
- installFileSnapshotSchema(value);
- }
-
for (Entry<Integer, FileSnapshot> integerSnapshotEntry : snapshotMap.entrySet()) {
Integer slot = integerSnapshotEntry.getKey();
+ FileSnapshot snapshot = integerSnapshotEntry.getValue();
+ installFileSnapshotSchema(snapshot);
SlotStatus status = slotManager.getStatus(slot);
if (status == SlotStatus.PULLING) {
// as schemas are set, writes can proceed
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index 9dc6231..4a79485 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
import org.apache.iotdb.cluster.log.Snapshot;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
@@ -163,30 +164,37 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
@Override
public Void call() {
- request = new PullSnapshotRequest();
- request.setHeader(descriptor.getPreviousHolders().getHeader());
- request.setRaftId(descriptor.getPreviousHolders().getId());
- request.setRequiredSlots(descriptor.getSlots());
- request.setRequireReadOnly(descriptor.isRequireReadOnly());
-
- boolean finished = false;
- int nodeIndex = -1;
- while (!finished) {
- try {
- // sequentially pick up a node that may have this slot
- nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
- finished = pullSnapshot(nodeIndex);
- if (!finished) {
- Thread
- .sleep(ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs());
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- finished = true;
- } catch (TException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(),
- descriptor.getPreviousHolders().get(nodeIndex), e);
+ // If this node is the member of previous holder, it's unnecessary to pull data again
+ if (descriptor.getPreviousHolders().contains(newMember.getThisNode())) {
+ // inform the previous holders that one member has successfully pulled snapshot directly
+ newMember.registerPullSnapshotHint(descriptor);
+ } else {
+ request = new PullSnapshotRequest();
+ request.setHeader(descriptor.getPreviousHolders().getHeader());
+ request.setRaftId(descriptor.getPreviousHolders().getId());
+ request.setRequiredSlots(descriptor.getSlots());
+ request.setRequireReadOnly(descriptor.isRequireReadOnly());
+
+ boolean finished = false;
+ int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode());
+ while (!finished) {
+ try {
+ // sequentially pick up a node that may have this slot
+ nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size();
+ finished = pullSnapshot(nodeIndex);
+ if (!finished) {
+ Thread
+ .sleep(
+ ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ finished = true;
+ } catch (TException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(),
+ descriptor.getPreviousHolders().get(nodeIndex), e);
+ }
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
index 457af85..5493980 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.partition;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -26,10 +27,10 @@ import java.util.List;
*/
public class NodeRemovalResult {
- private List<PartitionGroup> removedGroupList;
+ private List<PartitionGroup> removedGroupList = new ArrayList<>();
// if the removed group contains the local node, the local node should join a new group to
// preserve the replication number
- private List<PartitionGroup> newGroupList;
+ private List<PartitionGroup> newGroupList = new ArrayList<>();
public PartitionGroup getRemovedGroup(int raftId) {
for (PartitionGroup group : removedGroupList) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index bd8e518..079aad1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -95,6 +95,12 @@ public interface PartitionTable {
List<PartitionGroup> getGlobalGroups();
/**
+ * Judge whether the data of slot is held by node
+ * @param node target node
+ */
+ boolean judgeHoldSlot(Node node, int slot);
+
+ /**
* @param path can be an incomplete path (but should contain a storage group name) e.g., if
* "root.sg" is a storage group, then path can not be "root".
* @param timestamp
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index f8f89b9..2a5ae3c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -62,7 +62,7 @@ public class SlotPartitionTable implements PartitionTable {
private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM];
// the nodes that each slot belongs to before a new node is added, used for the new node to
// find the data source
- private Map<RaftNode, Map<Integer, RaftNode>> previousNodeMap = new ConcurrentHashMap<>();
+ private Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = new ConcurrentHashMap<>();
//the filed is used for determining which nodes need to be a group.
// the data groups which this node belongs to.
@@ -164,8 +164,7 @@ public class SlotPartitionTable implements PartitionTable {
return ret;
}
- @Override
- public PartitionGroup getHeaderGroup(RaftNode raftNode) {
+ private PartitionGroup getHeaderGroup(RaftNode raftNode, List<Node> nodeRing) {
PartitionGroup ret = new PartitionGroup(raftNode.getRaftId());
// assuming the nodes are [1,2,3,4,5]
@@ -187,6 +186,11 @@ public class SlotPartitionTable implements PartitionTable {
}
@Override
+ public PartitionGroup getHeaderGroup(RaftNode raftNode) {
+ return getHeaderGroup(raftNode, this.nodeRing);
+ }
+
+ @Override
public PartitionGroup getHeaderGroup(Node node) {
return getHeaderGroup(new RaftNode(node, 0));
}
@@ -228,11 +232,13 @@ public class SlotPartitionTable implements PartitionTable {
@Override
public NodeAdditionResult addNode(Node node) {
+ List<Node> oldRing;
synchronized (nodeRing) {
if (nodeRing.contains(node)) {
return null;
}
+ oldRing = new ArrayList<>(nodeRing);
nodeRing.add(node);
nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
@@ -277,7 +283,7 @@ public class SlotPartitionTable implements PartitionTable {
// the slots movement is only done logically, the new node itself will pull data from the
// old node
- result.setLostSlots(moveSlotsToNew(node));
+ result.setLostSlots(moveSlotsToNew(node, oldRing));
return result;
}
@@ -290,7 +296,7 @@ public class SlotPartitionTable implements PartitionTable {
* @param newNode
* @return a map recording what slots each group lost.
*/
- private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode) {
+ private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode, List<Node> oldRing) {
Map<RaftNode, Set<Integer>> result = new HashMap<>();
// as a node is added, the average slots for each node decrease
// move the slots to the new node if any previous node have more slots than the new average
@@ -315,7 +321,7 @@ public class SlotPartitionTable implements PartitionTable {
nodeSlotMap.get(curNode).addAll(slotsToMove);
for (Integer slot : slotsToMove) {
// record what node previously hold the integer
- previousNodeMap.get(curNode).put(slot, entry.getKey());
+ previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
slotNodes[slot] = curNode;
}
result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove);
@@ -326,7 +332,7 @@ public class SlotPartitionTable implements PartitionTable {
nodeSlotMap.get(curNode).addAll(slotsToMove);
for (Integer slot : slotsToMove) {
// record what node previously hold the integer
- previousNodeMap.get(curNode).put(slot, entry.getKey());
+ previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing));
slotNodes[slot] = curNode;
}
result.get(entry.getKey()).addAll(slotsToMove);
@@ -357,15 +363,19 @@ public class SlotPartitionTable implements PartitionTable {
}
dataOutputStream.writeInt(previousNodeMap.size());
- for (Entry<RaftNode, Map<Integer, RaftNode>> nodeMapEntry : previousNodeMap.entrySet()) {
+ for (Entry<RaftNode, Map<Integer, PartitionGroup>> nodeMapEntry : previousNodeMap.entrySet()) {
dataOutputStream.writeInt(nodeMapEntry.getKey().getNode().getNodeIdentifier());
dataOutputStream.writeInt(nodeMapEntry.getKey().getRaftId());
- Map<Integer, RaftNode> prevHolders = nodeMapEntry.getValue();
+ Map<Integer, PartitionGroup> prevHolders = nodeMapEntry.getValue();
dataOutputStream.writeInt(prevHolders.size());
- for (Entry<Integer, RaftNode> integerNodeEntry : prevHolders.entrySet()) {
+ for (Entry<Integer, PartitionGroup> integerNodeEntry : prevHolders.entrySet()) {
dataOutputStream.writeInt(integerNodeEntry.getKey());
- dataOutputStream.writeInt(integerNodeEntry.getValue().getNode().getNodeIdentifier());
- dataOutputStream.writeInt(integerNodeEntry.getValue().getRaftId());
+ PartitionGroup group = integerNodeEntry.getValue();
+ dataOutputStream.writeInt(group.getId());
+ dataOutputStream.writeInt(group.size());
+ for (Node node : group) {
+ dataOutputStream.writeInt(node.getNodeIdentifier());
+ }
}
}
@@ -402,12 +412,16 @@ public class SlotPartitionTable implements PartitionTable {
int nodeId = buffer.getInt();
RaftNode node = new RaftNode(idNodeMap.get(nodeId), buffer.getInt());
- Map<Integer, RaftNode> prevHolders = new HashMap<>();
+ Map<Integer, PartitionGroup> prevHolders = new HashMap<>();
int holderNum = buffer.getInt();
for (int i1 = 0; i1 < holderNum; i1++) {
int slot = buffer.getInt();
- RaftNode holder = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt());
- prevHolders.put(slot, holder);
+ PartitionGroup group = new PartitionGroup(buffer.getInt());
+ int nodeNum = buffer.getInt();
+ for (int i2 = 0 ; i2 < nodeNum; i2++) {
+ group.add(idNodeMap.get(buffer.getInt()));
+ }
+ prevHolders.put(slot, group);
}
previousNodeMap.put(node, prevHolders);
}
@@ -429,7 +443,7 @@ public class SlotPartitionTable implements PartitionTable {
return nodeRing;
}
- public Map<Integer, RaftNode> getPreviousNodeMap(RaftNode raftNode) {
+ public Map<Integer, PartitionGroup> getPreviousNodeMap(RaftNode raftNode) {
return previousNodeMap.get(raftNode);
}
@@ -503,6 +517,12 @@ public class SlotPartitionTable implements PartitionTable {
// each node exactly joins replicationNum groups, so when a group is removed, the node
// should join a new one
int thisNodeIdx = nodeRing.indexOf(thisNode);
+
+ // check if this node is to be removed
+ if (thisNodeIdx == -1) {
+ continue;
+ }
+
// this node must be the last node of the new group
int headerNodeIdx = thisNodeIdx - (replicationNum - 1);
headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() : headerNodeIdx;
@@ -531,7 +551,7 @@ public class SlotPartitionTable implements PartitionTable {
int slot = slots.get(i);
RaftNode newHolder = new RaftNode(nodeRing.get(i % nodeRing.size()), raftId);
slotNodes[slot] = newHolder;
- nodeSlotMap.get(newHolder).add(slot);
+ nodeSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot);
}
}
@@ -549,6 +569,11 @@ public class SlotPartitionTable implements PartitionTable {
}
}
+ @Override
+ public boolean judgeHoldSlot(Node node, int slot) {
+ return getHeaderGroup(slotNodes[slot]).contains(node);
+ }
+
private void calculateGlobalGroups() {
globalGroups = new ArrayList<>();
for (Node node : getAllNodes()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 81ae373..b023c36 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -526,9 +526,15 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
}
}
+ /**
+ * Make sure the group will not receive new raft logs
+ * @param header
+ * @param dataGroupMember
+ */
private void removeMember(RaftNode header, DataGroupMember dataGroupMember) {
- dataGroupMember.syncLeader();
+ dataGroupMember.getStopStatus().setSyncSuccess(dataGroupMember.syncLeader());
dataGroupMember.setReadOnly();
+ dataGroupMember.waitFollowersToSync();
dataGroupMember.stop();
stoppedMemberManager.put(header, dataGroupMember);
}
@@ -578,8 +584,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
/**
* Try removing a node from the groups of each DataGroupMember. If the node is the header of some
* group, set the member to read only so that it can still provide data for other nodes that has
- * not yet pulled its data. If the node is the local node, remove all members whose group is not
- * headed by this node. Otherwise, just change the node list of the member and pull new data. And
+ * not yet pulled its data. Otherwise, just change the node list of the member and pull new data. And
* create a new DataGroupMember if this node should join a new group because of this removal.
*
* @param node
@@ -591,25 +596,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
while (entryIterator.hasNext()) {
Entry<RaftNode, DataGroupMember> entry = entryIterator.next();
DataGroupMember dataGroupMember = entry.getValue();
- if (dataGroupMember.getHeader().equals(node)) {
- // the group is removed as the node is removed, so new writes should be rejected as
- // they belong to the new holder, but the member is kept alive for other nodes to pull
- // snapshots
+ if (dataGroupMember.getHeader().equals(node) || node.equals(thisNode)) {
entryIterator.remove();
removeMember(entry.getKey(), entry.getValue());
} else {
- if (node.equals(thisNode)) {
- // this node is removed, it is no more replica of other groups
- List<Integer> nodeSlots =
- ((SlotPartitionTable) partitionTable)
- .getNodeSlots(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId());
- dataGroupMember.removeLocalData(nodeSlots);
- entryIterator.remove();
- dataGroupMember.stop();
- } else {
- // the group should be updated and pull new slots from the removed node
- dataGroupMember.removeNode(node, removalResult);
- }
+ // the group should be updated and pull new slots from the removed node
+ dataGroupMember.removeNode(node, removalResult);
}
}
for (PartitionGroup newGroup : removalResult.getNewGroupList()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 2cd675f..4737520 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -402,16 +402,27 @@ public class DataGroupMember extends RaftMember {
* @param request
*/
public PullSnapshotResp getSnapshot(PullSnapshotRequest request) throws IOException {
- waitLeader();
- if (character != NodeCharacter.LEADER && !readOnly) {
- return null;
- }
// if the requester pulls the snapshots because the header of the group is removed, then the
// member should no longer receive new data
if (request.isRequireReadOnly()) {
setReadOnly();
}
+ boolean canGetSnapshot;
+ /**
+ * There are two conditions that can get snapshot:
+ * 1. The raft member is stopped and sync status is successful which means it has synced leader successfully before stop.
+ * 2. The raft member is not stopped and syncing leader is successful.
+ */
+ if (stopStatus.stop) {
+ canGetSnapshot = stopStatus.syncSuccess;
+ } else {
+ canGetSnapshot = syncLeader();
+ }
+ if (!canGetSnapshot) {
+ return null;
+ }
+
List<Integer> requiredSlots = request.getRequiredSlots();
for (Integer requiredSlot : requiredSlots) {
// wait if the data of the slot is in another node
@@ -467,28 +478,26 @@ public class DataGroupMember extends RaftMember {
synchronized (logManager) {
logger.info("{} pulling {} slots from remote", name, slots.size());
PartitionedSnapshot<Snapshot> snapshot = (PartitionedSnapshot) logManager.getSnapshot();
- Map<Integer, RaftNode> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
+ Map<Integer, PartitionGroup> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable())
.getPreviousNodeMap(new RaftNode(newNode, getRaftGroupId()));
// group the slots by their owners
- Map<RaftNode, List<Integer>> holderSlotsMap = new HashMap<>();
+ Map<PartitionGroup, List<Integer>> holderSlotsMap = new HashMap<>();
for (int slot : slots) {
// skip the slot if the corresponding data is already replicated locally
if (snapshot.getSnapshot(slot) == null) {
- RaftNode raftNode = prevHolders.get(slot);
- if (raftNode != null) {
- holderSlotsMap.computeIfAbsent(raftNode, n -> new ArrayList<>()).add(slot);
+ PartitionGroup group = prevHolders.get(slot);
+ if (group != null) {
+ holderSlotsMap.computeIfAbsent(group, n -> new ArrayList<>()).add(slot);
}
}
}
// pull snapshots from each owner's data group
- for (Entry<RaftNode, List<Integer>> entry : holderSlotsMap.entrySet()) {
- RaftNode raftNode = entry.getKey();
+ for (Entry<PartitionGroup, List<Integer>> entry : holderSlotsMap.entrySet()) {
List<Integer> nodeSlots = entry.getValue();
PullSnapshotTaskDescriptor taskDescriptor =
- new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable()
- .getHeaderGroup(raftNode), nodeSlots, false);
+ new PullSnapshotTaskDescriptor(entry.getKey(), nodeSlots, false);
pullFileSnapshot(taskDescriptor, null);
}
}
@@ -760,6 +769,27 @@ public class DataGroupMember extends RaftMember {
}
}
+ public void waitFollowersToSync() {
+ if (character != NodeCharacter.LEADER) {
+ return;
+ }
+ for (Map.Entry<Node, Peer> entry: peerMap.entrySet()) {
+ Node node = entry.getKey();
+ Peer peer = entry.getValue();
+ while (peer.getMatchIndex() < logManager.getCommitLogIndex()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("{}: Unexpected interruption when waiting follower {} to sync, raft id is {}",
+ name, node, getRaftGroupId());
+ }
+ }
+ logger.info("{}: Follower {} has synced with leader, raft id is {}", name, node,
+ getRaftGroupId());
+ }
+ }
+
/**
* Generate a report containing the character, leader, term, last log term, last log index, header
* and readOnly or not of this member.
@@ -800,6 +830,12 @@ public class DataGroupMember extends RaftMember {
public boolean onSnapshotInstalled(List<Integer> slots) {
List<Integer> removableSlots = new ArrayList<>();
for (Integer slot : slots) {
+ /**
+ * If this slot is just held by different raft groups in the same node, it should keep the data of slot.
+ */
+ if (metaGroupMember.getPartitionTable().judgeHoldSlot(thisNode, slot)) {
+ continue;
+ }
int sentReplicaNum = slotManager.sentOneReplication(slot);
if (sentReplicaNum >= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) {
removableSlots.add(slot);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index d93efc0..7e73f61 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -382,7 +382,6 @@ public class MetaGroupMember extends RaftMember {
logger.error("Unexpected interruption when waiting for hardlinkCleaner to end", e);
}
}
-
logger.info("{}: stopped", name);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 3a5b51b..0526285 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -249,6 +249,8 @@ public abstract class RaftMember {
*/
private LogDispatcher logDispatcher;
+ protected StopStatus stopStatus;
+
protected RaftMember() {
}
@@ -260,6 +262,7 @@ public abstract class RaftMember {
this.asyncHeartbeatClientPool = asyncHeartbeatPool;
this.syncHeartbeatClientPool = syncHeartbeatPool;
this.asyncSendLogClientPool = asyncClientPool;
+ this.stopStatus = new StopStatus();
}
protected RaftMember(String name, AsyncClientPool asyncPool, SyncClientPool syncPool,
@@ -365,9 +368,11 @@ public abstract class RaftMember {
logger.error("Unexpected interruption when waiting for commitLogPool to end", e);
}
}
+ leader.set(ClusterConstant.EMPTY_NODE);
catchUpService = null;
heartBeatService = null;
appendLogThreadPool = null;
+ stopStatus.setStop(true);
logger.info("Member {} stopped", name);
}
@@ -801,6 +806,9 @@ public abstract class RaftMember {
* Wait until the leader of this node becomes known or time out.
*/
public void waitLeader() {
+ if (stopStatus.isStop()) {
+ return;
+ }
long startTime = System.currentTimeMillis();
while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
synchronized (waitLeaderCondition) {
@@ -1876,4 +1884,31 @@ public abstract class RaftMember {
OK, TIME_OUT, LEADERSHIP_STALE
}
+ public class StopStatus {
+
+ boolean stop;
+
+ boolean syncSuccess;
+
+ public boolean isStop() {
+ return stop;
+ }
+
+ public void setStop(boolean stop) {
+ this.stop = stop;
+ }
+
+ public boolean isSyncSuccess() {
+ return syncSuccess;
+ }
+
+ public void setSyncSuccess(boolean syncSuccess) {
+ this.syncSuccess = syncSuccess;
+ }
+ }
+
+ public StopStatus getStopStatus() {
+ return stopStatus;
+ }
+
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index e112d31..f6bb254 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -106,6 +106,11 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
public List<PartitionGroup> getGlobalGroups() {
return null;
}
+
+ @Override
+ public boolean judgeHoldSlot(Node node, int slot) {
+ return true;
+ }
};
@Override
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 9019680..2a24106 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -315,7 +315,6 @@ service RaftService {
**/
long requestCommitIndex(1:Node header, 2:int raftId)
-
/**
* Read a chunk of a file from the client. If the remaining of the file does not have enough
* bytes, only the remaining will be returned.