You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/16 09:49:23 UTC
[iotdb] branch master updated: [IOTDB-2689] [IOTDB-2690] Simple Partition load balancing (#5910)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2d4b5bc1ac [IOTDB-2689] [IOTDB-2690] Simple Partition load balancing (#5910)
2d4b5bc1ac is described below
commit 2d4b5bc1acb5c06c8bc3556aff1900ea17f3c31c
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Mon May 16 17:49:17 2022 +0800
[IOTDB-2689] [IOTDB-2690] Simple Partition load balancing (#5910)
---
.../iotdb/confignode/manager/PartitionManager.java | 57 +++++++++---
.../confignode/persistence/PartitionInfo.java | 101 ++++++++++++++++-----
.../confignode/persistence/PartitionInfoTest.java | 54 ++++++++---
3 files changed, 167 insertions(+), 45 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index ee6e6f3520..5d90ab1551 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -42,15 +42,16 @@ import org.apache.iotdb.confignode.persistence.PartitionInfo;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
/** The PartitionManager Manages cluster PartitionTable read and write requests. */
public class PartitionManager {
@@ -128,23 +129,37 @@ public class PartitionManager {
*/
private Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> allocateSchemaPartition(
Map<String, List<TSeriesPartitionSlot>> noAssignedSchemaPartitionSlotsMap) {
+
Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> result = new HashMap<>();
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaMap =
+ partitionInfo.getRegionReplicaMap();
for (String storageGroup : noAssignedSchemaPartitionSlotsMap.keySet()) {
+
List<TSeriesPartitionSlot> noAssignedPartitionSlots =
noAssignedSchemaPartitionSlotsMap.get(storageGroup);
- List<TRegionReplicaSet> schemaRegionReplicaSets =
- partitionInfo.getRegionReplicaSets(
+ // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
+ List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+ partitionInfo.getSortedRegionSlotsCounter(
getClusterSchemaManager()
.getRegionGroupIds(storageGroup, TConsensusGroupType.SchemaRegion));
- Random random = new Random();
Map<TSeriesPartitionSlot, TRegionReplicaSet> allocateResult = new HashMap<>();
- noAssignedPartitionSlots.forEach(
- seriesPartitionSlot ->
- allocateResult.put(
- seriesPartitionSlot,
- schemaRegionReplicaSets.get(random.nextInt(schemaRegionReplicaSets.size()))));
+ for (TSeriesPartitionSlot seriesPartitionSlot : noAssignedPartitionSlots) {
+ // Do greedy allocation
+ Pair<Long, TConsensusGroupId> bestRegion = regionSlotsCounter.get(0);
+ allocateResult.put(seriesPartitionSlot, regionReplicaMap.get(bestRegion.getRight()));
+
+ // Bubble sort
+ int index = 0;
+ regionSlotsCounter.set(0, new Pair<>(bestRegion.getLeft() + 1, bestRegion.getRight()));
+ while (index < regionSlotsCounter.size() - 1
+ && regionSlotsCounter.get(index).getLeft()
+ > regionSlotsCounter.get(index + 1).getLeft()) {
+ Collections.swap(regionSlotsCounter, index, index + 1);
+ index += 1;
+ }
+ }
result.put(storageGroup, allocateResult);
}
@@ -220,15 +235,18 @@ public class PartitionManager {
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
result = new HashMap<>();
+ Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaMap =
+ partitionInfo.getRegionReplicaMap();
for (String storageGroup : noAssignedDataPartitionSlotsMap.keySet()) {
+
Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> noAssignedPartitionSlotsMap =
noAssignedDataPartitionSlotsMap.get(storageGroup);
- List<TRegionReplicaSet> dataRegionEndPoints =
- partitionInfo.getRegionReplicaSets(
+ // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
+ List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+ partitionInfo.getSortedRegionSlotsCounter(
getClusterSchemaManager()
.getRegionGroupIds(storageGroup, TConsensusGroupType.DataRegion));
- Random random = new Random();
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> allocateResult =
new HashMap<>();
@@ -236,10 +254,23 @@ public class PartitionManager {
noAssignedPartitionSlotsMap.entrySet()) {
allocateResult.put(seriesPartitionEntry.getKey(), new HashMap<>());
for (TTimePartitionSlot timePartitionSlot : seriesPartitionEntry.getValue()) {
+
+ // Do greedy allocation
+ Pair<Long, TConsensusGroupId> bestRegion = regionSlotsCounter.get(0);
allocateResult
.get(seriesPartitionEntry.getKey())
.computeIfAbsent(timePartitionSlot, key -> new ArrayList<>())
- .add(dataRegionEndPoints.get(random.nextInt(dataRegionEndPoints.size())));
+ .add(regionReplicaMap.get(bestRegion.getRight()));
+
+ // Bubble sort
+ int index = 0;
+ regionSlotsCounter.set(0, new Pair<>(bestRegion.getLeft() + 1, bestRegion.getRight()));
+ while (index < regionSlotsCounter.size() - 1
+ && regionSlotsCounter.get(index).getLeft()
+ > regionSlotsCounter.get(index + 1).getLeft()) {
+ Collections.swap(regionSlotsCounter, index, index + 1);
+ index += 1;
+ }
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
index 20a701ef34..b9af789868 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -58,10 +59,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -74,10 +75,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PartitionInfo implements SnapshotProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionInfo.class);
+
// Region read write lock
private final ReentrantReadWriteLock regionReadWriteLock;
private AtomicInteger nextRegionGroupId = new AtomicInteger(0);
- private final Map<TConsensusGroupId, TRegionReplicaSet> regionMap;
+ private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaMap;
+ // Map<TConsensusGroupId, allocatedSlotsNumber>
+ private final Map<TConsensusGroupId, Long> regionSlotsCounter;
// SchemaPartition read write lock
private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
@@ -94,7 +98,8 @@ public class PartitionInfo implements SnapshotProcessor {
public PartitionInfo() {
this.regionReadWriteLock = new ReentrantReadWriteLock();
- this.regionMap = new HashMap<>();
+ this.regionReplicaMap = new HashMap<>();
+ this.regionSlotsCounter = new HashMap<>();
this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
this.schemaPartition =
@@ -133,7 +138,8 @@ public class PartitionInfo implements SnapshotProcessor {
int maxRegionId = Integer.MIN_VALUE;
for (TRegionReplicaSet regionReplicaSet : req.getRegionMap().values()) {
- regionMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
+ regionReplicaMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
+ regionSlotsCounter.put(regionReplicaSet.getRegionId(), 0L);
maxRegionId = Math.max(maxRegionId, regionReplicaSet.getRegionId().getId());
}
@@ -161,7 +167,8 @@ public class PartitionInfo implements SnapshotProcessor {
regionReadWriteLock.writeLock().lock();
try {
for (TConsensusGroupId consensusGroupId : req.getConsensusGroupIds()) {
- regionMap.remove(consensusGroupId);
+ regionReplicaMap.remove(consensusGroupId);
+ regionSlotsCounter.remove(consensusGroupId);
}
result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
@@ -220,6 +227,7 @@ public class PartitionInfo implements SnapshotProcessor {
*/
public TSStatus createSchemaPartition(CreateSchemaPartitionReq req) {
schemaPartitionReadWriteLock.writeLock().lock();
+ regionReadWriteLock.writeLock().lock();
try {
// Allocate SchemaPartition by CreateSchemaPartitionPlan
@@ -228,10 +236,14 @@ public class PartitionInfo implements SnapshotProcessor {
assignedResult.forEach(
(storageGroup, partitionSlots) ->
partitionSlots.forEach(
- (seriesPartitionSlot, regionReplicaSet) ->
- schemaPartition.createSchemaPartition(
- storageGroup, seriesPartitionSlot, regionReplicaSet)));
+ (seriesPartitionSlot, regionReplicaSet) -> {
+ schemaPartition.createSchemaPartition(
+ storageGroup, seriesPartitionSlot, regionReplicaSet);
+ regionSlotsCounter.computeIfPresent(
+ regionReplicaSet.getRegionId(), (consensusGroupId, count) -> (count + 1));
+ }));
} finally {
+ regionReadWriteLock.writeLock().unlock();
schemaPartitionReadWriteLock.writeLock().unlock();
}
@@ -289,6 +301,7 @@ public class PartitionInfo implements SnapshotProcessor {
*/
public TSStatus createDataPartition(CreateDataPartitionReq req) {
dataPartitionReadWriteLock.writeLock().lock();
+ regionReadWriteLock.writeLock().lock();
try {
// Allocate DataPartition by CreateDataPartitionPlan
@@ -301,13 +314,18 @@ public class PartitionInfo implements SnapshotProcessor {
timePartitionSlotRegionReplicaSets.forEach(
((timePartitionSlot, regionReplicaSets) ->
regionReplicaSets.forEach(
- regionReplicaSet ->
- dataPartition.createDataPartition(
- storageGroup,
- seriesPartitionSlot,
- timePartitionSlot,
- regionReplicaSet)))))));
+ regionReplicaSet -> {
+ dataPartition.createDataPartition(
+ storageGroup,
+ seriesPartitionSlot,
+ timePartitionSlot,
+ regionReplicaSet);
+ regionSlotsCounter.computeIfPresent(
+ regionReplicaSet.getRegionId(),
+ (consensusGroupId, count) -> (count + 1));
+ }))))));
} finally {
+ regionReadWriteLock.writeLock().unlock();
dataPartitionReadWriteLock.writeLock().unlock();
}
@@ -341,7 +359,7 @@ public class PartitionInfo implements SnapshotProcessor {
regionReadWriteLock.readLock().lock();
try {
for (TConsensusGroupId groupId : groupIds) {
- result.add(regionMap.get(groupId));
+ result.add(regionReplicaMap.get(groupId));
}
} finally {
regionReadWriteLock.readLock().unlock();
@@ -354,7 +372,35 @@ public class PartitionInfo implements SnapshotProcessor {
List<TRegionReplicaSet> result;
regionReadWriteLock.readLock().lock();
try {
- result = new ArrayList<>(regionMap.values());
+ result = new ArrayList<>(regionReplicaMap.values());
+ } finally {
+ regionReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
+ /** @return A copy of regionReplicaMap */
+ public Map<TConsensusGroupId, TRegionReplicaSet> getRegionReplicaMap() {
+ Map<TConsensusGroupId, TRegionReplicaSet> result;
+ regionReadWriteLock.readLock().lock();
+ try {
+ result = new HashMap<>(regionReplicaMap);
+ } finally {
+ regionReadWriteLock.readLock().unlock();
+ }
+ return result;
+ }
+
+  /** @return The specific Regions, sorted by the number of allocated slots */
+ public List<Pair<Long, TConsensusGroupId>> getSortedRegionSlotsCounter(
+ List<TConsensusGroupId> consensusGroupIds) {
+ List<Pair<Long, TConsensusGroupId>> result = new ArrayList<>();
+ regionReadWriteLock.readLock().lock();
+ try {
+ for (TConsensusGroupId consensusGroupId : consensusGroupIds) {
+ result.add(new Pair<>(regionSlotsCounter.get(consensusGroupId), consensusGroupId));
+ }
+ result.sort(Comparator.comparingLong(Pair::getLeft));
} finally {
regionReadWriteLock.readLock().unlock();
}
@@ -491,14 +537,22 @@ public class PartitionInfo implements SnapshotProcessor {
return schemaPartition;
}
+ @TestOnly
+ public Map<TConsensusGroupId, Long> getRegionSlotsCounter() {
+ return regionSlotsCounter;
+ }
+
private void serializeRegionMap(ByteBuffer buffer) throws TException, IOException {
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
TIOStreamTransport tioStreamTransport = new TIOStreamTransport(out)) {
TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
- for (Entry<TConsensusGroupId, TRegionReplicaSet> entry : regionMap.entrySet()) {
- entry.getKey().write(protocol);
- entry.getValue().write(protocol);
+
+ for (TConsensusGroupId consensusGroupId : regionReplicaMap.keySet()) {
+ consensusGroupId.write(protocol);
+ regionReplicaMap.get(consensusGroupId).write(protocol);
+ protocol.writeI64(regionSlotsCounter.get(consensusGroupId));
}
+
byte[] toArray = out.toByteArray();
buffer.putInt(toArray.length);
buffer.put(toArray);
@@ -513,18 +567,23 @@ public class PartitionInfo implements SnapshotProcessor {
TIOStreamTransport tioStreamTransport = new TIOStreamTransport(in)) {
while (in.available() > 0) {
TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
+
TConsensusGroupId tConsensusGroupId = new TConsensusGroupId();
tConsensusGroupId.read(protocol);
TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
tRegionReplicaSet.read(protocol);
- regionMap.put(tConsensusGroupId, tRegionReplicaSet);
+ Long count = protocol.readI64();
+
+ regionReplicaMap.put(tConsensusGroupId, tRegionReplicaSet);
+ regionSlotsCounter.put(tConsensusGroupId, count);
}
}
}
public void clear() {
nextRegionGroupId = new AtomicInteger(0);
- regionMap.clear();
+ regionReplicaMap.clear();
+ regionSlotsCounter.clear();
if (schemaPartition.getSchemaPartitionMap() != null) {
schemaPartition.getSchemaPartitionMap().clear();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index e6668cd778..820ec3c1e7 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -53,7 +53,6 @@ public class PartitionInfoTest {
private static final File snapshotDir = new File(BASE_OUTPUT_PATH, "snapshot");
enum testFlag {
- RegionReplica(10),
DataPartition(20),
SchemaPartition(30);
@@ -89,25 +88,40 @@ public class PartitionInfoTest {
partitionInfo.generateNextRegionGroupId();
+ // Create a SchemaRegion
CreateRegionsReq createRegionsReq = new CreateRegionsReq();
+ TRegionReplicaSet schemaRegionReplicaSet =
+ generateTRegionReplicaSet(
+ testFlag.SchemaPartition.getFlag(),
+ generateTConsensusGroupId(
+ testFlag.SchemaPartition.getFlag(), TConsensusGroupType.SchemaRegion));
+ createRegionsReq.addRegion("root.test", schemaRegionReplicaSet);
+ partitionInfo.createRegions(createRegionsReq);
- TRegionReplicaSet tRegionReplicaSet =
+ // Create a DataRegion
+ createRegionsReq = new CreateRegionsReq();
+ TRegionReplicaSet dataRegionReplicaSet =
generateTRegionReplicaSet(
- testFlag.RegionReplica.getFlag(),
- generateTConsensusGroupId(testFlag.RegionReplica.getFlag()));
- createRegionsReq.addRegion("root.test", tRegionReplicaSet);
+ testFlag.DataPartition.getFlag(),
+ generateTConsensusGroupId(
+ testFlag.DataPartition.getFlag(), TConsensusGroupType.DataRegion));
+ createRegionsReq.addRegion("root.test", dataRegionReplicaSet);
partitionInfo.createRegions(createRegionsReq);
+ // Create a SchemaPartition
CreateSchemaPartitionReq createSchemaPartitionReq =
generateCreateSchemaPartitionReq(
testFlag.SchemaPartition.getFlag(),
- generateTConsensusGroupId(testFlag.SchemaPartition.getFlag()));
+ generateTConsensusGroupId(
+ testFlag.SchemaPartition.getFlag(), TConsensusGroupType.SchemaRegion));
partitionInfo.createSchemaPartition(createSchemaPartitionReq);
+ // Create a DataPartition
CreateDataPartitionReq createDataPartitionReq =
generateCreateDataPartitionReq(
testFlag.DataPartition.getFlag(),
- generateTConsensusGroupId(testFlag.DataPartition.getFlag()));
+ generateTConsensusGroupId(
+ testFlag.DataPartition.getFlag(), TConsensusGroupType.DataRegion));
partitionInfo.createDataPartition(createDataPartitionReq);
int nextId = partitionInfo.getNextRegionGroupId();
@@ -117,11 +131,23 @@ public class PartitionInfoTest {
Assert.assertEquals(nextId, (int) partitionInfo.getNextRegionGroupId());
+ // Check SchemaRegion
List<TRegionReplicaSet> reloadTRegionReplicaSet =
partitionInfo.getRegionReplicaSets(
- Collections.singletonList(generateTConsensusGroupId(testFlag.RegionReplica.getFlag())));
+ Collections.singletonList(
+ generateTConsensusGroupId(
+ testFlag.SchemaPartition.getFlag(), TConsensusGroupType.SchemaRegion)));
Assert.assertEquals(1, reloadTRegionReplicaSet.size());
- Assert.assertEquals(tRegionReplicaSet, reloadTRegionReplicaSet.get(0));
+ Assert.assertEquals(schemaRegionReplicaSet, reloadTRegionReplicaSet.get(0));
+
+ // Check DataRegion
+ reloadTRegionReplicaSet =
+ partitionInfo.getRegionReplicaSets(
+ Collections.singletonList(
+ generateTConsensusGroupId(
+ testFlag.DataPartition.getFlag(), TConsensusGroupType.DataRegion)));
+ Assert.assertEquals(1, reloadTRegionReplicaSet.size());
+ Assert.assertEquals(dataRegionReplicaSet, reloadTRegionReplicaSet.get(0));
Assert.assertEquals(
createDataPartitionReq.getAssignedDataPartition(),
@@ -130,6 +156,11 @@ public class PartitionInfoTest {
Assert.assertEquals(
createSchemaPartitionReq.getAssignedSchemaPartition(),
partitionInfo.getSchemaPartition().getSchemaPartitionMap());
+
+ Assert.assertEquals(2, partitionInfo.getRegionSlotsCounter().size());
+ for (Long count : partitionInfo.getRegionSlotsCounter().values()) {
+ Assert.assertEquals(1, count.intValue());
+ }
}
private TRegionReplicaSet generateTRegionReplicaSet(
@@ -187,7 +218,8 @@ public class PartitionInfoTest {
return createSchemaPartitionReq;
}
- private TConsensusGroupId generateTConsensusGroupId(int startFlag) {
- return new TConsensusGroupId(TConsensusGroupType.PartitionRegion, 111000 + startFlag);
+ private TConsensusGroupId generateTConsensusGroupId(
+ int startFlag, TConsensusGroupType consensusGroupType) {
+ return new TConsensusGroupId(consensusGroupType, 111000 + startFlag);
}
}