You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/02/24 14:07:19 UTC
[iotdb] 02/02: This commit fixes the following issues: 1.
SlotPartitionTable serialization null pointer 2. AddNodeLog and
RemoveNodeLog serialization issue
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d84f0ff9d7907220bf2ac0b4cab8c184aaf2dd31
Author: lta <li...@163.com>
AuthorDate: Wed Feb 24 22:01:17 2021 +0800
This commit fixes the following issues:
1. SlotPartitionTable serialization null pointer
2. AddNodeLog and RemoveNodeLog serialization issue
---
.../iotdb/cluster/log/logtypes/AddNodeLog.java | 5 +++-
.../iotdb/cluster/log/logtypes/RemoveNodeLog.java | 5 +++-
.../iotdb/cluster/partition/NodeRemovalResult.java | 6 ++---
.../iotdb/cluster/partition/PartitionGroup.java | 10 ++++----
.../partition/slot/SlotNodeRemovalResult.java | 9 ++++---
.../cluster/partition/slot/SlotPartitionTable.java | 28 ++++++++++++----------
.../iotdb/cluster/server/DataClusterServer.java | 2 +-
.../cluster/server/member/DataGroupMember.java | 3 +++
.../cluster/server/member/MetaGroupMember.java | 3 +--
.../iotdb/cluster/utils/SerializeUtilTest.java | 18 ++++++++++++++
10 files changed, 61 insertions(+), 28 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
index 380ba08..8a2fcab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
@@ -68,6 +68,7 @@ public class AddNodeLog extends Log {
}
public ByteBuffer getPartitionTable() {
+ partitionTable.clear();
return partitionTable;
}
@@ -104,7 +105,9 @@ public class AddNodeLog extends Log {
SerializeUtils.deserialize(newNode, buffer);
int len = buffer.getInt();
- partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+ byte[] data = new byte[len];
+ System.arraycopy(buffer.array(), buffer.position(), data, 0, len);
+ partitionTable = ByteBuffer.wrap(data);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index 22af482..4b147eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -54,6 +54,7 @@ public class RemoveNodeLog extends Log {
}
public ByteBuffer getPartitionTable() {
+ partitionTable.clear();
return partitionTable;
}
@@ -90,7 +91,9 @@ public class RemoveNodeLog extends Log {
SerializeUtils.deserialize(removedNode, buffer);
int len = buffer.getInt();
- partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+ byte[] data = new byte[len];
+ System.arraycopy(buffer.array(), buffer.position(), data, 0, len);
+ partitionTable = ByteBuffer.wrap(data);
}
public Node getRemovedNode() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
index 86ff9a2..16e25e2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -78,18 +78,18 @@ public class NodeRemovalResult {
}
}
- public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+ public void deserialize(ByteBuffer buffer) {
int removedGroupListSize = buffer.getInt();
for (int i = 0 ; i < removedGroupListSize; i++) {
PartitionGroup group = new PartitionGroup();
- group.deserialize(buffer, idNodeMap);
+ group.deserialize(buffer);
removedGroupList.add(group);
}
int newGroupListSize = buffer.getInt();
for (int i = 0 ; i < newGroupListSize; i++) {
PartitionGroup group = new PartitionGroup();
- group.deserialize(buffer, idNodeMap);
+ group.deserialize(buffer);
newGroupList.add(group);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index e7d039c..0bb5005 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -25,9 +25,9 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Map;
import java.util.Objects;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.db.utils.SerializeUtils;
/**
* PartitionGroup contains all the nodes that will form a data group with a certain node, which are
@@ -73,15 +73,17 @@ public class PartitionGroup extends ArrayList<Node> {
dataOutputStream.writeInt(getId());
dataOutputStream.writeInt(size());
for (Node node : this) {
- dataOutputStream.writeInt(node.getNodeIdentifier());
+ SerializeUtils.serialize(node, dataOutputStream);
}
}
- public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+ public void deserialize(ByteBuffer buffer) {
id = buffer.getInt();
int nodeNum = buffer.getInt();
for (int i2 = 0; i2 < nodeNum; i2++) {
- add(idNodeMap.get(buffer.getInt()));
+ Node node = new Node();
+ SerializeUtils.deserialize(node, buffer);
+ add(node);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
index a04a289..9a17ea3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.iotdb.cluster.partition.NodeRemovalResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.db.utils.SerializeUtils;
/**
* SlotNodeRemovalResult stores the removed partition group and who will take over its slots.
@@ -61,11 +62,13 @@ public class SlotNodeRemovalResult extends NodeRemovalResult {
}
@Override
- public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
- super.deserialize(buffer, idNodeMap);
+ public void deserialize(ByteBuffer buffer) {
+ super.deserialize(buffer);
int size = buffer.getInt();
for (int i = 0 ; i < size; i++) {
- RaftNode raftNode = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt());
+ Node node = new Node();
+ SerializeUtils.deserialize(node, buffer);
+ RaftNode raftNode = new RaftNode(node, buffer.getInt());
List<Integer> slots = new ArrayList<>();
int slotSize = buffer.getInt();
for (int j = 0 ; j < slotSize; j++) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 255fb22..8817b9f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -200,7 +200,7 @@ public class SlotPartitionTable implements PartitionTable {
// assuming the nodes are [1,2,3,4,5]
int nodeIndex = nodeRing.indexOf(raftNode.getNode());
if (nodeIndex == -1) {
- logger.error("Node {} is not in the cluster", raftNode.getNode());
+ logger.warn("Node {} is not in the cluster", raftNode.getNode());
return null;
}
int endIndex = nodeIndex + replicationNum;
@@ -307,7 +307,7 @@ public class SlotPartitionTable implements PartitionTable {
}
}
- calculateGlobalGroups(nodeRing);
+ globalGroups = calculateGlobalGroups(nodeRing);
// the slots movement is only done logically, the new node itself will pull data from the
// old node
@@ -354,7 +354,7 @@ public class SlotPartitionTable implements PartitionTable {
dataOutputStream.writeInt(previousNodeMap.size());
for (Entry<RaftNode, Map<Integer, PartitionGroup>> nodeMapEntry : previousNodeMap.entrySet()) {
- dataOutputStream.writeInt(nodeMapEntry.getKey().getNode().getNodeIdentifier());
+ SerializeUtils.serialize(nodeMapEntry.getKey().getNode(), dataOutputStream);
dataOutputStream.writeInt(nodeMapEntry.getKey().getRaftId());
Map<Integer, PartitionGroup> prevHolders = nodeMapEntry.getValue();
dataOutputStream.writeInt(prevHolders.size());
@@ -383,15 +383,15 @@ public class SlotPartitionTable implements PartitionTable {
logger.info("Initializing the partition table from buffer");
totalSlotNumbers = buffer.getInt();
int size = buffer.getInt();
- Map<Integer, Node> idNodeMap = new HashMap<>();
+ nodeSlotMap = new HashMap<>();
+ Node node;
for (int i = 0; i < size; i++) {
- Node node = new Node();
+ node = new Node();
SerializeUtils.deserialize(node, buffer);
RaftNode raftNode = new RaftNode(node, buffer.getInt());
List<Integer> slots = new ArrayList<>();
SerializeUtils.deserializeIntList(slots, buffer);
nodeSlotMap.put(raftNode, slots);
- idNodeMap.put(node.getNodeIdentifier(), node);
for (Integer slot : slots) {
slotNodes[slot] = raftNode;
}
@@ -400,22 +400,24 @@ public class SlotPartitionTable implements PartitionTable {
int prevNodeMapSize = buffer.getInt();
previousNodeMap = new HashMap<>();
for (int i = 0; i < prevNodeMapSize; i++) {
- int nodeId = buffer.getInt();
- RaftNode node = new RaftNode(idNodeMap.get(nodeId), buffer.getInt());
+ node = new Node();
+ SerializeUtils.deserialize(node, buffer);
+ RaftNode raftNode = new RaftNode(node, buffer.getInt());
Map<Integer, PartitionGroup> prevHolders = new HashMap<>();
int holderNum = buffer.getInt();
for (int i1 = 0; i1 < holderNum; i1++) {
PartitionGroup group = new PartitionGroup();
- group.deserialize(buffer, idNodeMap);
+ group.deserialize(buffer);
prevHolders.put(buffer.getInt(), group);
}
- previousNodeMap.put(node, prevHolders);
+ previousNodeMap.put(raftNode, prevHolders);
}
nodeRemovalResult = new NodeRemovalResult();
- nodeRemovalResult.deserialize(buffer, idNodeMap);
+ nodeRemovalResult.deserialize(buffer);
+ nodeRing.clear();
for (RaftNode raftNode : nodeSlotMap.keySet()) {
if (!nodeRing.contains(raftNode.getNode())) {
nodeRing.add(raftNode.getNode());
@@ -527,7 +529,7 @@ public class SlotPartitionTable implements PartitionTable {
result.addNewGroup(newGrp);
}
- calculateGlobalGroups(nodeRing);
+ globalGroups = calculateGlobalGroups(nodeRing);
// the slots movement is only done logically, the new node itself will pull data from the
// old node
@@ -563,7 +565,7 @@ public class SlotPartitionTable implements PartitionTable {
List<PartitionGroup> result = new ArrayList<>();
for (Node node : nodeRing) {
for (int i = 0; i < multiRaftFactor; i++) {
- result.add(getHeaderGroup(new RaftNode(node, i)));
+ result.add(getHeaderGroup(new RaftNode(node, i), nodeRing));
}
}
return result;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 454f5df..3b70208 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -628,7 +628,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
DataGroupMember dataGroupMember = entry.getValue();
if (dataGroupMember.getHeader().equals(node) || node.equals(thisNode)) {
entryIterator.remove();
- removeMember(entry.getKey(), entry.getValue());
+ removeMember(entry.getKey(), dataGroupMember);
} else {
// the group should be updated and pull new slots from the removed node
dataGroupMember.removeNode(node, removalResult);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 7b4f663..e0a382e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -752,6 +752,9 @@ public class DataGroupMember extends RaftMember {
if (allNodes.contains(removedNode)) {
// update the group if the deleted node was in it
PartitionGroup newGroup = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
+ if (newGroup == null) {
+ return;
+ }
Node newNodeToGroup = newGroup.get(newGroup.size() - 1);
allNodes.add(newNodeToGroup);
peerMap.putIfAbsent(newNodeToGroup, new Peer(logManager.getLastLogIndex()));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index dd5b419..ec4f0de 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1934,7 +1934,7 @@ public class MetaGroupMember extends RaftMember {
// node removal must be serialized to reduce potential concurrency problem
synchronized (logManager) {
// update partition table
- partitionTable.removeNode(node);
+ partitionTable.removeNode(target);
((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
RemoveNodeLog removeNodeLog = new RemoveNodeLog();
@@ -1986,7 +1986,6 @@ public class MetaGroupMember extends RaftMember {
if (allNodes.contains(oldNode)) {
allNodes.remove(oldNode);
idNodeMap.remove(oldNode.nodeIdentifier);
-
}
// save the updated partition table
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
index 134b2fe..bc855af 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -51,6 +52,23 @@ import org.junit.Test;
public class SerializeUtilTest {
@Test
+ public void testSlotPartitionTable() {
+ List<Node> nodes = new ArrayList<>();
+ nodes.add(TestUtils.getNode(0));
+ nodes.add(TestUtils.getNode(1));
+ nodes.add(TestUtils.getNode(2));
+ SlotPartitionTable slotPartitionTable1 = new SlotPartitionTable(nodes, TestUtils.getNode(0));
+ SlotPartitionTable slotPartitionTable2 = new SlotPartitionTable(nodes, TestUtils.getNode(0));
+ SlotPartitionTable slotPartitionTable3 = new SlotPartitionTable(nodes, TestUtils.getNode(0));
+ slotPartitionTable1.removeNode(TestUtils.getNode(0));
+ slotPartitionTable2.deserialize(slotPartitionTable1.serialize());
+ assertEquals(slotPartitionTable2, slotPartitionTable1);
+ slotPartitionTable1.addNode(TestUtils.getNode(0));
+ slotPartitionTable3.deserialize(slotPartitionTable1.serialize());
+ assertEquals(slotPartitionTable3, slotPartitionTable1);
+ }
+
+ @Test
public void testStrToNode() {
for (int i = 0; i < 10; i++) {
Node node = TestUtils.getNode(i);