You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/08/25 08:49:51 UTC
[iotdb] branch IOTDB-3455 updated: finish multiple data region
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-3455
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-3455 by this push:
new 1739d035ab finish multiple data region
1739d035ab is described below
commit 1739d035abf760d4ed69b197c48ab466ca40ced4
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Aug 25 16:49:42 2022 +0800
finish multiple data region
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../iotdb/db/localconfignode/LocalConfigNode.java | 22 ++--
.../db/localconfignode/LocalDataPartitionInfo.java | 18 +--
.../localconfignode/LocalDataPartitionTable.java | 128 +++------------------
.../plan/scheduler/StandaloneSchedulerTest.java | 9 +-
5 files changed, 32 insertions(+), 147 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 379ba1fe6c..a7bb686b98 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -813,7 +813,7 @@ public class IoTDBConfig {
private int ioTaskQueueSizeForFlushing = 10;
/** the number of data regions per user-defined storage group */
- private int dataRegionNum = 1;
+ private int dataRegionNum = 3;
/** the interval to log recover progress of each vsg when starting iotdb */
private long recoveryLogIntervalInMs = 5_000L;
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index fb02237665..d533abae61 100644
--- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -857,12 +857,10 @@ public class LocalConfigNode {
* root.sg1. If there's no storage group on the given path, StorageGroupNotSetException will be
* thrown.
*/
- public DataRegionId getBelongedDataRegionId(
- PartialPath path, TTimePartitionSlot timePartitionSlot)
+ public DataRegionId getBelongedDataRegionId(PartialPath path)
throws MetadataException, DataRegionException {
PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
- DataRegionId dataRegionId =
- dataPartitionTable.getDataRegionId(storageGroup, path, timePartitionSlot);
+ DataRegionId dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
if (dataRegionId == null) {
return null;
}
@@ -877,16 +875,13 @@ public class LocalConfigNode {
}
// This interface involves storage group and data region auto creation
- public DataRegionId getBelongedDataRegionIdWithAutoCreate(
- PartialPath path, TTimePartitionSlot timePartitionSlot)
+ public DataRegionId getBelongedDataRegionIdWithAutoCreate(PartialPath devicePath)
throws MetadataException, DataRegionException {
- PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
- DataRegionId dataRegionId =
- dataPartitionTable.getDataRegionId(storageGroup, path, timePartitionSlot);
+ PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(devicePath);
+ DataRegionId dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, devicePath);
if (dataRegionId == null) {
dataPartitionTable.registerStorageGroup(storageGroup);
- dataRegionId =
- dataPartitionTable.allocateDataRegionForNewSlot(storageGroup, path, timePartitionSlot);
+ dataRegionId = dataPartitionTable.allocateDataRegionForNewSlot(storageGroup, devicePath);
}
DataRegion dataRegion = storageEngine.getDataRegion(dataRegionId);
if (dataRegion == null) {
@@ -990,8 +985,7 @@ public class LocalConfigNode {
continue;
}
for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
- DataRegionId dataRegionId =
- getBelongedDataRegionId(new PartialPath(deviceId), timePartitionSlot);
+ DataRegionId dataRegionId = getBelongedDataRegionId(new PartialPath(deviceId));
// dataRegionId is null means the DataRegion is not created,
// use an empty dataPartitionMap to init DataPartition
if (dataRegionId != null) {
@@ -1052,7 +1046,7 @@ public class LocalConfigNode {
dataPartitionQueryParam.getTimePartitionSlotList();
for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {
DataRegionId dataRegionId =
- getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId), timePartitionSlot);
+ getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId));
Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionToRegionsMap =
deviceToRegionsMap.getOrDefault(
executor.getSeriesPartitionSlot(deviceId), new HashMap<>());
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
index d267938080..04305c4b32 100644
--- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
@@ -19,14 +19,12 @@
package org.apache.iotdb.db.localconfignode;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import java.nio.ByteBuffer;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -62,18 +60,12 @@ public class LocalDataPartitionInfo {
}
}
- public DataRegionId getDataRegionId(
- PartialPath storageGroup, PartialPath path, TTimePartitionSlot timePartitionSlot) {
+ public DataRegionId getDataRegionId(PartialPath storageGroup, PartialPath path) {
if (!partitionTableMap.containsKey(storageGroup)) {
return null;
}
LocalDataPartitionTable table = partitionTableMap.get(storageGroup);
- Map<TTimePartitionSlot, DataRegionId> slotRegionMap = new HashMap<>();
- if (!table.getDataRegionId(path, Collections.singletonList(timePartitionSlot), slotRegionMap)) {
- return null;
- } else {
- return slotRegionMap.get(timePartitionSlot);
- }
+ return table.getDataRegionId(path);
}
/**
@@ -82,13 +74,11 @@ public class LocalDataPartitionInfo {
*
* @param storageGroup The path for the storage group.
* @param path The full path for the series.
- * @param timePartitionSlot The time partition slot to allocate.
* @return The data region id for the time partition slot.
*/
- public DataRegionId allocateDataRegionForNewSlot(
- PartialPath storageGroup, PartialPath path, TTimePartitionSlot timePartitionSlot) {
+ public DataRegionId allocateDataRegionForNewSlot(PartialPath storageGroup, PartialPath path) {
LocalDataPartitionTable table = partitionTableMap.get(storageGroup);
- return table.getDataRegionWithAutoExtension(path, timePartitionSlot);
+ return table.getDataRegionWithAutoExtension(path);
}
public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
index 3a8af9c407..75c8f6c636 100644
--- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
@@ -19,44 +19,28 @@
package org.apache.iotdb.db.localconfignode;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.partition.SeriesPartitionTable;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
public class LocalDataPartitionTable {
private static final Logger LOG = LoggerFactory.getLogger(LocalDataPartitionTable.class);
private String storageGroupName;
- private Map<PartialPath, SeriesPartitionTable> partitionTableMap;
- private List<DataRegionId> regionList = new ArrayList<>();
- private Map<DataRegionId, AtomicInteger> regionSlotCountMap = new ConcurrentHashMap<>();
-
- public LocalDataPartitionTable() {}
+ private final int regionNum = IoTDBDescriptor.getInstance().getConfig().getDataRegionNum();
+ private DataRegionId[] regionIds;
public LocalDataPartitionTable(String storageGroupName) {
this.storageGroupName = storageGroupName;
- this.partitionTableMap = new ConcurrentHashMap<>();
+ this.regionIds = new DataRegionId[regionNum];
}
public void init(ByteBuffer buffer) {
@@ -68,30 +52,14 @@ public class LocalDataPartitionTable {
}
/**
- * Get the TimePartitionSlot to DataRegionId Map. The result is stored in param slotRegionMap.
+ * Get the data region id which the path located in.
*
* @param path The full path for the series.
- * @param timePartitionSlots The time partition slots for the series.
- * @param slotRegionMap The map that store the result.
- * @return Whether all the partitions exist.
+ * @return The region id for the path.
*/
- public boolean getDataRegionId(
- PartialPath path,
- List<TTimePartitionSlot> timePartitionSlots,
- Map<TTimePartitionSlot, DataRegionId> slotRegionMap) {
- if (!partitionTableMap.containsKey(path)) {
- return false;
- }
- SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
- boolean allPartitionExists =
- partitionTableMap.get(path).getDataPartition(timePartitionSlots, seriesPartitionTable);
- for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> entry :
- seriesPartitionTable.getSeriesPartitionMap().entrySet()) {
- if (entry.getValue().size() > 0) {
- slotRegionMap.put(entry.getKey(), new DataRegionId(entry.getValue().get(0).getId()));
- }
- }
- return allPartitionExists;
+ public DataRegionId getDataRegionId(PartialPath path) {
+ int idx = Math.abs(path.hashCode()) % regionNum;
+ return regionIds[idx];
}
/**
@@ -100,82 +68,20 @@ public class LocalDataPartitionTable {
* @return data region id in list
*/
public List<DataRegionId> getAllDataRegionId() {
- Set<DataRegionId> regionIdSet = new HashSet<>();
- for (SeriesPartitionTable partitionTable : partitionTableMap.values()) {
- Map<TTimePartitionSlot, List<TConsensusGroupId>> partitionGroupIdMap =
- partitionTable.getSeriesPartitionMap();
- for (List<TConsensusGroupId> ids : partitionGroupIdMap.values()) {
- ids.forEach(x -> regionIdSet.add(new DataRegionId(x.getId())));
- }
- }
- return new ArrayList<>(regionIdSet);
- }
-
- public int getTimeSlotNum() {
- int slotCount = 0;
- for (SeriesPartitionTable partitionTable : partitionTableMap.values()) {
- Map<TTimePartitionSlot, List<TConsensusGroupId>> partitionGroupIdMap =
- partitionTable.getSeriesPartitionMap();
- slotCount += partitionGroupIdMap.size();
- }
- return slotCount;
- }
-
- public DataRegionId getDataRegionWithAutoExtension(
- PartialPath path, TTimePartitionSlot timePartitionSlot) {
- DataRegionId regionId;
- if ((regionId = checkExpansion(path, timePartitionSlot)) != null) {
- // return the newly allocated region
- regionSlotCountMap.get(regionId).incrementAndGet();
- return regionId;
- }
-
- // select a region with min time partition slot
- List<Pair<DataRegionId, AtomicInteger>> slotCountList = new ArrayList<>();
- regionSlotCountMap.forEach((id, count) -> slotCountList.add(new Pair<>(id, count)));
- slotCountList.sort(Comparator.comparingInt(o -> o.right.get()));
- DataRegionId chosenId = slotCountList.get(0).left;
- regionSlotCountMap.get(chosenId).incrementAndGet();
- return chosenId;
- }
-
- public DataRegionId checkExpansion(PartialPath path, TTimePartitionSlot timePartitionSlot) {
- if (regionList.size() == 0) {
- // there is no data region for this storage group, create one
- return carryOutExpansion(path, timePartitionSlot);
- }
-
- // check the load and carry out the expansion if necessary
- double allocatedSlotNum = getTimeSlotNum();
- double maxRegionNum = IoTDBDescriptor.getInstance().getConfig().getDataRegionNum();
- double maxSlotNum = IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum();
- if (regionList.size() < maxRegionNum
- && allocatedSlotNum / (double) regionList.size() > maxSlotNum / maxRegionNum) {
- return carryOutExpansion(path, timePartitionSlot);
- }
- return null;
+ return Arrays.asList(regionIds);
}
- private DataRegionId carryOutExpansion(PartialPath path, TTimePartitionSlot timePartitionSlot) {
- if (!partitionTableMap.containsKey(path)) {
- partitionTableMap.put(path, new SeriesPartitionTable());
+ public DataRegionId getDataRegionWithAutoExtension(PartialPath path) {
+ int idx = Math.abs(path.hashCode()) % regionNum;
+ if (regionIds[idx] == null) {
+ int nextId = DataRegionIdGenerator.getInstance().getNextId();
+ regionIds[idx] = new DataRegionId(nextId);
}
- SeriesPartitionTable seriesPartitionTable = partitionTableMap.get(path);
- Map<TTimePartitionSlot, List<TConsensusGroupId>> allotmentMap = new HashMap<>();
- int nextRegionId = DataRegionIdGenerator.getInstance().getNextId();
- allotmentMap.put(
- timePartitionSlot,
- Collections.singletonList(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, nextRegionId)));
- SeriesPartitionTable requestTable = new SeriesPartitionTable(allotmentMap);
- Map<TConsensusGroupId, AtomicInteger> deltaMap = new HashMap<>();
- seriesPartitionTable.createDataPartition(requestTable, deltaMap);
- regionList.add(new DataRegionId(nextRegionId));
- regionSlotCountMap.put(new DataRegionId(nextRegionId), new AtomicInteger(0));
- return new DataRegionId(nextRegionId);
+ return regionIds[idx];
}
public void clear() {
// TODO: clear the table
+ regionIds = null;
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index 3afef548f5..34e2a9e85f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -72,8 +71,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import static org.apache.iotdb.db.localconfignode.LocalConfigNode.STANDALONE_MOCK_TIME_SLOT_START_TIME;
-
public class StandaloneSchedulerTest {
private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
@@ -409,8 +406,7 @@ public class StandaloneSchedulerTest {
fragmentInstance.setDataRegionAndHost(regionReplicaSet);
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath(deviceId));
- configNode.getBelongedDataRegionIdWithAutoCreate(
- new PartialPath(deviceId), new TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME));
+ configNode.getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId));
MPPQueryContext context =
new MPPQueryContext(
"",
@@ -491,8 +487,7 @@ public class StandaloneSchedulerTest {
fragmentInstance.setDataRegionAndHost(regionReplicaSet);
configNode.getBelongedSchemaRegionIdWithAutoCreate(deviceId);
- configNode.getBelongedDataRegionIdWithAutoCreate(
- deviceId, new TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME));
+ configNode.getBelongedDataRegionIdWithAutoCreate(deviceId);
MPPQueryContext context =
new MPPQueryContext(
"",