You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/03/01 08:34:27 UTC
[iotdb] branch cluster_scalability updated: This commit fixes a
serious bug of stale cache entries in asyncServiceMap and syncServiceMap when
removing a node.
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster_scalability by this push:
new 4d5ca80 This commit fixes a serious bug of stale cache entries in asyncServiceMap and syncServiceMap when removing a node.
4d5ca80 is described below
commit 4d5ca80e721823af3a546662d4be2f61f5736445
Author: lta <li...@163.com>
AuthorDate: Mon Mar 1 16:32:06 2021 +0800
This commit fixes a serious bug of stale cache entries in asyncServiceMap and syncServiceMap when removing a node.
---
.../iotdb/cluster/partition/PartitionGroup.java | 8 ++---
.../iotdb/cluster/server/DataClusterServer.java | 35 +++++++++++-----------
2 files changed, 21 insertions(+), 22 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index 0bb5005..f3106ef 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -100,8 +100,8 @@ public class PartitionGroup extends ArrayList<Node> {
return id;
}
- @Override
- public String toString() {
- return String.format("PartitionGroup{id=%d, header=%s}", id, get(0));
- }
+// @Override
+// public String toString() {
+// return String.format("PartitionGroup{id=%d, header=%s}", id, get(0));
+// }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index ee026a4..2a0dc2a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -120,17 +120,21 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
* @param dataGroupMember
*/
public void addDataGroupMember(DataGroupMember dataGroupMember) {
- RaftNode raftNode = new RaftNode(dataGroupMember.getHeader(),
+ RaftNode header = new RaftNode(dataGroupMember.getHeader(),
dataGroupMember.getRaftGroupId());
- DataGroupMember removedMember = headerGroupMap.remove(raftNode);
- if (removedMember != null) {
- removedMember.stop();
- asyncServiceMap.remove(raftNode);
- syncServiceMap.remove(raftNode);
+ if (headerGroupMap.containsKey(header)) {
+ logger.debug("group {} already exist.", dataGroupMember.getAllNodes());
+ return;
}
- stoppedMemberManager.remove(raftNode);
+ stoppedMemberManager.remove(header);
+ headerGroupMap.put(header, dataGroupMember);
+ resetServiceCache(header);
+ dataGroupMember.start();
+ }
- headerGroupMap.put(raftNode, dataGroupMember);
+ private void resetServiceCache(RaftNode header) {
+ asyncServiceMap.remove(header);
+ syncServiceMap.remove(header);
}
private <T> DataAsyncService getDataAsyncService(Node node, int raftId,
@@ -231,6 +235,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
// the two nodes are in the same group, create a new data member
member = dataMemberFactory.create(partitionGroup, thisNode);
headerGroupMap.put(raftNode, member);
+ stoppedMemberManager.remove(raftNode);
logger.info("Created a member for header {}, group is {}", raftNode, partitionGroup);
member.start();
} else {
@@ -559,7 +564,6 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
logger.info("Adding this node into a new group {}", newGroup);
DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode);
addDataGroupMember(dataGroupMember);
- dataGroupMember.start();
dataGroupMember.pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node,
newGroup.getId()), node);
}
@@ -578,7 +582,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
dataGroupMember.waitFollowersToSync();
dataGroupMember.stop();
stoppedMemberManager.put(header, dataGroupMember);
- logger.info("Data group member has removed, header is {}.", header);
+ logger.info("Data group member has removed, header {}, group is {}.", header,
+ dataGroupMember.getAllNodes());
}
/**
@@ -608,7 +613,6 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
logger.info("Building member of data group: {}", partitionGroup);
// no previous member or member changed
DataGroupMember dataGroupMember = dataMemberFactory.create(partitionGroup, thisNode);
- dataGroupMember.start();
// the previous member will be replaced here
addDataGroupMember(dataGroupMember);
dataGroupMember.setUnchanged(true);
@@ -666,13 +670,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
for (PartitionGroup group : partitionTable.getLocalGroups()) {
if (!headerGroupMap.containsKey(new RaftNode(group.getHeader(), group.getId()))) {
logger.info("{} should join a new group {}", thisNode, group);
- try {
- createNewMember(new RaftNode(group.getHeader(), group.getId()));
- } catch (NotInSameGroupException e) {
- // ignored
- } catch (CheckConsistencyException ce) {
- logger.error("remove node failed, error={}", ce.getMessage());
- }
+ DataGroupMember dataGroupMember = dataMemberFactory.create(group, thisNode);
+ addDataGroupMember(dataGroupMember);
}
}
}