You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/02/24 14:07:19 UTC

[iotdb] 02/02: This commit fixes the following issues: 1. SlotPartitionTable serialization null pointer 2. AddNodeLog and RemoveNodeLog serialization issue

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d84f0ff9d7907220bf2ac0b4cab8c184aaf2dd31
Author: lta <li...@163.com>
AuthorDate: Wed Feb 24 22:01:17 2021 +0800

    This commit fixes the following issues:
    1. SlotPartitionTable serialization null pointer
    2. AddNodeLog and RemoveNodeLog serialization issue
---
 .../iotdb/cluster/log/logtypes/AddNodeLog.java     |  5 +++-
 .../iotdb/cluster/log/logtypes/RemoveNodeLog.java  |  5 +++-
 .../iotdb/cluster/partition/NodeRemovalResult.java |  6 ++---
 .../iotdb/cluster/partition/PartitionGroup.java    | 10 ++++----
 .../partition/slot/SlotNodeRemovalResult.java      |  9 ++++---
 .../cluster/partition/slot/SlotPartitionTable.java | 28 ++++++++++++----------
 .../iotdb/cluster/server/DataClusterServer.java    |  2 +-
 .../cluster/server/member/DataGroupMember.java     |  3 +++
 .../cluster/server/member/MetaGroupMember.java     |  3 +--
 .../iotdb/cluster/utils/SerializeUtilTest.java     | 18 ++++++++++++++
 10 files changed, 61 insertions(+), 28 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
index 380ba08..8a2fcab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
@@ -68,6 +68,7 @@ public class AddNodeLog extends Log {
   }
 
   public ByteBuffer getPartitionTable() {
+    partitionTable.clear();
     return partitionTable;
   }
 
@@ -104,7 +105,9 @@ public class AddNodeLog extends Log {
     SerializeUtils.deserialize(newNode, buffer);
 
     int len = buffer.getInt();
-    partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+    byte[] data = new byte[len];
+    System.arraycopy(buffer.array(), buffer.position(), data, 0, len);
+    partitionTable = ByteBuffer.wrap(data);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index 22af482..4b147eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -54,6 +54,7 @@ public class RemoveNodeLog extends Log {
   }
 
   public ByteBuffer getPartitionTable() {
+    partitionTable.clear();
     return partitionTable;
   }
 
@@ -90,7 +91,9 @@ public class RemoveNodeLog extends Log {
     SerializeUtils.deserialize(removedNode, buffer);
 
     int len = buffer.getInt();
-    partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+    byte[] data = new byte[len];
+    System.arraycopy(buffer.array(), buffer.position(), data, 0, len);
+    partitionTable = ByteBuffer.wrap(data);
   }
 
   public Node getRemovedNode() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
index 86ff9a2..16e25e2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -78,18 +78,18 @@ public class NodeRemovalResult {
     }
   }
 
