You are viewing a plain-text version of this content. The hyperlink to its canonical version is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by yo...@apache.org on 2023/08/23 01:48:01 UTC

[iotdb] branch 6357533a25 created (now 8150659fb40)

This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a change to branch 6357533a25
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 8150659fb40 [IOTDB-6125] Fix DataPartition allocation bug when insert big batch data (#10924)

This branch includes the following new commits:

     new 8150659fb40 [IOTDB-6125] Fix DataPartition allocation bug when insert big batch data (#10924)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-6125] Fix DataPartition allocation bug when insert big batch data (#10924)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch 6357533a25
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8150659fb404264dc74cd12e0de63f104a5b3e2f
Author: YongzaoDan <53...@qq.com>
AuthorDate: Tue Aug 22 23:27:29 2023 +0800

    [IOTDB-6125] Fix DataPartition allocation bug when insert big batch data (#10924)
---
 .../manager/load/balancer/PartitionBalancer.java   | 21 ++++++++++++---
 .../partition/DataPartitionPolicyTable.java        | 27 +++++++++++++++++++
 .../manager/partition/PartitionManager.java        | 30 ++++++++++++----------
 .../iotdb/commons/structure/BalanceTreeMap.java    |  4 +++
 4 files changed, 65 insertions(+), 17 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index 6bfa9ae0671..86bb000ca05 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -205,10 +205,19 @@ public class PartitionBalancer {
    */
   public void reBalanceDataPartitionPolicy(String database) {
     try {
-      dataPartitionPolicyTableMap
-          .computeIfAbsent(database, empty -> new DataPartitionPolicyTable())
-          .reBalanceDataPartitionPolicy(
-              getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion));
+      DataPartitionPolicyTable dataPartitionPolicyTable =
+          dataPartitionPolicyTableMap.computeIfAbsent(
+              database, empty -> new DataPartitionPolicyTable());
+
+      try {
+        dataPartitionPolicyTable.acquireLock();
+        dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
+            getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion));
+        dataPartitionPolicyTable.logDataAllotTable(database);
+      } finally {
+        dataPartitionPolicyTable.releaseLock();
+      }
+
     } catch (DatabaseNotExistsException e) {
       LOGGER.error("Database {} not exists when updateDataAllotTable", database);
     }
@@ -224,6 +233,8 @@ public class PartitionBalancer {
               DataPartitionPolicyTable dataPartitionPolicyTable = new DataPartitionPolicyTable();
               dataPartitionPolicyTableMap.put(database, dataPartitionPolicyTable);
               try {
+                dataPartitionPolicyTable.acquireLock();
+
                 // Put all DataRegionGroups into the DataPartitionPolicyTable
                 dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
                     getPartitionManager()
@@ -233,6 +244,8 @@ public class PartitionBalancer {
                     getPartitionManager().getLastDataAllotTable(database));
               } catch (DatabaseNotExistsException e) {
                 LOGGER.error("Database {} not exists when setupPartitionBalancer", database);
+              } finally {
+                dataPartitionPolicyTable.releaseLock();
               }
             });
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
index ed297abf4d7..4f1727d88b9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataPartitionPolicyTable.java
@@ -25,6 +25,9 @@ import org.apache.iotdb.commons.structure.BalanceTreeMap;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,6 +37,8 @@ import java.util.concurrent.locks.ReentrantLock;
 
 public class DataPartitionPolicyTable {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataPartitionPolicyTable.class);
+
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
 
@@ -69,6 +74,12 @@ public class DataPartitionPolicyTable {
     dataAllotMap.put(seriesPartitionSlot, regionGroupId);
     seriesPartitionSlotCounter.put(
         regionGroupId, seriesPartitionSlotCounter.get(regionGroupId) + 1);
+    LOGGER.info(
+        "[ActivateDataAllotTable] Activate SeriesPartitionSlot {} "
+            + "to RegionGroup {}, SeriesPartitionSlot Count: {}",
+        seriesPartitionSlot,
+        regionGroupId,
+        seriesPartitionSlotCounter.get(regionGroupId));
     return regionGroupId;
   }
 
@@ -102,6 +113,8 @@ public class DataPartitionPolicyTable {
       int mu = SERIES_SLOT_NUM / dataRegionGroups.size();
       for (TSeriesPartitionSlot seriesPartitionSlot : seriesPartitionSlots) {
         if (!dataAllotMap.containsKey(seriesPartitionSlot)) {
+          // Skip unallocated SeriesPartitionSlot
+          // They will be activated when allocating DataPartition
           continue;
         }
 
@@ -109,6 +122,7 @@ public class DataPartitionPolicyTable {
         int seriesPartitionSlotCount = seriesPartitionSlotCounter.get(regionGroupId);
         if (seriesPartitionSlotCount > mu) {
           // Remove from dataAllotMap if the number of SeriesSlots is greater than mu
+          // They will be re-activated when allocating DataPartition
           dataAllotMap.remove(seriesPartitionSlot);
           seriesPartitionSlotCounter.put(regionGroupId, seriesPartitionSlotCount - 1);
         }
@@ -143,6 +157,19 @@ public class DataPartitionPolicyTable {
     }
   }
 
+  public void logDataAllotTable(String database) {
+    seriesPartitionSlotCounter
+        .keySet()
+        .forEach(
+            regionGroupId ->
+                LOGGER.info(
+                    "[ReBalanceDataAllotTable] Database: {}, "
+                        + "RegionGroupId: {}, SeriesPartitionSlot Count: {}",
+                    database,
+                    regionGroupId,
+                    seriesPartitionSlotCounter.get(regionGroupId)));
+  }
+
   public void acquireLock() {
     dataAllotTableLock.lock();
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index ac0df2916a9..7797d20a106 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -508,7 +508,7 @@ public class PartitionManager {
       Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType)
       throws DatabaseNotExistsException, NotEnoughDataNodeException {
 
-    // Map<StorageGroup, Region allotment>
+    // Map<Database, Region allotment>
     Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
 
     for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
@@ -531,7 +531,7 @@ public class PartitionManager {
       Map<String, Integer> unassignedPartitionSlotsCountMap, TConsensusGroupType consensusGroupType)
       throws NotEnoughDataNodeException, DatabaseNotExistsException {
 
-    // Map<StorageGroup, Region allotment>
+    // Map<Database, Region allotment>
     Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
 
     for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
@@ -544,15 +544,20 @@ public class PartitionManager {
       float slotCount =
           (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(database)
               + unassignedPartitionSlotsCount;
-      float maxRegionGroupCount =
+      float maxRegionGroupNum =
           getClusterSchemaManager().getMaxRegionGroupNum(database, consensusGroupType);
       float maxSlotCount = CONF.getSeriesSlotNum();
 
       /* RegionGroup extension is required in the following cases */
-      // 1. The number of current RegionGroup of the StorageGroup is less than the minimum number
+      // 1. The number of current RegionGroup of the Database is less than the minimum number
       int minRegionGroupNum =
           getClusterSchemaManager().getMinRegionGroupNum(database, consensusGroupType);
-      if (allocatedRegionGroupCount < minRegionGroupNum) {
+      if (allocatedRegionGroupCount < minRegionGroupNum
+          // Ensure the number of RegionGroups is enough
+          // for current SeriesPartitionSlots after extension
+          // Otherwise, more RegionGroups should be extended through case 2.
+          && slotCount <= (maxSlotCount / maxRegionGroupNum) * minRegionGroupNum) {
+
         // Let the sum of unassignedPartitionSlotsCount and allocatedRegionGroupCount
         // no less than the minRegionGroupNum
         int delta =
@@ -560,32 +565,31 @@ public class PartitionManager {
                 Math.min(
                     unassignedPartitionSlotsCount, minRegionGroupNum - allocatedRegionGroupCount);
         allotmentMap.put(database, delta);
-        continue;
       }
 
       // 2. The average number of partitions held by each Region will be greater than the
       // expected average number after the partition allocation is completed
-      if (allocatedRegionGroupCount < maxRegionGroupCount
-          && slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupCount) {
+      if (allocatedRegionGroupCount < maxRegionGroupNum
+          && slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupNum) {
         // The delta is equal to the smallest integer solution that satisfies the inequality:
-        // slotCount / (allocatedRegionGroupCount + delta) < maxSlotCount / maxRegionGroupCount
+        // slotCount / (allocatedRegionGroupCount + delta) < maxSlotCount / maxRegionGroupNum
         int delta =
             Math.min(
-                (int) (maxRegionGroupCount - allocatedRegionGroupCount),
+                (int) (maxRegionGroupNum - allocatedRegionGroupCount),
                 Math.max(
                     1,
                     (int)
                         Math.ceil(
-                            slotCount * maxRegionGroupCount / maxSlotCount
+                            slotCount * maxRegionGroupNum / maxSlotCount
                                 - allocatedRegionGroupCount)));
         allotmentMap.put(database, delta);
         continue;
       }
 
-      // 3. All RegionGroups in the specified StorageGroup are disabled currently
+      // 3. All RegionGroups in the specified Database are disabled currently
       if (allocatedRegionGroupCount
               == filterRegionGroupThroughStatus(database, RegionGroupStatus.Disabled).size()
-          && allocatedRegionGroupCount < maxRegionGroupCount) {
+          && allocatedRegionGroupCount < maxRegionGroupNum) {
         allotmentMap.put(database, 1);
       }
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
index 1fe5ce1bcdf..20df26ffcb4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
@@ -77,6 +77,10 @@ public class BalanceTreeMap<K, V extends Comparable<V>> {
     return keyValueMap.getOrDefault(key, null);
   }
 
+  public Set<K> keySet() {
+    return keyValueMap.keySet();
+  }
+
   public boolean containsKey(K key) {
     return keyValueMap.containsKey(key);
   }