You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/01/08 03:21:41 UTC

[GitHub] [iotdb] jt2594838 commented on a change in pull request #2368: [IOTDB-1092] Support multi-raft for one data group

jt2594838 commented on a change in pull request #2368:
URL: https://github.com/apache/iotdb/pull/2368#discussion_r553690518



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
##########
@@ -53,7 +56,7 @@
 
   private boolean useAsyncApplier = true;
 
-  private int connectionTimeoutInMS = 20 * 1000;
+  private int connectionTimeoutInMS = 20_1000;

Review comment:
       It should be 20_000 (i.e. 20 * 1000); `20_1000` is 201000.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
##########
@@ -23,6 +23,7 @@
 import java.util.List;
 import org.apache.commons.collections4.map.MultiKeyMap;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;

Review comment:
       Now that you have RaftNode, it may be better to change the parameters from Node and raftId to a single RaftNode for simplicity.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
##########
@@ -143,11 +146,12 @@ private void parseRemoved(String[] split) {
     }
     DataGroupMember member = memberFactory.create(partitionGroup, thisNode);
     member.setReadOnly();
-    removedMemberMap.put(partitionGroup.getHeader(), member);
+    //TODO CORRECT
+    removedMemberMap.put(new RaftNode(partitionGroup.getHeader(), 0), member);

Review comment:
       Please finish this by serializing and deserializing raftId in the log.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
##########
@@ -21,45 +21,60 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 
 /**
  * PartitionGroup contains all the nodes that will form a data group with a certain node, which are
- * the REPLICATION_NUM - 1 different physical nodes after this node.
- * The first element of the list is called header, which is also the identifier of the data group.
+ * the REPLICATION_NUM - 1 different physical nodes after this node. The first element of the list
+ * is called header, which is also the identifier of the data group.
  */
 public class PartitionGroup extends ArrayList<Node> {
 
-  private Node thisNode;
+  private int id;

Review comment:
       Please add some comments to explain this id.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
##########
@@ -266,33 +286,52 @@ public NodeAdditionResult addNode(Node node) {
   /**
    * Move last slots from each group whose slot number is bigger than the new average to the new
    * node.
+   *
    * @param newNode
    * @return a map recording what slots each group lost.
    */
-  private Map<Node, Set<Integer>> moveSlotsToNew(Node newNode) {
-    Map<Node, Set<Integer>> result = new HashMap<>();
+  private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode) {
+    Map<RaftNode, Set<Integer>> result = new HashMap<>();
     // as a node is added, the average slots for each node decrease
     // move the slots to the new node if any previous node have more slots than the new average
-    List<Integer> newSlots = new ArrayList<>();
-    Map<Integer, Node> previousHolders = new HashMap<>();
-    int newAvg = totalSlotNumbers / nodeRing.size();
-    for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) {
+    int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor;
+    int raftId = 0;
+    for(int i = 0 ; i < multiRaftFactor; i++) {
+      RaftNode raftNode = new RaftNode(newNode, i);
+      nodeSlotMap.putIfAbsent(raftNode, new ArrayList<>());
+      previousNodeMap.putIfAbsent(raftNode, new HashMap<>());
+    }
+    for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
       List<Integer> slots = entry.getValue();
       int transferNum = slots.size() - newAvg;
       if (transferNum > 0) {
-        List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size());
-        newSlots.addAll(slotsToMove);
+        RaftNode curNode = new RaftNode(newNode, raftId);
+        int numToMove = transferNum;
+        if(raftId != multiRaftFactor - 1) {
+          numToMove = Math.min(numToMove, newAvg - nodeSlotMap.get(curNode).size());
+        }
+        List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size() - transferNum + numToMove);
+        nodeSlotMap.get(curNode).addAll(slotsToMove);
         for (Integer slot : slotsToMove) {
           // record what node previously hold the integer
-          previousHolders.put(slot, entry.getKey());
-          slotNodes[slot] = newNode;
+          previousNodeMap.get(curNode).put(slot, entry.getKey());
+          slotNodes[slot] = curNode;
         }
         result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove);
-        slotsToMove.clear();
+        transferNum -= numToMove;
+        if (transferNum > 0) {
+          curNode = new RaftNode(newNode, ++raftId);
+          slotsToMove = slots.subList(slots.size() - transferNum, slots.size());
+          nodeSlotMap.get(curNode).addAll(slotsToMove);
+          for (Integer slot : slotsToMove) {
+            // record what node previously hold the integer
+            previousNodeMap.get(curNode).put(slot, entry.getKey());
+            slotNodes[slot] = curNode;
+          }
+          result.get(entry.getKey()).addAll(slotsToMove);
+        }

Review comment:
       It seems `slotsToMove` is not removed from `slots`, so the moved slots will exist in both RaftNodes.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
##########
@@ -117,65 +118,81 @@ public void stop() {
    * @param dataGroupMember
    */
   public void addDataGroupMember(DataGroupMember dataGroupMember) {
-    DataGroupMember removedMember = headerGroupMap.remove(dataGroupMember.getHeader());
+    RaftNode raftNode = new RaftNode(dataGroupMember.getHeader(),
+        dataGroupMember.getRaftGroupId());
+    DataGroupMember removedMember = headerGroupMap.remove(raftNode);
     if (removedMember != null) {
       removedMember.stop();
-      asyncServiceMap.remove(dataGroupMember.getHeader());
-      syncServiceMap.remove(dataGroupMember.getHeader());
+      asyncServiceMap.remove(raftNode);
+      syncServiceMap.remove(raftNode);
     }
-    stoppedMemberManager.remove(dataGroupMember.getHeader());
+    stoppedMemberManager.remove(raftNode);
 
-    headerGroupMap.put(dataGroupMember.getHeader(), dataGroupMember);
+    headerGroupMap.put(raftNode, dataGroupMember);
   }
 
-  private <T> DataAsyncService getDataAsyncService(Node header,
+  private <T> DataAsyncService getDataAsyncService(Node node, int raftId,
       AsyncMethodCallback<T> resultHandler, Object request) {
-    return asyncServiceMap.computeIfAbsent(header, h -> {
-      DataGroupMember dataMember = getDataMember(h, resultHandler, request);
+      return getDataAsyncService(new RaftNode(node, raftId), resultHandler, request);
+  }
+
+  private <T> DataAsyncService getDataAsyncService(RaftNode raftNode,
+      AsyncMethodCallback<T> resultHandler, Object request) {
+    return asyncServiceMap.computeIfAbsent(raftNode, h -> {
+      DataGroupMember dataMember = getDataMember(raftNode, resultHandler, request);
       return dataMember != null ? new DataAsyncService(dataMember) : null;
     });
   }
 
-  private DataSyncService getDataSyncService(Node header) {
+  private DataSyncService getDataSyncService(Node header, int raftId) {
+    return getDataSyncService(new RaftNode(header, raftId));
+  }
+
+  private DataSyncService getDataSyncService(RaftNode header) {
     return syncServiceMap.computeIfAbsent(header, h -> {
-      DataGroupMember dataMember = getDataMember(h, null, null);
+      DataGroupMember dataMember = getDataMember(header, null, null);
       return dataMember != null ? new DataSyncService(dataMember) : null;
     });

Review comment:
       This may create an additional field in the anonymous class; it is better to use `h`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org