-  public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+  public void deserialize(ByteBuffer buffer) {
     int removedGroupListSize = buffer.getInt();
     for (int i = 0 ; i < removedGroupListSize; i++) {
       PartitionGroup group = new PartitionGroup();
-      group.deserialize(buffer, idNodeMap);
+      group.deserialize(buffer);
       removedGroupList.add(group);
     }
 
     int newGroupListSize = buffer.getInt();
     for (int i = 0 ; i < newGroupListSize; i++) {
       PartitionGroup group = new PartitionGroup();
-      group.deserialize(buffer, idNodeMap);
+      group.deserialize(buffer);
       newGroupList.add(group);
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index e7d039c..0bb5005 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -25,9 +25,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Map;
 import java.util.Objects;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.db.utils.SerializeUtils;
 
 /**
  * PartitionGroup contains all the nodes that will form a data group with a certain node, which are
@@ -73,15 +73,17 @@ public class PartitionGroup extends ArrayList<Node> {
     dataOutputStream.writeInt(getId());
     dataOutputStream.writeInt(size());
     for (Node node : this) {
-      dataOutputStream.writeInt(node.getNodeIdentifier());
+      SerializeUtils.serialize(node, dataOutputStream);
     }
   }
 
-  public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
+  public void deserialize(ByteBuffer buffer) {
     id = buffer.getInt();
     int nodeNum = buffer.getInt();
     for (int i2 = 0; i2 < nodeNum; i2++) {
-      add(idNodeMap.get(buffer.getInt()));
+      Node node = new Node();
+      SerializeUtils.deserialize(node, buffer);
+      add(node);
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
index a04a289..9a17ea3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.db.utils.SerializeUtils;
 
 /**
  * SlotNodeRemovalResult stores the removed partition group and who will take over its slots.
@@ -61,11 +62,13 @@ public class SlotNodeRemovalResult extends NodeRemovalResult {
   }
 
   @Override
-  public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) {
-    super.deserialize(buffer, idNodeMap);
+  public void deserialize(ByteBuffer buffer) {
+    super.deserialize(buffer);
     int size = buffer.getInt();
     for (int i = 0 ; i < size; i++) {
-      RaftNode raftNode = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt());
+      Node node = new Node();
+      SerializeUtils.deserialize(node, buffer);
+      RaftNode raftNode = new RaftNode(node, buffer.getInt());
       List<Integer> slots = new ArrayList<>();
       int slotSize = buffer.getInt();
       for (int j = 0 ; j < slotSize; j++) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 255fb22..8817b9f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -200,7 +200,7 @@ public class SlotPartitionTable implements PartitionTable {
     // assuming the nodes are [1,2,3,4,5]
     int nodeIndex = nodeRing.indexOf(raftNode.getNode());
     if (nodeIndex == -1) {
-      logger.error("Node {} is not in the cluster", raftNode.getNode());
+      logger.warn("Node {} is not in the cluster", raftNode.getNode());
       return null;
     }
     int endIndex = nodeIndex + replicationNum;
@@ -307,7 +307,7 @@ public class SlotPartitionTable implements PartitionTable {
       }
     }
 
-    calculateGlobalGroups(nodeRing);
+    globalGroups = calculateGlobalGroups(nodeRing);
 
     // the slots movement is only done logically, the new node itself will pull data from the
     // old node
@@ -354,7 +354,7 @@ public class SlotPartitionTable implements PartitionTable {
 
       dataOutputStream.writeInt(previousNodeMap.size());
       for (Entry<RaftNode, Map<Integer, PartitionGroup>> nodeMapEntry : previousNodeMap.entrySet()) {
-        dataOutputStream.writeInt(nodeMapEntry.getKey().getNode().getNodeIdentifier());
+        SerializeUtils.serialize(nodeMapEntry.getKey().getNode(), dataOutputStream);
         dataOutputStream.writeInt(nodeMapEntry.getKey().getRaftId());
         Map<Integer, PartitionGroup> prevHolders = nodeMapEntry.getValue();
         dataOutputStream.writeInt(prevHolders.size());
@@ -383,15 +383,15 @@ public class SlotPartitionTable implements PartitionTable {
     logger.info("Initializing the partition table from buffer");
     totalSlotNumbers = buffer.getInt();
     int size = buffer.getInt();
-    Map<Integer, Node> idNodeMap = new HashMap<>();
+    nodeSlotMap = new HashMap<>();
+    Node node;
     for (int i = 0; i < size; i++) {
-      Node node = new Node();
+      node = new Node();
       SerializeUtils.deserialize(node, buffer);
       RaftNode raftNode = new RaftNode(node, buffer.getInt());
       List<Integer> slots = new ArrayList<>();
       SerializeUtils.deserializeIntList(slots, buffer);
       nodeSlotMap.put(raftNode, slots);
-      idNodeMap.put(node.getNodeIdentifier(), node);
       for (Integer slot : slots) {
         slotNodes[slot] = raftNode;
       }
@@ -400,22 +400,24 @@ public class SlotPartitionTable implements PartitionTable {
     int prevNodeMapSize = buffer.getInt();
     previousNodeMap = new HashMap<>();
     for (int i = 0; i < prevNodeMapSize; i++) {
-      int nodeId = buffer.getInt();
-      RaftNode node = new RaftNode(idNodeMap.get(nodeId), buffer.getInt());
+      node = new Node();
+      SerializeUtils.deserialize(node, buffer);
+      RaftNode raftNode = new RaftNode(node, buffer.getInt());
 
       Map<Integer, PartitionGroup> prevHolders = new HashMap<>();
       int holderNum = buffer.getInt();
       for (int i1 = 0; i1 < holderNum; i1++) {
         PartitionGroup group = new PartitionGroup();
-        group.deserialize(buffer, idNodeMap);
+        group.deserialize(buffer);
         prevHolders.put(buffer.getInt(), group);
       }
-      previousNodeMap.put(node, prevHolders);
+      previousNodeMap.put(raftNode, prevHolders);
     }
 
     nodeRemovalResult = new NodeRemovalResult();
-    nodeRemovalResult.deserialize(buffer, idNodeMap);
+    nodeRemovalResult.deserialize(buffer);
 
+    nodeRing.clear();
     for (RaftNode raftNode : nodeSlotMap.keySet()) {
       if (!nodeRing.contains(raftNode.getNode())) {
         nodeRing.add(raftNode.getNode());
@@ -527,7 +529,7 @@ public class SlotPartitionTable implements PartitionTable {
         result.addNewGroup(newGrp);
       }
 
-      calculateGlobalGroups(nodeRing);
+      globalGroups = calculateGlobalGroups(nodeRing);
 
       // the slots movement is only done logically, the new node itself will pull data from the
       // old node
@@ -563,7 +565,7 @@ public class SlotPartitionTable implements PartitionTable {
     List<PartitionGroup> result = new ArrayList<>();
     for (Node node : nodeRing) {
       for (int i = 0; i < multiRaftFactor; i++) {
-        result.add(getHeaderGroup(new RaftNode(node, i)));
+        result.add(getHeaderGroup(new RaftNode(node, i), nodeRing));
       }
     }
     return result;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 454f5df..3b70208 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -628,7 +628,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async
         DataGroupMember dataGroupMember = entry.getValue();
         if (dataGroupMember.getHeader().equals(node) || node.equals(thisNode)) {
           entryIterator.remove();
-          removeMember(entry.getKey(), entry.getValue());
+          removeMember(entry.getKey(), dataGroupMember);
         } else {
           // the group should be updated and pull new slots from the removed node
           dataGroupMember.removeNode(node, removalResult);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 7b4f663..e0a382e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -752,6 +752,9 @@ public class DataGroupMember extends RaftMember {
       if (allNodes.contains(removedNode)) {
         // update the group if the deleted node was in it
         PartitionGroup newGroup = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId()));
+        if (newGroup == null) {
+          return;
+        }
         Node newNodeToGroup = newGroup.get(newGroup.size() - 1);
         allNodes.add(newNodeToGroup);
         peerMap.putIfAbsent(newNodeToGroup, new Peer(logManager.getLastLogIndex()));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index dd5b419..ec4f0de 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1934,7 +1934,7 @@ public class MetaGroupMember extends RaftMember {
     // node removal must be serialized to reduce potential concurrency problem
     synchronized (logManager) {
       // update partition table
-      partitionTable.removeNode(node);
+      partitionTable.removeNode(target);
       ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
 
       RemoveNodeLog removeNodeLog = new RemoveNodeLog();
@@ -1986,7 +1986,6 @@ public class MetaGroupMember extends RaftMember {
         if (allNodes.contains(oldNode)) {
           allNodes.remove(oldNode);
           idNodeMap.remove(oldNode.nodeIdentifier);
-
         }
 
         // save the updated partition table
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
index 134b2fe..bc855af 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/SerializeUtilTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -51,6 +52,23 @@ import org.junit.Test;
 public class SerializeUtilTest {
 
   @Test
+  public void testSlotPartitionTable() {
+    List<Node> nodes = new ArrayList<>();
+    nodes.add(TestUtils.getNode(0));
+    nodes.add(TestUtils.getNode(1));
+    nodes.add(TestUtils.getNode(2));
+    SlotPartitionTable slotPartitionTable1 = new SlotPartitionTable(nodes, TestUtils.getNode(0));
+    SlotPartitionTable slotPartitionTable2 = new SlotPartitionTable(nodes, TestUtils.getNode(0));
+    SlotPartitionTable slotPartitionTable3 = new SlotPartitionTable(nodes, TestUtils.getNode(0));
+    slotPartitionTable1.removeNode(TestUtils.getNode(0));
+    slotPartitionTable2.deserialize(slotPartitionTable1.serialize());
+    assertEquals(slotPartitionTable2, slotPartitionTable1);
+    slotPartitionTable1.addNode(TestUtils.getNode(0));
+    slotPartitionTable3.deserialize(slotPartitionTable1.serialize());
+    assertEquals(slotPartitionTable3, slotPartitionTable1);
+  }
+
+  @Test
   public void testStrToNode() {
     for (int i = 0; i < 10; i++) {
       Node node = TestUtils.getNode(i);