You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yo...@apache.org on 2023/07/21 12:42:18 UTC
[iotdb] 02/02: Pass UT
This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch Computing-resource-balancing
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e50c2ce9b1bbffcce46e91dd86347ebd914a3f27
Author: YongzaoDan <53...@qq.com>
AuthorDate: Fri Jul 21 20:42:05 2023 +0800
Pass UT
---
.../manager/load/balancer/PartitionBalancer.java | 4 +-
.../load/balancer/partition/DataAllotTable.java | 41 +++--
.../partition/DatabasePartitionTable.java | 13 +-
.../statemachine/CreateRegionGroupsProcedure.java | 35 ++--
.../balancer/partition/DataAllotTableTest.java | 177 +++++++++++++++++++++
.../commons/partition/SeriesPartitionTable.java | 18 +--
.../iotdb/commons/structure/BalanceTreeMap.java | 33 +++-
.../commons/structure/BalanceTreeMapTest.java | 82 ++++++++++
8 files changed, 349 insertions(+), 54 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index f0d408e91cd..963ec39edd8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -243,7 +243,9 @@ public class PartitionBalancer {
*/
public void updateDataAllotTable(String database) {
TTimePartitionSlot currentTimePartition =
- dataAllotTableMap.get(database).getCurrentTimePartition();
+ dataAllotTableMap
+ .computeIfAbsent(database, empty -> new DataAllotTable())
+ .getCurrentTimePartition();
Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new ConcurrentHashMap<>();
for (int i = 0; i < SERIES_SLOT_NUM; i++) {
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
index f334f490888..0507272f6c3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTable.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
public class DataAllotTable {
@@ -102,16 +103,26 @@ public class DataAllotTable {
Map<TSeriesPartitionSlot, TConsensusGroupId> newAllotTable = new HashMap<>();
for (TSeriesPartitionSlot seriesPartitionSlot : seriesSlotList) {
+ if (allocatedTable.containsKey(seriesPartitionSlot)) {
+ // If the SeriesSlot has already been allocated, keep the allocation
+ newAllotTable.put(seriesPartitionSlot, allocatedTable.get(seriesPartitionSlot));
+ continue;
+ }
+
TConsensusGroupId oldRegionGroupId = dataAllotTable.get(seriesPartitionSlot);
- if (counter.get(oldRegionGroupId) < mu) {
+ if (oldRegionGroupId != null
+ && counter.containsKey(oldRegionGroupId)
+ && counter.get(oldRegionGroupId) < mu) {
// Inherit the oldRegionGroupId when the slotNum of oldRegionGroupId is less than average
newAllotTable.put(seriesPartitionSlot, oldRegionGroupId);
- } else {
- // Otherwise, choose the regionGroup with the least slotNum to keep load balance
- TConsensusGroupId newRegionGroupId = counter.getKeyWithMinValue();
- newAllotTable.put(seriesPartitionSlot, newRegionGroupId);
- counter.put(newRegionGroupId, counter.get(newRegionGroupId) + 1);
+ counter.put(oldRegionGroupId, counter.get(oldRegionGroupId) + 1);
+ continue;
}
+
+ // Otherwise, choose the regionGroup with the least slotNum to keep load balance
+ TConsensusGroupId newRegionGroupId = counter.getKeyWithMinValue();
+ newAllotTable.put(seriesPartitionSlot, newRegionGroupId);
+ counter.put(newRegionGroupId, counter.get(newRegionGroupId) + 1);
}
dataAllotTable.clear();
@@ -128,6 +139,7 @@ public class DataAllotTable {
* @return whether the current time partition is updated
*/
public boolean updateCurrentTimePartition(int regionGroupNum) {
+ int threshold = timePartitionThreshold(regionGroupNum);
dataAllotTableLock.writeLock().lock();
try {
AtomicLong newStartTime = new AtomicLong(Long.MIN_VALUE);
@@ -135,23 +147,18 @@ public class DataAllotTable {
(timePartition, counter) -> {
// Select the maximum TimePartition whose slotNum is greater than the following equation
// Ensure that the remaining slots can be still distributed to new regionGroups
- if (counter.get() >= timePartitionThreshold(regionGroupNum)
- && timePartition.getStartTime() > newStartTime.get()) {
+ if (counter.get() >= threshold && timePartition.getStartTime() > newStartTime.get()) {
newStartTime.set(timePartition.getStartTime());
}
});
if (newStartTime.get() > currentTimePartition.get().getStartTime()) {
currentTimePartition.set(new TTimePartitionSlot(newStartTime.get()));
- dataPartitionCounter
- .keySet()
- .forEach(
- timePartition -> {
- // Remove the useless TimePartitions
- if (timePartition.getStartTime() < newStartTime.get()) {
- dataPartitionCounter.remove(timePartition);
- }
- });
+ List<TTimePartitionSlot> removeTimePartitionSlots =
+ dataPartitionCounter.keySet().stream()
+ .filter(timePartition -> timePartition.getStartTime() < newStartTime.get())
+ .collect(Collectors.toList());
+ removeTimePartitionSlots.forEach(dataPartitionCounter::remove);
return true;
}
} finally {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index c1516e6f7c9..9782356b595 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -229,16 +229,9 @@ public class DatabasePartitionTable {
* @return List of TConsensusGroupId
*/
public List<TConsensusGroupId> getAllRegionGroupIds(TConsensusGroupType type) {
- List<TConsensusGroupId> result = new Vector<>();
- regionGroupMap
- .values()
- .forEach(
- regionGroup -> {
- if (regionGroup.getId().getType().equals(type)) {
- result.add(regionGroup.getId());
- }
- });
- return result;
+ return regionGroupMap.keySet().stream()
+ .filter(regionGroupId -> regionGroupId.getType().equals(type))
+ .collect(Collectors.toList());
}
public int getAssignedSeriesPartitionSlotsCount() {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
index 3072746b378..279a5e8c872 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.confignode.procedure.impl.statemachine;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
@@ -52,6 +53,7 @@ public class CreateRegionGroupsProcedure
private TConsensusGroupType consensusGroupType;
private CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
+ private CreateRegionGroupsPlan persistPlan;
/** key: TConsensusGroupId value: Failed RegionReplicas */
private Map<TConsensusGroupId, TRegionReplicaSet> failedRegionReplicaSets = new HashMap<>();
@@ -64,6 +66,7 @@ public class CreateRegionGroupsProcedure
TConsensusGroupType consensusGroupType, CreateRegionGroupsPlan createRegionGroupsPlan) {
this.consensusGroupType = consensusGroupType;
this.createRegionGroupsPlan = createRegionGroupsPlan;
+ this.persistPlan = new CreateRegionGroupsPlan();
}
@TestOnly
@@ -74,6 +77,7 @@ public class CreateRegionGroupsProcedure
this.consensusGroupType = consensusGroupType;
this.createRegionGroupsPlan = createRegionGroupsPlan;
this.failedRegionReplicaSets = failedRegionReplicaSets;
+ this.persistPlan = new CreateRegionGroupsPlan();
}
@Override
@@ -84,7 +88,7 @@ public class CreateRegionGroupsProcedure
setNextState(CreateRegionGroupsState.SHUNT_REGION_REPLICAS);
break;
case SHUNT_REGION_REPLICAS:
- CreateRegionGroupsPlan persistPlan = new CreateRegionGroupsPlan();
+ persistPlan = new CreateRegionGroupsPlan();
OfferRegionMaintainTasksPlan offerPlan = new OfferRegionMaintainTasksPlan();
// Filter those RegionGroups that created successfully
createRegionGroupsPlan
@@ -201,12 +205,15 @@ public class CreateRegionGroupsProcedure
setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
break;
case CREATE_REGION_GROUPS_FINISH:
- // Update all corresponding DataAllotTables
- createRegionGroupsPlan
- .getRegionGroupMap()
- .keySet()
- .forEach(
- database -> env.getConfigManager().getLoadManager().updateDataAllotTable(database));
+ if (TConsensusGroupType.DataRegion.equals(consensusGroupType)) {
+ // Update all corresponding DataAllotTables
+ persistPlan
+ .getRegionGroupMap()
+ .keySet()
+ .forEach(
+ database ->
+ env.getConfigManager().getLoadManager().updateDataAllotTable(database));
+ }
return Flow.NO_MORE_STATE;
}
@@ -248,6 +255,7 @@ public class CreateRegionGroupsProcedure
ThriftCommonsSerDeUtils.serializeTConsensusGroupId(groupId, stream);
ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(replica, stream);
});
+ persistPlan.serializeForProcedure(stream);
}
@Override
@@ -265,6 +273,7 @@ public class CreateRegionGroupsProcedure
ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
failedRegionReplicaSets.put(groupId, replica);
}
+ persistPlan.deserializeForProcedure(byteBuffer);
} catch (Exception e) {
LOGGER.error("Deserialize meets error in CreateRegionGroupsProcedure", e);
throw new RuntimeException(e);
@@ -273,16 +282,22 @@ public class CreateRegionGroupsProcedure
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
CreateRegionGroupsProcedure that = (CreateRegionGroupsProcedure) o;
return consensusGroupType == that.consensusGroupType
&& createRegionGroupsPlan.equals(that.createRegionGroupsPlan)
+ && persistPlan.equals(that.persistPlan)
&& failedRegionReplicaSets.equals(that.failedRegionReplicaSets);
}
@Override
public int hashCode() {
- return Objects.hash(consensusGroupType, createRegionGroupsPlan, failedRegionReplicaSets);
+ return Objects.hash(
+ consensusGroupType, createRegionGroupsPlan, persistPlan, failedRegionReplicaSets);
}
}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java
new file mode 100644
index 00000000000..45ee77cecb1
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/partition/DataAllotTableTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DataAllotTableTest {
+
+ private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+ private static final int SERIES_SLOT_NUM = CONF.getSeriesSlotNum();
+
+ @Test
+ public void testUpdateCurrentTimePartition() {
+ final int regionGroupNum = 5;
+ final int threshold = DataAllotTable.timePartitionThreshold(regionGroupNum);
+ final long timePartitionInterval = 1000;
+ DataAllotTable dataAllotTable = new DataAllotTable();
+
+ // Test 1: currentTimePartition is the first one
+ TTimePartitionSlot nextTimePartition = new TTimePartitionSlot(1000);
+ Map<TTimePartitionSlot, Integer> timePartitionCountMap = new HashMap<>();
+ timePartitionCountMap.put(new TTimePartitionSlot(nextTimePartition), threshold);
+ timePartitionCountMap.put(
+ new TTimePartitionSlot(nextTimePartition.getStartTime() + timePartitionInterval),
+ threshold - 100);
+ timePartitionCountMap.put(
+ new TTimePartitionSlot(nextTimePartition.getStartTime() + 2 * timePartitionInterval),
+ threshold - 200);
+ dataAllotTable.addTimePartitionCount(timePartitionCountMap);
+ dataAllotTable.updateCurrentTimePartition(regionGroupNum);
+ Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition());
+
+ // Test 2: currentTimePartition in the middle
+ timePartitionCountMap.clear();
+ nextTimePartition = new TTimePartitionSlot(5000);
+ timePartitionCountMap.put(
+ new TTimePartitionSlot(nextTimePartition.getStartTime() - timePartitionInterval),
+ threshold - 100);
+ timePartitionCountMap.put(new TTimePartitionSlot(nextTimePartition), threshold);
+ timePartitionCountMap.put(
+ new TTimePartitionSlot(nextTimePartition.getStartTime() + timePartitionInterval),
+ threshold - 100);
+ dataAllotTable.addTimePartitionCount(timePartitionCountMap);
+ dataAllotTable.updateCurrentTimePartition(regionGroupNum);
+ Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition());
+
+ // Test 3: currentTimePartition will be the maximum timePartitionSlot that greater or equal to
+ // threshold
+ int offset = 200;
+ Random random = new Random();
+ timePartitionCountMap.clear();
+ TTimePartitionSlot baseSlot = new TTimePartitionSlot(10000);
+ nextTimePartition = baseSlot;
+ timePartitionCountMap.put(nextTimePartition, threshold);
+ for (int i = 1; i < 100; i++) {
+ TTimePartitionSlot slot =
+ new TTimePartitionSlot(baseSlot.getStartTime() + i * timePartitionInterval);
+ int count = threshold + random.nextInt(offset) - offset / 2;
+ timePartitionCountMap.put(slot, count);
+ if (count >= threshold) {
+ nextTimePartition = slot;
+ }
+ }
+ dataAllotTable.addTimePartitionCount(timePartitionCountMap);
+ dataAllotTable.updateCurrentTimePartition(regionGroupNum);
+ Assert.assertEquals(nextTimePartition, dataAllotTable.getCurrentTimePartition());
+ }
+
+ @Test
+ public void testUpdateDataAllotTable() {
+ DataAllotTable dataAllotTable = new DataAllotTable();
+ List<TConsensusGroupId> dataRegionGroups = new ArrayList<>();
+
+ // Test 1: construct DataAllotTable from scratch
+ TConsensusGroupId group1 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 1);
+ dataRegionGroups.add(group1);
+ dataAllotTable.updateDataAllotTable(dataRegionGroups, new HashMap<>());
+ for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+ // All SeriesPartitionSlots belong to group1
+ Assert.assertEquals(group1, dataAllotTable.getRegionGroupId(seriesPartitionSlot));
+ }
+
+ // Test2: extend DataRegionGroups
+ Map<TSeriesPartitionSlot, TConsensusGroupId> lastDataAllotTable = new HashMap<>();
+ dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 2));
+ dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 3));
+ dataAllotTable.updateDataAllotTable(dataRegionGroups, new HashMap<>());
+ int mu = SERIES_SLOT_NUM / 3;
+ Map<TConsensusGroupId, AtomicInteger> counter = new HashMap<>();
+ for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+ TConsensusGroupId groupId = dataAllotTable.getRegionGroupId(seriesPartitionSlot);
+ lastDataAllotTable.put(seriesPartitionSlot, groupId);
+ counter.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet();
+ }
+ // All DataRegionGroups divide SeriesPartitionSlots evenly
+ for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : counter.entrySet()) {
+ Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
+ }
+
+ // Test 3: extend DataRegionGroups while inherit future allocate result
+ dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 4));
+ dataRegionGroups.add(new TConsensusGroupId(TConsensusGroupType.DataRegion, 5));
+ Random random = new Random();
+ Map<TSeriesPartitionSlot, TConsensusGroupId> allocatedTable = new HashMap<>();
+ Map<TConsensusGroupId, AtomicInteger> unchangedSlots = new HashMap<>();
+ for (int i = 0; i < 50; i++) {
+ TSeriesPartitionSlot seriesPartitionSlot =
+ new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM));
+ while (allocatedTable.containsKey(seriesPartitionSlot)) {
+ seriesPartitionSlot = new TSeriesPartitionSlot(random.nextInt(SERIES_SLOT_NUM));
+ }
+ allocatedTable.put(
+ seriesPartitionSlot,
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, random.nextInt(2) + 4));
+ }
+ dataAllotTable.updateDataAllotTable(dataRegionGroups, allocatedTable);
+ mu = SERIES_SLOT_NUM / 5;
+ counter.clear();
+ for (int i = 0; i < SERIES_SLOT_NUM; i++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
+ TConsensusGroupId groupId = dataAllotTable.getRegionGroupId(seriesPartitionSlot);
+ counter.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet();
+ if (groupId.getId() < 4) {
+ // Most of SeriesPartitionSlots in the first three DataRegionGroups should remain unchanged
+ Assert.assertEquals(lastDataAllotTable.get(seriesPartitionSlot), groupId);
+ unchangedSlots.computeIfAbsent(groupId, empty -> new AtomicInteger(0)).incrementAndGet();
+ }
+ }
+ // All DataRegionGroups divide SeriesPartitionSlots evenly
+ for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : counter.entrySet()) {
+ Assert.assertTrue(Math.abs(counterEntry.getValue().get() - mu) <= 1);
+ }
+ // All SeriesPartitionSlots that have been allocated before should be allocated to the same
+ // DataRegionGroup
+ allocatedTable.forEach(
+ (seriesPartitionSlot, groupId) ->
+ Assert.assertEquals(groupId, dataAllotTable.getRegionGroupId(seriesPartitionSlot)));
+ // Most of SeriesPartitionSlots in the first three DataRegionGroups should remain unchanged
+ for (Map.Entry<TConsensusGroupId, AtomicInteger> counterEntry : unchangedSlots.entrySet()) {
+ Assert.assertEquals(mu, counterEntry.getValue().get());
+ }
+ }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index def3cc07fd6..00edd8bcb89 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -249,14 +249,13 @@ public class SeriesPartitionTable {
* @return The last DataPartition, null if there are no DataPartitions
*/
public Pair<TTimePartitionSlot, TConsensusGroupId> getLastDataPartition() {
- try {
- Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
- seriesPartitionMap.lastEntry();
- return new Pair<>(
- lastEntry.getKey(), lastEntry.getValue().get(lastEntry.getValue().size() - 1));
- } catch (NoSuchElementException e) {
+ Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
+ seriesPartitionMap.lastEntry();
+ if (lastEntry == null) {
return null;
}
+ return new Pair<>(
+ lastEntry.getKey(), lastEntry.getValue().get(lastEntry.getValue().size() - 1));
}
/**
@@ -275,11 +274,12 @@ public class SeriesPartitionTable {
* @return The last DataPartition's ConsensusGroupId, null if there are no DataPartitions yet
*/
public TConsensusGroupId getLastConsensusGroupId() {
- try {
- return seriesPartitionMap.lastEntry().getValue().get(0);
- } catch (NoSuchElementException e) {
+ Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> lastEntry =
+ seriesPartitionMap.lastEntry();
+ if (lastEntry == null) {
return null;
}
+ return lastEntry.getValue().get(lastEntry.getValue().size() - 1);
}
/**
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
index f1ecdc3a5fb..1ea29c9a80b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.commons.structure;
+import org.apache.iotdb.commons.utils.TestOnly;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
@@ -56,6 +58,15 @@ public class BalanceTreeMap<K, V extends Comparable<V>> {
valueKeyMap.computeIfAbsent(value, empty -> new HashSet<>()).add(key);
}
+ /**
+ * Get key with minimum value.
+ *
+ * @return Key with minimum value
+ */
+ public K getKeyWithMinValue() {
+ return valueKeyMap.firstEntry().getValue().iterator().next();
+ }
+
public V get(K key) {
return keyValueMap.getOrDefault(key, null);
}
@@ -64,12 +75,20 @@ public class BalanceTreeMap<K, V extends Comparable<V>> {
return keyValueMap.containsKey(key);
}
- /**
- * Get key with minimum value.
- *
- * @return Key with minimum value
- */
- public K getKeyWithMinValue() {
- return valueKeyMap.firstEntry().getValue().iterator().next();
+ @TestOnly
+ public void remove(K key) {
+ V value = keyValueMap.getOrDefault(key, null);
+ if (value != null) {
+ keyValueMap.remove(key);
+ valueKeyMap.get(value).remove(key);
+ if (valueKeyMap.get(value).isEmpty()) {
+ valueKeyMap.remove(value);
+ }
+ }
+ }
+
+ @TestOnly
+ public boolean isEmpty() {
+ return keyValueMap.isEmpty() && valueKeyMap.isEmpty();
}
}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java
new file mode 100644
index 00000000000..d38b8176f26
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/structure/BalanceTreeMapTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.structure;
+
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+public class BalanceTreeMapTest {
+
+ @Test
+ public void testGetKeyWithMinValue() {
+ Random random = new Random();
+ BalanceTreeMap<TSeriesPartitionSlot, Integer> balanceTreeMap = new BalanceTreeMap<>();
+ for (int i = 0; i < 100; i++) {
+ balanceTreeMap.put(new TSeriesPartitionSlot(i), random.nextInt(Integer.MAX_VALUE));
+ }
+ TSeriesPartitionSlot minSlot = new TSeriesPartitionSlot(100);
+ balanceTreeMap.put(minSlot, Integer.MIN_VALUE);
+ for (int i = 101; i < 200; i++) {
+ balanceTreeMap.put(new TSeriesPartitionSlot(i), random.nextInt(Integer.MAX_VALUE));
+ }
+ Assert.assertEquals(minSlot, balanceTreeMap.getKeyWithMinValue());
+
+ int currentValue = Integer.MIN_VALUE;
+ for (int i = 0; i < 200; i++) {
+ TSeriesPartitionSlot slot = balanceTreeMap.getKeyWithMinValue();
+ Assert.assertTrue(balanceTreeMap.get(slot) >= currentValue);
+ currentValue = balanceTreeMap.get(slot);
+ balanceTreeMap.remove(slot);
+ }
+ }
+
+ @Test
+ public void testKeysDuplicate() {
+ BalanceTreeMap<TSeriesPartitionSlot, Integer> balanceTreeMap = new BalanceTreeMap<>();
+ Set<TSeriesPartitionSlot> duplicateSet0 = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ TSeriesPartitionSlot slot = new TSeriesPartitionSlot(i);
+ balanceTreeMap.put(slot, 0);
+ duplicateSet0.add(slot);
+ }
+ Set<TSeriesPartitionSlot> duplicateSet1 = new HashSet<>();
+ for (int i = 10; i < 20; i++) {
+ TSeriesPartitionSlot slot = new TSeriesPartitionSlot(i);
+ balanceTreeMap.put(slot, 1);
+ duplicateSet1.add(slot);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ Assert.assertTrue(duplicateSet0.contains(balanceTreeMap.getKeyWithMinValue()));
+ balanceTreeMap.remove(balanceTreeMap.getKeyWithMinValue());
+ }
+ for (int i = 0; i < 10; i++) {
+ Assert.assertTrue(duplicateSet1.contains(balanceTreeMap.getKeyWithMinValue()));
+ balanceTreeMap.remove(balanceTreeMap.getKeyWithMinValue());
+ }
+ Assert.assertTrue(balanceTreeMap.isEmpty());
+ }
+}