You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yo...@apache.org on 2023/07/21 12:42:18 UTC

[iotdb] 02/02: Pass UT

This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch Computing-resource-balancing
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e50c2ce9b1bbffcce46e91dd86347ebd914a3f27
Author: YongzaoDan <53...@qq.com>
AuthorDate: Fri Jul 21 20:42:05 2023 +0800

    Pass UT
---
 .../manager/load/balancer/PartitionBalancer.java   |   4 +-
 .../load/balancer/partition/DataAllotTable.java    |  41 +++--
 .../partition/DatabasePartitionTable.java          |  13 +-
 .../statemachine/CreateRegionGroupsProcedure.java  |  35 ++--
 .../balancer/partition/DataAllotTableTest.java     | 177 +++++++++++++++++++++
 .../commons/partition/SeriesPartitionTable.java    |  18 +--
 .../iotdb/commons/structure/BalanceTreeMap.java    |  33 +++-
 .../commons/structure/BalanceTreeMapTest.java      |  82 ++++++++++
 8 files changed, 349 insertions(+), 54 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index f0d408e91cd..963ec39edd8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -243,7 +243,9 @@ public class PartitionBalancer {
    */
   public void updateDataAllotTable(String database) {
     TTimePartitionSlot currentTimePartition =
-        dataAllotTableMap.get(database).getCurrentTimePartition();
+        dataAllotTableMap
+            .computeIfAbsent(database, empty -> new DataAllotTable())
+            .getCurrentTimePartition();
     Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new ConcurrentHashMap<>();
     for (int i = 0; i < SERIES_SLOT_NUM; i++) {
       TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
index f334f490888..0507272f6c3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 public class DataAllotTable {
 
@@ -102,16 +103,26 @@ public class DataAllotTable {
 
       Map<TSeriesPartitionSlot, TConsensusGroupId> newAllotTable = new HashMap<>();
       for (TSeriesPartitionSlot seriesPartitionSlot : seriesSlotList) {
+        if (allocatedTable.containsKey(seriesPartitionSlot)) {
+          // If the SeriesSlot has already been allocated, keep the allocation
+          newAllotTable.put(seriesPartitionSlot, allocatedTable.get(seriesPartitionSlot));
+          continue;
+        }
+
         TConsensusGroupId oldRegionGroupId = dataAllotTable.get(seriesPartitionSlot);
-        if (counter.get(oldRegionGroupId) < mu) {
+        if (oldRegionGroupId != null
+            && counter.containsKey(oldRegionGroupId)
+            && counter.get(oldRegionGroupId) < mu) {
           // Inherit the oldRegionGroupId when the slotNum of oldRegionGroupId is less than average
           newAllotTable.put(seriesPartitionSlot, oldRegionGroupId);
-        } else {
-          // Otherwise, choose the regionGroup with the least slotNum to keep load balance
-          TConsensusGroupId newRegionGroupId = counter.getKeyWithMinValue();
-          newAllotTable.put(seriesPartitionSlot, newRegionGroupId);
-          counter.put(newRegionGroupId, counter.get(newRegionGroupId) + 1);
+          counter.put(oldRegionGroupId, counter.get(oldRegionGroupId) + 1);
+          continue;
         }
+
+        // Otherwise, choose the regionGroup with the least slotNum to keep load balance
+        TConsensusGroupId newRegionGroupId = counter.getKeyWithMinValue();
+        newAllotTable.put(seriesPartitionSlot, newRegionGroupId);
+        counter.put(newRegionGroupId, counter.get(newRegionGroupId) + 1);
       }
 
       dataAllotTable.clear();
@@ -128,6 +139,7 @@ public class DataAllotTable {
    * @return whether the current time partition is updated
    */
   public boolean updateCurrentTimePartition(int regionGroupNum) {
+    int threshold = timePartitionThreshold(regionGroupNum);
     dataAllotTableLock.writeLock().lock();
     try {
       AtomicLong newStartTime = new AtomicLong(Long.MIN_VALUE);
@@ -135,23 +147,18 @@ public class DataAllotTable {
           (timePartition, counter) -> {
             // Select the maximum TimePartition whose slotNum is greater than the following equation
             // Ensure that the remaining slots can be still distributed to new regionGroups
-            if (counter.get() >= timePartitionThreshold(regionGroupNum)
-                && timePartition.getStartTime() > newStartTime.get()) {
+            if (counter.get() >= threshold && timePartition.getStartTime() > newStartTime.get()) {
               newStartTime.set(timePartition.getStartTime());
             }
           });
 
       if (newStartTime.get() > currentTimePartition.get().getStartTime()) {
         currentTimePartition.set(new TTimePartitionSlot(newStartTime.get()));
-        dataPartitionCounter
-            .keySet()
-            .forEach(
-                timePartition -> {
-                  // Remove the useless TimePartitions
-                  if (timePartition.getStartTime() < newStartTime.get()) {
-                    dataPartitionCounter.remove(timePartition);
-                  }
-                });
+        List<TTimePartitionSlot> removeTimePartitionSlots =
+            dataPartitionCounter.keySet().stream()
+                .filter(timePartition -> timePartition.getStartTime() < newStartTime.get())
+                .collect(Collectors.toList());
+        removeTimePartitionSlots.forEach(dataPartitionCounter::remove);
         return true;
       }
     } finally {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index c1516e6f7c9..9782356b595 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -229,16 +229,9 @@ public class DatabasePartitionTable {
    * @return List of TConsensusGroupId
    */
   public List<TConsensusGroupId> getAllRegionGroupIds(TConsensusGroupType type) {
-    List<TConsensusGroupId> result = new Vector<>();
-    regionGroupMap
-        .values()
-        .forEach(
-            regionGroup -> {
-              if (regionGroup.getId().getType().equals(type)) {
-                result.add(regionGroup.getId());
-              }
-            });
-    return result;
+    return regionGroupMap.keySet().stream()
+        .filter(regionGroupId -> regionGroupId.getType().equals(type))
+        .collect(Collectors.toList());
   }
 
   public int getAssignedSeriesPartitionSlotsCount() {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
index 3072746b378..279a5e8c872 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.procedure.impl.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -52,6 +53,7 @@ public class CreateRegionGroupsProcedure
   private TConsensusGroupType consensusGroupType;
 
   private CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
+  private CreateRegionGroupsPlan persistPlan;
 
   /** key: TConsensusGroupId value: Failed RegionReplicas */
   private Map<TConsensusGroupId, TRegionReplicaSet> failedRegionReplicaSets = new HashMap<>();
@@ -64,6 +66,7 @@ public class CreateRegionGroupsProcedure
       TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) {
     this.consensusGroupType = consensusGroupType;
     this.createRegionGroupsPlan = createRegionGroupsPlan;
+    this.persistPlan = new CreateRegionGroupsPlan();
   }
 
   @TestOnly
@@ -74,6 +77,7 @@ public class CreateRegionGroupsProcedure
     this.consensusGroupType = consensusGroupType;
     this.createRegionGroupsPlan = createRegionGroupsPlan;
     this.failedRegionReplicaSets = failedRegionReplicaSets;
+    this.persistPlan = new CreateRegionGroupsPlan();
   }
 
   @Override
@@ -84,7 +88,7 @@ public class CreateRegionGroupsProcedure
         setNextState(CreateRegionGroupsState.SHUNT_REGION_REPLICAS);
         break;
       case SHUNT_REGION_REPLICAS:
-        CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
+        persistPlan = new CreateRegionGroupsPlan();
         OfferRegionMaintainTasksPlan offerPlan = new OfferRegionMaintainTasksPlan();
         // Filter those RegionGroups that created successfully
         createRegionGroupsPlan
@@ -201,12 +205,15 @@ public class CreateRegionGroupsProcedure
         setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
         break;
       case CREATE_REGION_GROUPS_FINISH:
-        // Update all corresponding DataAllotTables
-        createRegionGroupsPlan
-            .getRegionGroupMap()
-            .keySet()
-            .forEach(
-                database -> env.getConfigManager().getLoadManager().updateDataAllotTable(database));
+        if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
+          // Update all corresponding DataAllotTables
+          persistPlan
+              .getRegionGroupMap()
+              .keySet()
+              .forEach(
+                  database ->
+                      env.getConfigManager().getLoadManager().updateDataAllotTable(database));
+        }
         return Flow.NO_MORE_STATE;
     }
 
@@ -248,6 +255,7 @@ public class CreateRegionGroupsProcedure
           ThriftCommonsSerDeUtils.serializeTConsensusGroupId(groupId, stream);
           ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(replica, stream);
         });
+    persistPlan.serializeForProcedure(stream);
   }
 
   @Override
@@ -265,6 +273,7 @@ public class CreateRegionGroupsProcedure
             ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
         failedRegionReplicaSets.put(groupId, replica);
       }
+      persistPlan.deserializeForProcedure(byteBuffer);
     } catch (Exception e) {
       LOGGER.error("Deserialize meets error in CreateRegionGroupsProcedure", e);
       throw new RuntimeException(e);
@@ -273,16 +282,22 @@ public class CreateRegionGroupsProcedure
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
     CreateRegionGroupsProcedure that = (CreateRegionGroupsProcedure) o;
     return consensusGroupType == that.consensusGroupType
         && createRegionGroupsPlan.equals(that.createRegionGroupsPlan)
+        && persistPlan.equals(that.persistPlan)
         && failedRegionReplicaSets.equals(that.failedRegionReplicaSets);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(consensusGroupType, createRegionGroupsPlan, failedRegionReplicaSets);
+    return Objects.hash(
+        consensusGroupType, createRegionGroupsPlan, persistPlan, failedRegionReplicaSets);
   }
 }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java
new file mode 100644
index 00000000000..45ee77cecb1
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DataAllotTableTest {
+
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+  private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
+
+  @Test
+  public void testUpdateCurrentTimePartition() {
+    final int regionGroupNum = 5;
+    final int threshold = DataAllotTable.timePartitionThreshold(regionGroupNum);
+    final long timePartitionInterval = 1000;
+    DataAllotTable dataAllotTable = new DataAllotTable();
+
+    // Test 1: currentTimePartition is the first one
+    TTimePartitionSlot nextTimePartition = new TTimePartitionSlot(1000);
+    Map<TTimePartitionSlot, Integer> timePartitionCountMap = new HashMap<>();
+    timePartitionCountMap.put(new TTimePartitionSlot(nextTimePartition), threshold);
+    timePartitionCountMap.put(
+        new TTimePartitionSlot(nextTimePartition.getStartTime() + timePartitionInterval),
+        threshold - 100);
+    timePartitionCountMap.put(
+        new TTimePartitionSlot(nextTimePartition.getStartTime() + 2 * timePartitionInterval),
+        threshold - 200);
+    dataAllotTable.addTimePartitionCount(timePartitionCountMap);
+    dataAllotTable.updateCurrentTimePartition(regionGroupNum);
+    Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition());
+
+    // Test 2: currentTimePartition in the middle
+    timePartitionCountMap.clear();
+    nextTimePartition = new TTimePartitionSlot(5000);
+    timePartitionCountMap.put(
+        new TTimePartitionSlot(nextTimePartition.getStartTime() - timePartitionInterval),
+        threshold - 100);
+    timePartitionCountMap.put(new TTimePartitionSlot(nextTimePartition), threshold);
+    timePartitionCountMap.put(
+        new TTimePartitionSlot(nextTimePartition.getStartTime() + timePartitionInterval),
+        threshold - 100);
+    dataAllotTable.addTimePartitionCount(timePartitionCountMap);
+    dataAllotTable.updateCurrentTimePartition(regionGroupNum);
+    Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition());
+
+    // Test 3: currentTimePartition will be the maximum timePartitionSlot that greater or equal to
+    // threshold
+    int offset = 200;
+    Random random = new Random();
+    timePartitionCountMap.clear();
+    TTimePartitionSlot baseSlot = new TTimePartitionSlot(10000);
+    nextTimePartition = baseSlot;
+    timePartitionCountMap.put(nextTimePartition, threshold);
+    for (int i = 1; i < 100; i++) {
+      TTimePartitionSlot slot =
+          new TTimePartitionSlot(baseSlot.getStartTime() + i * timePartitionInterval);
+      int count = threshold + random.nextInt(offset) - offset / 2;
+      timePartitionCountMap.put(slot, count);
+      if (count >= threshold) {
+        nextTimePartition = slot;
+      }
+    }
+    dataAllotTable.addTimePartitionCount(timePartitionCountMap);
+    dataAllotTable.updateCurrentTimePartition(regionGroupNum);
+    Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition());
+  }
+
+  @Test
+  public void testUpdateDataAllotTable() {
+    DataAllotTable dataAllotTable = new DataAllotTable();
+    List<TConsensusGroupId> dataRegionGroups = new ArrayList<>();
+
+    // Test 1: construct DataAllotTable from scratch
+    TConsensusGroupId group1 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 1);
+    dataRegionGroups.add(group1);
+    dataAllotTable.updateDataAllotTable(dataRegionGroups, new HashMap<>());
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+      // All SeriesPartitionSlots belong to group1
+      Assert.assertEquals(group1, dataAllotTable.getRegionGroupId(seriesPartitionSlot));
+    }
+
+    // Test2: extend DataRegionGroups
+    Map<TSeriesPartitionSlot, TConsensusGroupId> lastDataAllotTable = new HashMap<>();
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 2));
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 3));
+    dataAllotTable.updateDataAllotTable(dataRegionGroups, new HashMap<>());
+    int mu = SERIES_SLOT_NUM / 3;
+    Map<TConsensusGroupId, AtomicInteger> counter = new HashMap<>();
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+      TConsensusGroupId groupId = dataAllotTable.getRegionGroupId(seriesPartitionSlot);
+      lastDataAllotTable.put(seriesPartitionSlot, groupId);
+      counter.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet();
+    }
+    // All DataRegionGroups divide SeriesPartitionSlots evenly
+    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : counter.entrySet()) {
+      Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
+    }
+
+    // Test 3: extend DataRegionGroups while inherit future allocate result
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 4));
+    dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 5));
+    Random random = new Random();
+    Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new HashMap<>();
+    Map<TConsensusGroupId, AtomicInteger> unchangedSlots = new HashMap<>();
+    for (int i = 0; i < 50; i++) {
+      TSeriesPartitionSlot seriesPartitionSlot =
+          new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM));
+      while (allocatedTable.containsKey(seriesPartitionSlot)) {
+        seriesPartitionSlot = new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM));
+      }
+      allocatedTable.put(
+          seriesPartitionSlot,
+          new TConsensusGroupId(TConsensusGroupType.DataRegion, random.nextInt(2) + 4));
+    }
+    dataAllotTable.updateDataAllotTable(dataRegionGroups, allocatedTable);
+    mu = SERIES_SLOT_NUM / 5;
+    counter.clear();
+    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+      TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+      TConsensusGroupId groupId = dataAllotTable.getRegionGroupId(seriesPartitionSlot);
+      counter.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet();
+      if (groupId.getId() < 4) {
+        // Most of SeriesPartitionSlots in the first three DataRegionGroups should remain unchanged
+        Assert.assertEquals(lastDataAllotTable.get(seriesPartitionSlot), groupId);
+        unchangedSlots.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet();
+      }
+    }
+    // All DataRegionGroups divide SeriesPartitionSlots evenly
+    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : counter.entrySet()) {
+      Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
+    }
+    // All SeriesPartitionSlots that have been allocated before should be allocated to the same
+    // DataRegionGroup
+    allocatedTable.forEach(
+        (seriesPartitionSlot, groupId) ->
+            Assert.assertEquals(groupId, dataAllotTable.getRegionGroupId(seriesPartitionSlot)));
+    // Most of SeriesPartitionSlots in the first three DataRegionGroups should remain unchanged
+    for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : unchangedSlots.entrySet()) {
+      Assert.assertEquals(mu, counterEntry.getValue().get());
+    }
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index def3cc07fd6..00edd8bcb89 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -249,14 +249,13 @@ public class SeriesPartitionTable {
    * @return The last DataPartition, null if there are no DataPartitions
    */
   public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition() {
-    try {
-      Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
-          seriesPartitionMap.lastEntry();
-      return new Pair<>(
-          lastEntry.getKey(), lastEntry.getValue().get(lastEntry.getValue().size() - 1));
-    } catch (NoSuchElementException e) {
+    Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
+        seriesPartitionMap.lastEntry();
+    if (lastEntry == null) {
       return null;
     }
+    return new Pair<>(
+        lastEntry.getKey(), lastEntry.getValue().get(lastEntry.getValue().size() - 1));
   }
 
   /**
@@ -275,11 +274,12 @@ public class SeriesPartitionTable {
    * @return The last DataPartition's ConsensusGroupId, null if there are no DataPartitions yet
    */
   public TConsensusGroupId getLastConsensusGroupId() {
-    try {
-      return seriesPartitionMap.lastEntry().getValue().get(0);
-    } catch (NoSuchElementException e) {
+    Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
+        seriesPartitionMap.lastEntry();
+    if (lastEntry == null) {
       return null;
     }
+    return lastEntry.getValue().get(lastEntry.getValue().size() - 1);
   }
 
   /**
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
index f1ecdc3a5fb..1ea29c9a80b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.commons.structure;
 
+import org.apache.iotdb.commons.utils.TestOnly;
+
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
@@ -56,6 +58,15 @@ public class BalanceTreeMap<K, V extends Comparable<V>> {
     valueKeyMap.computeIfAbsent(value, empty -> new HashSet<>()).add(key);
   }
 
+  /**
+   * Get key with minimum value.
+   *
+   * @return Key with minimum value
+   */
+  public K getKeyWithMinValue() {
+    return valueKeyMap.firstEntry().getValue().iterator().next();
+  }
+
   public V get(K key) {
     return keyValueMap.getOrDefault(key, null);
   }
@@ -64,12 +75,20 @@ public class BalanceTreeMap<K, V extends Comparable<V>> {
     return keyValueMap.containsKey(key);
   }
 
-  /**
-   * Get key with minimum value.
-   *
-   * @return Key with minimum value
-   */
-  public K getKeyWithMinValue() {
-    return valueKeyMap.firstEntry().getValue().iterator().next();
+  @TestOnly
+  public void remove(K key) {
+    V value = keyValueMap.getOrDefault(key, null);
+    if (value != null) {
+      keyValueMap.remove(key);
+      valueKeyMap.get(value).remove(key);
+      if (valueKeyMap.get(value).isEmpty()) {
+        valueKeyMap.remove(value);
+      }
+    }
+  }
+
+  @TestOnly
+  public boolean isEmpty() {
+    return keyValueMap.isEmpty() && valueKeyMap.isEmpty();
   }
 }
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java
new file mode 100644
index 00000000000..d38b8176f26
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.structure;
+
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+public class BalanceTreeMapTest {
+
+  @Test
+  public void testGetKeyWithMinValue() {
+    Random random = new Random();
+    BalanceTreeMap<TSeriesPartitionSlot, Integer> balanceTreeMap = new BalanceTreeMap<>();
+    for (int i = 0; i < 100; i++) {
+      balanceTreeMap.put(new TSeriesPartitionSlot(i), random.nextInt(Integer.MAX_VALUE));
+    }
+    TSeriesPartitionSlot minSlot = new TSeriesPartitionSlot(100);
+    balanceTreeMap.put(minSlot, Integer.MIN_VALUE);
+    for (int i = 101; i < 200; i++) {
+      balanceTreeMap.put(new TSeriesPartitionSlot(i), random.nextInt(Integer.MAX_VALUE));
+    }
+    Assert.assertEquals(minSlot, balanceTreeMap.getKeyWithMinValue());
+
+    int currentValue = Integer.MIN_VALUE;
+    for (int i = 0; i < 200; i++) {
+      TSeriesPartitionSlot slot = balanceTreeMap.getKeyWithMinValue();
+      Assert.assertTrue(balanceTreeMap.get(slot) >= currentValue);
+      currentValue = balanceTreeMap.get(slot);
+      balanceTreeMap.remove(slot);
+    }
+  }
+
+  @Test
+  public void testKeysDuplicate() {
+    BalanceTreeMap<TSeriesPartitionSlot, Integer> balanceTreeMap = new BalanceTreeMap<>();
+    Set<TSeriesPartitionSlot> duplicateSet0 = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      TSeriesPartitionSlot slot = new TSeriesPartitionSlot(i);
+      balanceTreeMap.put(slot, 0);
+      duplicateSet0.add(slot);
+    }
+    Set<TSeriesPartitionSlot> duplicateSet1 = new HashSet<>();
+    for (int i = 10; i < 20; i++) {
+      TSeriesPartitionSlot slot = new TSeriesPartitionSlot(i);
+      balanceTreeMap.put(slot, 1);
+      duplicateSet1.add(slot);
+    }
+
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(duplicateSet0.contains(balanceTreeMap.getKeyWithMinValue()));
+      balanceTreeMap.remove(balanceTreeMap.getKeyWithMinValue());
+    }
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(duplicateSet1.contains(balanceTreeMap.getKeyWithMinValue()));
+      balanceTreeMap.remove(balanceTreeMap.getKeyWithMinValue());
+    }
+    Assert.assertTrue(balanceTreeMap.isEmpty());
+  }
+}