Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/16 09:49:23 UTC

[iotdb] branch master updated: [IOTDB-2689] [IOTDB-2690] Simple Partition load balancing (#5910)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d4b5bc1ac [IOTDB-2689] [IOTDB-2690] Simple Partition load balancing (#5910)
2d4b5bc1ac is described below

commit 2d4b5bc1acb5c06c8bc3556aff1900ea17f3c31c
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Mon May 16 17:49:17 2022 +0800

    [IOTDB-2689] [IOTDB-2690] Simple Partition load balancing (#5910)
---
 .../iotdb/confignode/manager/PartitionManager.java |  57 +++++++++---
 .../confignode/persistence/PartitionInfo.java      | 101 ++++++++++++++++-----
 .../confignode/persistence/PartitionInfoTest.java  |  54 ++++++++---
 3 files changed, 167 insertions(+), 45 deletions(-)
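
At a high level, the patch replaces the previous random choice of a TRegionReplicaSet
(via java.util.Random) with a greedy policy: each new partition slot is assigned to the
Region that currently holds the fewest allocated slots, after which a single pass of
adjacent swaps restores the ascending order of the counter list. The following is a
minimal, self-contained sketch of that technique; the String region IDs, the Counter
class, and the GreedyAllocationSketch/allocate names are simplified stand-ins for
illustration, not IoTDB types.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class GreedyAllocationSketch {

      /** Stand-in for Pair<Long, TConsensusGroupId>: (allocated slots, region id). */
      static final class Counter {
        long slots;
        final String regionId;

        Counter(long slots, String regionId) {
          this.slots = slots;
          this.regionId = regionId;
        }
      }

      /** Assigns each slot to the least-loaded region, keeping counters sorted ascending. */
      static List<String> allocate(List<Counter> counters, int slotCount) {
        counters.sort(Comparator.comparingLong(c -> c.slots)); // precondition of the loop below
        List<String> assignments = new ArrayList<>();
        for (int slot = 0; slot < slotCount; slot++) {
          Counter best = counters.get(0); // head of an ascending list = least-loaded region
          assignments.add(best.regionId);
          best.slots += 1;
          // One bubble pass: only the head changed, and only by +1, so moving it
          // rightward past its (formerly equal) neighbors re-sorts the list in O(n).
          int i = 0;
          while (i < counters.size() - 1 && counters.get(i).slots > counters.get(i + 1).slots) {
            Collections.swap(counters, i, i + 1);
            i++;
          }
        }
        return assignments;
      }

      public static void main(String[] args) {
        List<Counter> counters = new ArrayList<>(List.of(
            new Counter(3, "region-1"), new Counter(0, "region-2"), new Counter(1, "region-3")));
        // Prints [region-2, region-2, region-3, region-3, region-2]; all counters end at 3.
        System.out.println(allocate(counters, 5));
      }
    }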

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index ee6e6f3520..5d90ab1551 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -42,15 +42,16 @@ import org.apache.iotdb.confignode.persistence.PartitionInfo;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 /** The PartitionManager manages cluster PartitionTable read and write requests. */
 public class PartitionManager {
@@ -128,23 +129,37 @@ public class PartitionManager {
    */
   private Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> allocateSchemaPartition(
       Map<String, List<TSeriesPartitionSlot>> noAssignedSchemaPartitionSlotsMap) {
+
     Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> result = new HashMap<>();
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaMap =
+        partitionInfo.getRegionReplicaMap();
 
     for (String storageGroup : noAssignedSchemaPartitionSlotsMap.keySet()) {
+
       List<TSeriesPartitionSlot> noAssignedPartitionSlots =
           noAssignedSchemaPartitionSlotsMap.get(storageGroup);
-      List<TRegionReplicaSet> schemaRegionReplicaSets =
-          partitionInfo.getRegionReplicaSets(
+      // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
+      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+          partitionInfo.getSortedRegionSlotsCounter(
               getClusterSchemaManager()
                   .getRegionGroupIds(storageGroup, TConsensusGroupType.SchemaRegion));
-      Random random = new Random();
 
       Map<TSeriesPartitionSlot, TRegionReplicaSet> allocateResult = new HashMap<>();
-      noAssignedPartitionSlots.forEach(
-          seriesPartitionSlot ->
-              allocateResult.put(
-                  seriesPartitionSlot,
-                  schemaRegionReplicaSets.get(random.nextInt(schemaRegionReplicaSets.size()))));
+      for (TSeriesPartitionSlot seriesPartitionSlot : noAssignedPartitionSlots) {
+        // Greedy allocation: pick the Region with the fewest allocated slots
+        Pair<Long, TConsensusGroupId> bestRegion = regionSlotsCounter.get(0);
+        allocateResult.put(seriesPartitionSlot, regionReplicaMap.get(bestRegion.getRight()));
+
+        // One bubble pass to restore sorted order after incrementing the head
+        int index = 0;
+        regionSlotsCounter.set(0, new Pair<>(bestRegion.getLeft() + 1, bestRegion.getRight()));
+        while (index < regionSlotsCounter.size() - 1
+            && regionSlotsCounter.get(index).getLeft()
+                > regionSlotsCounter.get(index + 1).getLeft()) {
+          Collections.swap(regionSlotsCounter, index, index + 1);
+          index += 1;
+        }
+      }
 
       result.put(storageGroup, allocateResult);
     }
@@ -220,15 +235,18 @@ public class PartitionManager {
 
     Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
         result = new HashMap<>();
+    Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaMap =
+        partitionInfo.getRegionReplicaMap();
 
     for (String storageGroup : noAssignedDataPartitionSlotsMap.keySet()) {
+
       Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> noAssignedPartitionSlotsMap =
           noAssignedDataPartitionSlotsMap.get(storageGroup);
-      List<TRegionReplicaSet> dataRegionEndPoints =
-          partitionInfo.getRegionReplicaSets(
+      // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
+      List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+          partitionInfo.getSortedRegionSlotsCounter(
               getClusterSchemaManager()
                   .getRegionGroupIds(storageGroup, TConsensusGroupType.DataRegion));
-      Random random = new Random();
 
       Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> allocateResult =
           new HashMap<>();
@@ -236,10 +254,23 @@ public class PartitionManager {
           noAssignedPartitionSlotsMap.entrySet()) {
         allocateResult.put(seriesPartitionEntry.getKey(), new HashMap<>());
         for (TTimePartitionSlot timePartitionSlot : seriesPartitionEntry.getValue()) {
+
+          // Greedy allocation: pick the Region with the fewest allocated slots
+          Pair<Long, TConsensusGroupId> bestRegion = regionSlotsCounter.get(0);
           allocateResult
               .get(seriesPartitionEntry.getKey())
               .computeIfAbsent(timePartitionSlot, key -> new ArrayList<>())
-              .add(dataRegionEndPoints.get(random.nextInt(dataRegionEndPoints.size())));
+              .add(regionReplicaMap.get(bestRegion.getRight()));
+
+          // One bubble pass to restore sorted order after incrementing the head
+          int index = 0;
+          regionSlotsCounter.set(0, new Pair<>(bestRegion.getLeft() + 1, bestRegion.getRight()));
+          while (index < regionSlotsCounter.size() - 1
+              && regionSlotsCounter.get(index).getLeft()
+                  > regionSlotsCounter.get(index + 1).getLeft()) {
+            Collections.swap(regionSlotsCounter, index, index + 1);
+            index += 1;
+          }
         }
       }
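
Two properties of the allocation loop above are worth noting. First, because
getSortedRegionSlotsCounter returns the list sorted ascending and each allocation
increments only the head entry by one, the "bubble sort" is really a single insertion
pass: the incremented pair only moves past neighbors whose counts equal its old value,
so each slot costs O(1) in the common case and O(n) at worst. Second, the returned list
holds freshly built Pair objects, so PartitionManager can mutate this working copy
freely; the authoritative counters in PartitionInfo are incremented only later, when
createSchemaPartition or createDataPartition commits the allocation under the Region
write lock.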
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
index 20a701ef34..b9af789868 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -58,10 +59,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -74,10 +75,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class PartitionInfo implements SnapshotProcessor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionInfo.class);
+
   // Region read write lock
   private final ReentrantReadWriteLock regionReadWriteLock;
   private AtomicInteger nextRegionGroupId = new AtomicInteger(0);
-  private final Map<TConsensusGroupId, TRegionReplicaSet> regionMap;
+  private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaMap;
+  // Map<TConsensusGroupId, allocatedSlotsNumber>
+  private final Map<TConsensusGroupId, Long> regionSlotsCounter;
 
   // SchemaPartition read write lock
   private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
@@ -94,7 +98,8 @@ public class PartitionInfo implements SnapshotProcessor {
 
   public PartitionInfo() {
     this.regionReadWriteLock = new ReentrantReadWriteLock();
-    this.regionMap = new HashMap<>();
+    this.regionReplicaMap = new HashMap<>();
+    this.regionSlotsCounter = new HashMap<>();
 
     this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
     this.schemaPartition =
@@ -133,7 +138,8 @@ public class PartitionInfo implements SnapshotProcessor {
       int maxRegionId = Integer.MIN_VALUE;
 
       for (TRegionReplicaSet regionReplicaSet : req.getRegionMap().values()) {
-        regionMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
+        regionReplicaMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
+        regionSlotsCounter.put(regionReplicaSet.getRegionId(), 0L);
         maxRegionId = Math.max(maxRegionId, regionReplicaSet.getRegionId().getId());
       }
 
@@ -161,7 +167,8 @@ public class PartitionInfo implements SnapshotProcessor {
     regionReadWriteLock.writeLock().lock();
     try {
       for (TConsensusGroupId consensusGroupId : req.getConsensusGroupIds()) {
-        regionMap.remove(consensusGroupId);
+        regionReplicaMap.remove(consensusGroupId);
+        regionSlotsCounter.remove(consensusGroupId);
       }
       result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } finally {
@@ -220,6 +227,7 @@ public class PartitionInfo implements SnapshotProcessor {
    */
   public TSStatus createSchemaPartition(CreateSchemaPartitionReq req) {
     schemaPartitionReadWriteLock.writeLock().lock();
+    regionReadWriteLock.writeLock().lock();
 
     try {
       // Allocate SchemaPartition by CreateSchemaPartitionPlan
@@ -228,10 +236,14 @@ public class PartitionInfo implements SnapshotProcessor {
       assignedResult.forEach(
           (storageGroup, partitionSlots) ->
               partitionSlots.forEach(
-                  (seriesPartitionSlot, regionReplicaSet) ->
-                      schemaPartition.createSchemaPartition(
-                          storageGroup, seriesPartitionSlot, regionReplicaSet)));
+                  (seriesPartitionSlot, regionReplicaSet) -> {
+                    schemaPartition.createSchemaPartition(
+                        storageGroup, seriesPartitionSlot, regionReplicaSet);
+                    regionSlotsCounter.computeIfPresent(
+                        regionReplicaSet.getRegionId(), (consensusGroupId, count) -> (count + 1));
+                  }));
     } finally {
+      regionReadWriteLock.writeLock().unlock();
       schemaPartitionReadWriteLock.writeLock().unlock();
     }
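
Note the lock nesting introduced here and in createDataPartition below: the
partition-table write lock (schema or data) is taken first and the Region write lock
second, and they are released in reverse order. Because the Region lock is always the
innermost on both paths, the two methods cannot deadlock against each other. A minimal
sketch of the pattern, with hypothetical names (partitionLock, regionLock,
createPartition):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockOrderSketch {
      private final ReentrantReadWriteLock partitionLock = new ReentrantReadWriteLock();
      private final ReentrantReadWriteLock regionLock = new ReentrantReadWriteLock();

      /** Mirrors the acquisition order used by both create*Partition methods. */
      public void createPartition(Runnable mutation) {
        partitionLock.writeLock().lock(); // partition-table lock first
        regionLock.writeLock().lock(); // region lock second, so the global order is consistent
        try {
          mutation.run(); // update the partition table and the slot counters together
        } finally {
          regionLock.writeLock().unlock(); // release in reverse order
          partitionLock.writeLock().unlock();
        }
      }
    }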
 
@@ -289,6 +301,7 @@ public class PartitionInfo implements SnapshotProcessor {
    */
   public TSStatus createDataPartition(CreateDataPartitionReq req) {
     dataPartitionReadWriteLock.writeLock().lock();
+    regionReadWriteLock.writeLock().lock();
 
     try {
       // Allocate DataPartition by CreateDataPartitionPlan
@@ -301,13 +314,18 @@ public class PartitionInfo implements SnapshotProcessor {
                       timePartitionSlotRegionReplicaSets.forEach(
                           ((timePartitionSlot, regionReplicaSets) ->
                               regionReplicaSets.forEach(
-                                  regionReplicaSet ->
-                                      dataPartition.createDataPartition(
-                                          storageGroup,
-                                          seriesPartitionSlot,
-                                          timePartitionSlot,
-                                          regionReplicaSet)))))));
+                                  regionReplicaSet -> {
+                                    dataPartition.createDataPartition(
+                                        storageGroup,
+                                        seriesPartitionSlot,
+                                        timePartitionSlot,
+                                        regionReplicaSet);
+                                    regionSlotsCounter.computeIfPresent(
+                                        regionReplicaSet.getRegionId(),
+                                        (consensusGroupId, count) -> (count + 1));
+                                  }))))));
     } finally {
+      regionReadWriteLock.writeLock().unlock();
       dataPartitionReadWriteLock.writeLock().unlock();
     }
 
@@ -341,7 +359,7 @@ public class PartitionInfo implements SnapshotProcessor {
     regionReadWriteLock.readLock().lock();
     try {
       for (TConsensusGroupId groupId : groupIds) {
-        result.add(regionMap.get(groupId));
+        result.add(regionReplicaMap.get(groupId));
       }
     } finally {
       regionReadWriteLock.readLock().unlock();
@@ -354,7 +372,35 @@ public class PartitionInfo implements SnapshotProcessor {
     List<TRegionReplicaSet> result;
     regionReadWriteLock.readLock().lock();
     try {
-      result = new ArrayList<>(regionMap.values());
+      result = new ArrayList<>(regionReplicaMap.values());
+    } finally {
+      regionReadWriteLock.readLock().unlock();
+    }
+    return result;
+  }
+
+  /** @return A copy of regionReplicaMap */
+  public Map<TConsensusGroupId, TRegionReplicaSet> getRegionReplicaMap() {
+    Map<TConsensusGroupId, TRegionReplicaSet> result;
+    regionReadWriteLock.readLock().lock();
+    try {
+      result = new HashMap<>(regionReplicaMap);
+    } finally {
+      regionReadWriteLock.readLock().unlock();
+    }
+    return result;
+  }
+
+  /** @return The specified Regions, sorted ascending by the number of allocated slots */
+  public List<Pair<Long, TConsensusGroupId>> getSortedRegionSlotsCounter(
+      List<TConsensusGroupId> consensusGroupIds) {
+    List<Pair<Long, TConsensusGroupId>> result = new ArrayList<>();
+    regionReadWriteLock.readLock().lock();
+    try {
+      for (TConsensusGroupId consensusGroupId : consensusGroupIds) {
+        result.add(new Pair<>(regionSlotsCounter.get(consensusGroupId), consensusGroupId));
+      }
+      result.sort(Comparator.comparingLong(Pair::getLeft));
     } finally {
       regionReadWriteLock.readLock().unlock();
     }
@@ -491,14 +537,22 @@ public class PartitionInfo implements SnapshotProcessor {
     return schemaPartition;
   }
 
+  @TestOnly
+  public Map<TConsensusGroupId, Long> getRegionSlotsCounter() {
+    return regionSlotsCounter;
+  }
+
   private void serializeRegionMap(ByteBuffer buffer) throws TException, IOException {
     try (ByteArrayOutputStream out = new ByteArrayOutputStream();
         TIOStreamTransport tioStreamTransport = new TIOStreamTransport(out)) {
       TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
-      for (Entry<TConsensusGroupId, TRegionReplicaSet> entry : regionMap.entrySet()) {
-        entry.getKey().write(protocol);
-        entry.getValue().write(protocol);
+
+      for (TConsensusGroupId consensusGroupId : regionReplicaMap.keySet()) {
+        consensusGroupId.write(protocol);
+        regionReplicaMap.get(consensusGroupId).write(protocol);
+        protocol.writeI64(regionSlotsCounter.get(consensusGroupId));
       }
+
       byte[] toArray = out.toByteArray();
       buffer.putInt(toArray.length);
       buffer.put(toArray);
@@ -513,18 +567,23 @@ public class PartitionInfo implements SnapshotProcessor {
         TIOStreamTransport tioStreamTransport = new TIOStreamTransport(in)) {
       while (in.available() > 0) {
         TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
+
         TConsensusGroupId tConsensusGroupId = new TConsensusGroupId();
         tConsensusGroupId.read(protocol);
         TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
         tRegionReplicaSet.read(protocol);
-        regionMap.put(tConsensusGroupId, tRegionReplicaSet);
+        Long count = protocol.readI64();
+
+        regionReplicaMap.put(tConsensusGroupId, tRegionReplicaSet);
+        regionSlotsCounter.put(tConsensusGroupId, count);
       }
     }
   }
 
   public void clear() {
     nextRegionGroupId = new AtomicInteger(0);
-    regionMap.clear();
+    regionReplicaMap.clear();
+    regionSlotsCounter.clear();
 
     if (schemaPartition.getSchemaPartitionMap() != null) {
       schemaPartition.getSchemaPartitionMap().clear();
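
The snapshot layout changes as well: each serialized Region entry is now the
TConsensusGroupId struct, the TRegionReplicaSet struct, and a trailing i64 slot
counter, all written through Thrift's TBinaryProtocol. As far as this diff shows there
is no format-version field, so a snapshot written before this commit would presumably
not deserialize cleanly afterwards. Below is a minimal round-trip sketch of the
raw-protocol technique, substituting a bare i64 for the IoTDB structs; the class name
SnapshotLayoutSketch is illustrative only:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TIOStreamTransport;

    public class SnapshotLayoutSketch {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        TProtocol writer = new TBinaryProtocol(new TIOStreamTransport(out));
        // The patch writes, per entry: groupId.write(protocol); replicaSet.write(protocol);
        // protocol.writeI64(count). Only the trailing counter is reproduced here.
        writer.writeI64(42L);

        ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
        TProtocol reader = new TBinaryProtocol(new TIOStreamTransport(in));
        System.out.println(reader.readI64()); // prints 42
      }
    }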
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index e6668cd778..820ec3c1e7 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -53,7 +53,6 @@ public class PartitionInfoTest {
   private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
 
   enum testFlag {
-    RegionReplica(10),
     DataPartition(20),
     SchemaPartition(30);
 
@@ -89,25 +88,40 @@ public class PartitionInfoTest {
 
     partitionInfo.generateNextRegionGroupId();
 
+    // Create a SchemaRegion
     CreateRegionsReq createRegionsReq = new CreateRegionsReq();
+    TRegionReplicaSet schemaRegionReplicaSet =
+        generateTRegionReplicaSet(
+            testFlag.SchemaPartition.getFlag(),
+            generateTConsensusGroupId(
+                testFlag.SchemaPartition.getFlag(), TConsensusGroupType.SchemaRegion));
+    createRegionsReq.addRegion("root.test", schemaRegionReplicaSet);
+    partitionInfo.createRegions(createRegionsReq);
 
-    TRegionReplicaSet tRegionReplicaSet =
+    // Create a DataRegion
+    createRegionsReq = new CreateRegionsReq();
+    TRegionReplicaSet dataRegionReplicaSet =
         generateTRegionReplicaSet(
-            testFlag.RegionReplica.getFlag(),
-            generateTConsensusGroupId(testFlag.RegionReplica.getFlag()));
-    createRegionsReq.addRegion("root.test", tRegionReplicaSet);
+            testFlag.DataPartition.getFlag(),
+            generateTConsensusGroupId(
+                testFlag.DataPartition.getFlag(), TConsensusGroupType.DataRegion));
+    createRegionsReq.addRegion("root.test", dataRegionReplicaSet);
     partitionInfo.createRegions(createRegionsReq);
 
+    // Create a SchemaPartition
     CreateSchemaPartitionReq createSchemaPartitionReq =
         generateCreateSchemaPartitionReq(
             testFlag.SchemaPartition.getFlag(),
-            generateTConsensusGroupId(testFlag.SchemaPartition.getFlag()));
+            generateTConsensusGroupId(
+                testFlag.SchemaPartition.getFlag(), TConsensusGroupType.SchemaRegion));
     partitionInfo.createSchemaPartition(createSchemaPartitionReq);
 
+    // Create a DataPartition
     CreateDataPartitionReq createDataPartitionReq =
         generateCreateDataPartitionReq(
             testFlag.DataPartition.getFlag(),
-            generateTConsensusGroupId(testFlag.DataPartition.getFlag()));
+            generateTConsensusGroupId(
+                testFlag.DataPartition.getFlag(), TConsensusGroupType.DataRegion));
     partitionInfo.createDataPartition(createDataPartitionReq);
     int nextId = partitionInfo.getNextRegionGroupId();
 
@@ -117,11 +131,23 @@ public class PartitionInfoTest {
 
     Assert.assertEquals(nextId, (int) partitionInfo.getNextRegionGroupId());
 
+    // Check SchemaRegion
     List<TRegionReplicaSet> reloadTRegionReplicaSet =
         partitionInfo.getRegionReplicaSets(
-            Collections.singletonList(generateTConsensusGroupId(testFlag.RegionReplica.getFlag())));
+            Collections.singletonList(
+                generateTConsensusGroupId(
+                    testFlag.SchemaPartition.getFlag(), TConsensusGroupType.SchemaRegion)));
     Assert.assertEquals(1, reloadTRegionReplicaSet.size());
-    Assert.assertEquals(tRegionReplicaSet, reloadTRegionReplicaSet.get(0));
+    Assert.assertEquals(schemaRegionReplicaSet, reloadTRegionReplicaSet.get(0));
+
+    // Check DataRegion
+    reloadTRegionReplicaSet =
+        partitionInfo.getRegionReplicaSets(
+            Collections.singletonList(
+                generateTConsensusGroupId(
+                    testFlag.DataPartition.getFlag(), TConsensusGroupType.DataRegion)));
+    Assert.assertEquals(1, reloadTRegionReplicaSet.size());
+    Assert.assertEquals(dataRegionReplicaSet, reloadTRegionReplicaSet.get(0));
 
     Assert.assertEquals(
         createDataPartitionReq.getAssignedDataPartition(),
@@ -130,6 +156,11 @@ public class PartitionInfoTest {
     Assert.assertEquals(
         createSchemaPartitionReq.getAssignedSchemaPartition(),
         partitionInfo.getSchemaPartition().getSchemaPartitionMap());
+
+    Assert.assertEquals(2, partitionInfo.getRegionSlotsCounter().size());
+    for (Long count : partitionInfo.getRegionSlotsCounter().values()) {
+      Assert.assertEquals(1, count.intValue());
+    }
   }
 
   private TRegionReplicaSet generateTRegionReplicaSet(
@@ -187,7 +218,8 @@ public class PartitionInfoTest {
     return createSchemaPartitionReq;
   }
 
-  private TConsensusGroupId generateTConsensusGroupId(int startFlag) {
-    return new TConsensusGroupId(TConsensusGroupType.PartitionRegion, 111000 + startFlag);
+  private TConsensusGroupId generateTConsensusGroupId(
+      int startFlag, TConsensusGroupType consensusGroupType) {
+    return new TConsensusGroupId(consensusGroupType, 111000 + startFlag);
   }
 }