Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/07/07 00:55:46 UTC
[iotdb] branch master updated: [IOTDB-3637] Optimize getOrCreatePartition process (#6596)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f7464253fb [IOTDB-3637] Optimize getOrCreatePartition process (#6596)
f7464253fb is described below
commit f7464253fbcee139d60675f01769ca1c0f4a2b41
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Thu Jul 7 08:55:40 2022 +0800
[IOTDB-3637] Optimize getOrCreatePartition process (#6596)
---
.../iotdb/confignode/manager/PartitionManager.java | 343 +++++++--------------
.../persistence/partition/PartitionInfo.java | 31 --
.../partition/StorageGroupPartitionTable.java | 54 ----
.../commons/partition/DataPartitionTable.java | 10 +-
4 files changed, 126 insertions(+), 312 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 557d9527bc..f2ac6fc467 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -47,10 +47,8 @@ import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
-import org.apache.iotdb.confignode.exception.TimeoutException;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -59,11 +57,10 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -145,40 +142,42 @@ public class PartitionManager {
return resp;
}
- // Otherwise, first ensure that each StorageGroup has at least one SchemaRegion.
- // This block of code is still parallel and concurrent safe.
- // Thus, we can prepare the SchemaRegions with maximum efficiency.
- TSStatus status =
- initializeRegionsIfNecessary(
- new ArrayList<>(req.getPartitionSlotsMap().keySet()), TConsensusGroupType.SchemaRegion);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- resp.setStatus(status);
- return resp;
- }
-
- // Next, we serialize the creation process of SchemaPartitions to
+ // We serialize the creation process of SchemaPartitions to
// ensure that each SchemaPartition is created by a unique CreateSchemaPartitionReq.
- // Because the number of SchemaPartitions per storage group is limited by the number of
- // SeriesPartitionSlots,
+ // Because the number of SchemaPartitions per storage group is limited
+ // by the number of SeriesPartitionSlots,
// the number of serialized CreateSchemaPartitionReqs is acceptable.
synchronized (this) {
// Filter unassigned SchemaPartitionSlots
- Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlots =
+ Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap =
partitionInfo.filterUnassignedSchemaPartitionSlots(req.getPartitionSlotsMap());
- if (unassignedSchemaPartitionSlots.size() > 0) {
- // Allocate SchemaPartitions
- Map<String, SchemaPartitionTable> assignedSchemaPartition =
- getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlots);
- // Cache allocating result
- CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
- createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
- getConsensusManager().write(createPlan);
+
+ // Here we ensure that each StorageGroup has at least one SchemaRegion.
+ // And if some StorageGroups own too many slots, extend SchemaRegion for them.
+
+ // Map<StorageGroup, unassigned SeriesPartitionSlot count>
+ Map<String, Integer> unassignedSchemaPartitionSlotsCountMap = new ConcurrentHashMap<>();
+ unassignedSchemaPartitionSlotsMap.forEach(
+ (storageGroup, unassignedSchemaPartitionSlots) ->
+ unassignedSchemaPartitionSlotsCountMap.put(
+ storageGroup, unassignedSchemaPartitionSlots.size()));
+ TSStatus status =
+ extendRegionsIfNecessary(
+ unassignedSchemaPartitionSlotsCountMap, TConsensusGroupType.SchemaRegion);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // Return an error code if Region extension failed
+ resp.setStatus(status);
+ return resp;
}
- }
- // Finally, if some StorageGroups own too many slots, extend SchemaRegion for them.
- extendRegionsIfNecessary(
- new ArrayList<>(req.getPartitionSlotsMap().keySet()), TConsensusGroupType.SchemaRegion);
+ // Allocate SchemaPartitions
+ Map<String, SchemaPartitionTable> assignedSchemaPartition =
+ getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+ // Cache allocating result
+ CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
+ createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
+ getConsensusManager().write(createPlan);
+ }
return getSchemaPartition(req);
}
@@ -194,48 +193,50 @@ public class PartitionManager {
* StorageGroup doesn't exist.
*/
public DataSet getOrCreateDataPartition(GetOrCreateDataPartitionPlan req) {
- // After all the SchemaPartitions are allocated,
- // all the read requests about SchemaPartitionTable are parallel.
+ // After all the DataPartitions are allocated,
+ // all the read requests about DataPartitionTable are parallel.
DataPartitionResp resp = (DataPartitionResp) getDataPartition(req);
if (resp.isAllPartitionsExist()) {
return resp;
}
- // Otherwise, first ensure that each StorageGroup has at least one DataRegion.
- // This block of code is still parallel and concurrent safe.
- // Thus, we can prepare the DataRegions with maximum efficiency.
- TSStatus status =
- initializeRegionsIfNecessary(
- new ArrayList<>(req.getPartitionSlotsMap().keySet()), TConsensusGroupType.DataRegion);
- if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- resp.setStatus(status);
- return resp;
- }
-
- // Next, we serialize the creation process of DataPartitions to
+ // We serialize the creation process of DataPartitions to
// ensure that each DataPartition is created by a unique CreateDataPartitionReq.
- // Because the number of DataPartitions per storage group per day is limited by the number of
- // SeriesPartitionSlots,
+ // Because the number of DataPartitions per storage group is limited
+ // by the number of SeriesPartitionSlots,
// the number of serialized CreateDataPartitionReqs is acceptable.
synchronized (this) {
// Filter unassigned DataPartitionSlots
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- unassignedDataPartitionSlots =
+ unassignedDataPartitionSlotsMap =
partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap());
- if (unassignedDataPartitionSlots.size() > 0) {
- // Allocate DataPartitions
- Map<String, DataPartitionTable> assignedDataPartition =
- getLoadManager().allocateDataPartition(unassignedDataPartitionSlots);
- // Cache allocating result
- CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
- createPlan.setAssignedDataPartition(assignedDataPartition);
- getConsensusManager().write(createPlan);
+
+ // Here we ensure that each StorageGroup has at least one DataRegion.
+ // And if some StorageGroups own too many slots, extend DataRegion for them.
+
+ // Map<StorageGroup, unassigned SeriesPartitionSlot count>
+ Map<String, Integer> unassignedDataPartitionSlotsCountMap = new ConcurrentHashMap<>();
+ unassignedDataPartitionSlotsMap.forEach(
+ (storageGroup, unassignedDataPartitionSlots) ->
+ unassignedDataPartitionSlotsCountMap.put(
+ storageGroup, unassignedDataPartitionSlots.size()));
+ TSStatus status =
+ extendRegionsIfNecessary(
+ unassignedDataPartitionSlotsCountMap, TConsensusGroupType.DataRegion);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // Return an error code if Region extension failed
+ resp.setStatus(status);
+ return resp;
}
- }
- // Finally, if some StorageGroups own too many slots, extend DataRegion for them.
- extendRegionsIfNecessary(
- new ArrayList<>(req.getPartitionSlotsMap().keySet()), TConsensusGroupType.DataRegion);
+ // Allocate DataPartitions
+ Map<String, DataPartitionTable> assignedDataPartition =
+ getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
+ // Cache allocating result
+ CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
+ createPlan.setAssignedDataPartition(assignedDataPartition);
+ getConsensusManager().write(createPlan);
+ }
return getDataPartition(req);
}
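Both getOrCreateSchemaPartition and getOrCreateDataPartition above now share the same shape: a lock-free read fast path, then a single synchronized block that filters unassigned slots, extends Regions when necessary, allocates the missing partitions, and persists the allocation through consensus before re-reading. The sketch below is not part of the commit; it only restates that control flow under assumed helper names (readPartition, isComplete, filterUnassigned, extendRegionsIfNeeded, allocateAndPersist and failure are all hypothetical) to make the fast-path/slow-path locking pattern explicit.

    // Sketch only, not from the commit: the read-fast-path / serialized-create
    // pattern used by both getOrCreate* methods. All helper names are hypothetical.
    public abstract class GetOrCreateSketch<REQ, RESP> {

      public final RESP getOrCreate(REQ req) {
        // Fast path: once all requested partitions exist, reads stay fully parallel.
        RESP resp = readPartition(req);
        if (isComplete(resp)) {
          return resp;
        }

        // Slow path: creation is serialized so that each partition is created by
        // exactly one request, and Region extension happens under the same lock.
        synchronized (this) {
          REQ unassigned = filterUnassigned(req);
          if (!extendRegionsIfNeeded(unassigned)) {
            // e.g. not enough DataNodes or a missing StorageGroup
            return failure();
          }
          // Allocate the missing partitions and write the result through consensus.
          allocateAndPersist(unassigned);
        }

        // Re-read so the response reflects the freshly persisted partitions.
        return readPartition(req);
      }

      protected abstract RESP readPartition(REQ req);

      protected abstract boolean isComplete(RESP resp);

      protected abstract REQ filterUnassigned(REQ req);

      protected abstract boolean extendRegionsIfNeeded(REQ unassigned);

      protected abstract void allocateAndPersist(REQ unassigned);

      protected abstract RESP failure();
    }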
@@ -244,185 +245,79 @@ public class PartitionManager {
// Leader scheduling interfaces
// ======================================================
- /** Handle the exceptions from initializeRegions */
- private TSStatus initializeRegionsIfNecessary(
- List<String> storageGroups, TConsensusGroupType consensusGroupType) {
- try {
- initializeRegions(storageGroups, consensusGroupType);
- } catch (NotEnoughDataNodeException e) {
- return new TSStatus(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode())
- .setMessage(
- "ConfigNode failed to allocate Partition because there are not enough DataNodes");
- } catch (TimeoutException e) {
- return new TSStatus(TSStatusCode.TIME_OUT.getStatusCode())
- .setMessage(
- "ConfigNode failed to allocate Partition because waiting for another thread's Region allocation timeout.");
- } catch (StorageGroupNotExistsException e) {
- return new TSStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
- .setMessage(
- "ConfigNode failed to allocate DataPartition because some StorageGroup doesn't exist.");
- }
- return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- }
-
/**
- * Initialize one Region for each StorageGroup who doesn't have any.
+ * Allocate more Regions to StorageGroups who have too many slots.
*
- * @param storageGroups List<StorageGroupName>
+ * @param unassignedPartitionSlotsCountMap Map<StorageGroup, unassigned Partition count>
* @param consensusGroupType SchemaRegion or DataRegion
- * @throws NotEnoughDataNodeException When the number of online DataNodes are too small to
- * allocate Regions
- * @throws TimeoutException When waiting other threads to allocate Regions for too long
- * @throws StorageGroupNotExistsException When some StorageGroups don't exist
+ * @return SUCCESS_STATUS when Region extension is successful; NOT_ENOUGH_DATA_NODE when there are
+ * not enough DataNodes; STORAGE_GROUP_NOT_EXIST when some StorageGroups don't exist
*/
- private void initializeRegions(List<String> storageGroups, TConsensusGroupType consensusGroupType)
- throws NotEnoughDataNodeException, TimeoutException, StorageGroupNotExistsException {
-
- int leastDataNode = 0;
- Map<String, Integer> unreadyStorageGroupMap = new HashMap<>();
- for (String storageGroup : storageGroups) {
- if (getRegionCount(storageGroup, consensusGroupType) == 0) {
- // Update leastDataNode
- TStorageGroupSchema storageGroupSchema =
- getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup);
- switch (consensusGroupType) {
- case SchemaRegion:
- leastDataNode =
- Math.max(leastDataNode, storageGroupSchema.getSchemaReplicationFactor());
- break;
- case DataRegion:
- leastDataNode = Math.max(leastDataNode, storageGroupSchema.getDataReplicationFactor());
+ private TSStatus extendRegionsIfNecessary(
+ Map<String, Integer> unassignedPartitionSlotsCountMap,
+ TConsensusGroupType consensusGroupType) {
+ TSStatus result = new TSStatus();
+
+ try {
+ // Map<StorageGroup, Region allotment>
+ Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
+
+ for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
+ float allocatedRegionCount =
+ partitionInfo.getRegionCount(entry.getKey(), consensusGroupType);
+ // The slotCount equals the sum of the assigned and unassigned slot counts
+ float slotCount = partitionInfo.getSlotCount(entry.getKey()) + entry.getValue();
+ float maxRegionCount =
+ getClusterSchemaManager().getMaxRegionGroupCount(entry.getKey(), consensusGroupType);
+ float maxSlotCount =
+ ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum();
+
+ /* Region extension is required in the following two cases */
+ // 1. No Region has been created for the current StorageGroup yet
+ if (allocatedRegionCount == 0) {
+ // The delta is equal to the smallest integer solution that satisfies the inequality:
+ // slotCount / delta < maxSlotCount / maxRegionCount
+ int delta =
+ Math.min(
+ (int) maxRegionCount,
+ Math.max(1, (int) Math.ceil(slotCount * maxRegionCount / maxSlotCount)));
+ allotmentMap.put(entry.getKey(), delta);
+ continue;
}
- // Recording StorageGroups without Region
- unreadyStorageGroupMap.put(storageGroup, 1);
+ // 2. The average number of partitions held by each Region would exceed the expected
+ // average once the partition allocation is complete
+ if (allocatedRegionCount < maxRegionCount
+ && slotCount / allocatedRegionCount > maxSlotCount / maxRegionCount) {
+ // The delta is equal to the smallest integer solution that satisfies the inequality:
+ // slotCount / (allocatedRegionCount + delta) < maxSlotCount / maxRegionCount
+ int delta =
+ Math.min(
+ (int) (maxRegionCount - allocatedRegionCount),
+ Math.max(
+ 1,
+ (int)
+ Math.ceil(
+ slotCount * maxRegionCount / maxSlotCount - allocatedRegionCount)));
+ allotmentMap.put(entry.getKey(), delta);
+ }
}
- }
- if (getNodeManager().getRegisteredDataNodeCount() < leastDataNode) {
- // Make sure DataNodes enough
- throw new NotEnoughDataNodeException();
- }
- doOrWaitRegionCreation(unreadyStorageGroupMap, consensusGroupType);
- }
+ // TODO: Use procedure to protect the following process
+ // Do Region allocation and creation for StorageGroups based on the allotment
+ getLoadManager().doRegionCreation(allotmentMap, consensusGroupType);
- /** Handle the exceptions from extendRegions */
- private void extendRegionsIfNecessary(
- List<String> storageGroups, TConsensusGroupType consensusGroupType) {
- try {
- extendRegions(storageGroups, consensusGroupType);
+ result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (NotEnoughDataNodeException e) {
LOGGER.error("ConfigNode failed to extend Region because there are not enough DataNodes");
- } catch (TimeoutException e) {
- LOGGER.error(
- "ConfigNode failed to extend Region because waiting for another thread's Region allocation timeout.");
+ result.setCode(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode());
} catch (StorageGroupNotExistsException e) {
LOGGER.error("ConfigNode failed to extend Region because some StorageGroup doesn't exist.");
+ result.setCode(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode());
}
- }
-
- /**
- * Allocate more Regions to StorageGroups who have too many slots.
- *
- * @param storageGroups List<StorageGroupName>
- * @param consensusGroupType SchemaRegion or DataRegion
- * @throws StorageGroupNotExistsException When some StorageGroups don't exist
- * @throws NotEnoughDataNodeException When the number of online DataNodes are too small to
- * allocate Regions
- * @throws TimeoutException When waiting other threads to allocate Regions for too long
- */
- private void extendRegions(List<String> storageGroups, TConsensusGroupType consensusGroupType)
- throws StorageGroupNotExistsException, NotEnoughDataNodeException, TimeoutException {
- // Map<StorageGroup, Region allotment>
- Map<String, Integer> filledStorageGroupMap = new HashMap<>();
- for (String storageGroup : storageGroups) {
- float regionCount = partitionInfo.getRegionCount(storageGroup, consensusGroupType);
- float slotCount = partitionInfo.getSlotCount(storageGroup);
- float maxRegionCount =
- getClusterSchemaManager().getMaxRegionGroupCount(storageGroup, consensusGroupType);
- float maxSlotCount = ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum();
-
- // Need extension
- if (regionCount < maxRegionCount && slotCount / regionCount > maxSlotCount / maxRegionCount) {
- // The delta is equal to the smallest integer solution that satisfies the inequality:
- // slotCount / (regionCount + delta) < maxSlotCount / maxRegionCount
- int delta =
- Math.min(
- (int) (maxRegionCount - regionCount),
- Math.max(
- 1, (int) Math.ceil(slotCount * maxRegionCount / maxSlotCount - regionCount)));
- filledStorageGroupMap.put(storageGroup, delta);
- }
- }
-
- doOrWaitRegionCreation(filledStorageGroupMap, consensusGroupType);
- }
-
- /**
- * Do Region creation for those StorageGroups who get the allocation particle, for those who
- * doesn't, waiting until other threads finished the creation process.
- *
- * @param allotmentMap Map<StorageGroup, Region allotment>
- * @param consensusGroupType SchemaRegion or DataRegion
- * @throws NotEnoughDataNodeException When the number of online DataNodes are too small to *
- * allocate Regions
- * @throws StorageGroupNotExistsException When some StorageGroups don't exist
- * @throws TimeoutException When waiting other threads to allocate Regions for too long
- */
- private void doOrWaitRegionCreation(
- Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
- throws NotEnoughDataNodeException, StorageGroupNotExistsException, TimeoutException {
- // StorageGroups who get the allocation particle
- Map<String, Integer> allocateMap = new HashMap<>();
- // StorageGroups who doesn't get the allocation particle
- List<String> waitingList = new ArrayList<>();
- for (String storageGroup : allotmentMap.keySet()) {
- // Try to get the allocation particle
- if (partitionInfo.contendRegionAllocationParticle(storageGroup, consensusGroupType)) {
- // Initialize one Region
- allocateMap.put(storageGroup, allotmentMap.get(storageGroup));
- } else {
- waitingList.add(storageGroup);
- }
- }
-
- // TODO: Use procedure to protect the following process
- // Do Region allocation and creation for those StorageGroups who get the particle
- getLoadManager().doRegionCreation(allocateMap, consensusGroupType);
- // Put back particles after that
- for (String storageGroup : allocateMap.keySet()) {
- partitionInfo.putBackRegionAllocationParticle(storageGroup, consensusGroupType);
- }
-
- // Waiting Region creation for those StorageGroups who don't get the particle
- waitRegionCreation(waitingList, consensusGroupType);
- }
- /** Waiting Region creation for those StorageGroups who don't get the particle */
- private void waitRegionCreation(List<String> waitingList, TConsensusGroupType consensusGroupType)
- throws TimeoutException {
- for (int retry = 0; retry < 100; retry++) {
- boolean allocationFinished = true;
- for (String storageGroup : waitingList) {
- if (!partitionInfo.getRegionAllocationParticle(storageGroup, consensusGroupType)) {
- // If a StorageGroup's Region allocation particle doesn't return,
- // the Region creation process is not complete
- allocationFinished = false;
- break;
- }
- }
- if (allocationFinished) {
- return;
- }
-
- try {
- // Sleep 200ms to wait Region allocation
- TimeUnit.MILLISECONDS.sleep(200);
- } catch (InterruptedException e) {
- LOGGER.warn("The PartitionManager is interrupted.", e);
- }
- }
- throw new TimeoutException("");
+ return result;
}
/**
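For reference, the Region-extension arithmetic introduced in extendRegionsIfNecessary can be checked in isolation. The following standalone sketch is not part of the commit; RegionExtensionMath and the numbers in main are made up purely to illustrate that delta is the smallest integer for which slotCount / (allocatedRegionCount + delta) no longer exceeds maxSlotCount / maxRegionCount, clamped to the remaining Region budget of the StorageGroup.

    // Standalone sketch (hypothetical class, not part of the commit): the delta
    // computation used by extendRegionsIfNecessary, with example numbers.
    public final class RegionExtensionMath {

      // Smallest delta such that slotCount / (allocatedRegionCount + delta) no longer
      // exceeds the expected average maxSlotCount / maxRegionCount, clamped to the
      // number of Regions this StorageGroup may still create.
      static int computeDelta(
          float slotCount, float allocatedRegionCount, float maxRegionCount, float maxSlotCount) {
        if (allocatedRegionCount == 0) {
          // Case 1: no Region has been created for this StorageGroup yet
          return Math.min(
              (int) maxRegionCount,
              Math.max(1, (int) Math.ceil(slotCount * maxRegionCount / maxSlotCount)));
        }
        // Case 2: only reached when the caller has already checked that the current
        // average slotCount / allocatedRegionCount exceeds maxSlotCount / maxRegionCount
        return Math.min(
            (int) (maxRegionCount - allocatedRegionCount),
            Math.max(
                1,
                (int) Math.ceil(slotCount * maxRegionCount / maxSlotCount - allocatedRegionCount)));
      }

      public static void main(String[] args) {
        // Example: 3000 slots in total, 2 Regions already allocated, at most 10 Regions
        // per StorageGroup, 10000 SeriesPartitionSlots overall. The expected average is
        // 10000 / 10 = 1000 slots per Region, so ceil(3000 * 10 / 10000) - 2 = 1 more
        // Region is requested.
        System.out.println(computeDelta(3000, 2, 10, 10000)); // prints 1
      }
    }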
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 52d3e8b120..90592cdbf2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -531,37 +531,6 @@ public class PartitionInfo implements SnapshotProcessor {
return storageGroupPartitionTables.get(storageGroup).getSlotsCount();
}
- /**
- * Only leader use this interface. Contending the Region allocation particle.
- *
- * @param storageGroup StorageGroupName
- * @param type SchemaRegion or DataRegion
- * @return True when successfully get the allocation particle, false otherwise
- */
- public boolean contendRegionAllocationParticle(String storageGroup, TConsensusGroupType type) {
- return storageGroupPartitionTables.get(storageGroup).contendRegionAllocationParticle(type);
- }
-
- /**
- * Only leader use this interface. Put back the Region allocation particle.
- *
- * @param storageGroup StorageGroupName
- * @param type SchemaRegion or DataRegion
- */
- public void putBackRegionAllocationParticle(String storageGroup, TConsensusGroupType type) {
- storageGroupPartitionTables.get(storageGroup).putBackRegionAllocationParticle(type);
- }
-
- /**
- * Only leader use this interface. Get the Region allocation particle.
- *
- * @param storageGroup StorageGroupName
- * @param type SchemaRegion or DataRegion
- */
- public boolean getRegionAllocationParticle(String storageGroup, TConsensusGroupType type) {
- return storageGroupPartitionTables.get(storageGroup).getRegionAllocationParticle(type);
- }
-
/**
* Get the DataNodes who contain the specific StorageGroup's Schema or Data
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index e4a6ed4431..d584062ec6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -52,7 +52,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class StorageGroupPartitionTable {
@@ -65,10 +64,6 @@ public class StorageGroupPartitionTable {
// determines whether a new Region needs to be created
private final AtomicInteger seriesPartitionSlotsCount;
- // Region allocation particle
- private final AtomicBoolean schemaRegionParticle;
- private final AtomicBoolean dataRegionParticle;
-
// Region
private final Map<TConsensusGroupId, RegionGroup> regionGroupMap;
// SchemaPartition
@@ -80,8 +75,6 @@ public class StorageGroupPartitionTable {
this.storageGroupName = storageGroupName;
this.seriesPartitionSlotsCount = new AtomicInteger(0);
- this.schemaRegionParticle = new AtomicBoolean(true);
- this.dataRegionParticle = new AtomicBoolean(true);
this.regionGroupMap = new ConcurrentHashMap<>();
this.schemaPartitionTable = new SchemaPartitionTable();
@@ -206,53 +199,6 @@ public class StorageGroupPartitionTable {
return result.getAndIncrement();
}
- /**
- * Only leader use this interface. Contending the Region allocation particle.
- *
- * @param type SchemaRegion or DataRegion
- * @return True when successfully get the allocation particle, false otherwise
- */
- public boolean contendRegionAllocationParticle(TConsensusGroupType type) {
- switch (type) {
- case SchemaRegion:
- return schemaRegionParticle.getAndSet(false);
- case DataRegion:
- return dataRegionParticle.getAndSet(false);
- default:
- return false;
- }
- }
-
- /**
- * Only leader use this interface. Put back the Region allocation particle.
- *
- * @param type SchemaRegion or DataRegion
- */
- public void putBackRegionAllocationParticle(TConsensusGroupType type) {
- switch (type) {
- case SchemaRegion:
- schemaRegionParticle.set(true);
- case DataRegion:
- dataRegionParticle.set(true);
- }
- }
-
- /**
- * Only leader use this interface. Get the Region allocation particle.
- *
- * @param type SchemaRegion or DataRegion
- */
- public boolean getRegionAllocationParticle(TConsensusGroupType type) {
- switch (type) {
- case SchemaRegion:
- return schemaRegionParticle.get();
- case DataRegion:
- return dataRegionParticle.get();
- default:
- return false;
- }
- }
-
public int getSlotsCount() {
return seriesPartitionSlotsCount.get();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 5c1efe7dce..f9b0e1c7a8 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -79,9 +79,13 @@ public class DataPartitionTable {
.getDataPartition(timePartitionSlots, seriesPartitionTable)) {
result.set(false);
}
- dataPartitionTable
- .getDataPartitionMap()
- .put(seriesPartitionSlot, seriesPartitionTable);
+
+ if (!seriesPartitionTable.getSeriesPartitionMap().isEmpty()) {
+ // Only return those non-empty DataPartitions
+ dataPartitionTable
+ .getDataPartitionMap()
+ .put(seriesPartitionSlot, seriesPartitionTable);
+ }
} else {
result.set(false);
}
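The DataPartitionTable hunk above makes the lookup copy a SeriesPartitionSlot into the response only when its SeriesPartitionTable is non-empty. A minimal sketch of that filtering idea, using plain Map types as stand-ins for the Thrift partition structures (the class and method names here are hypothetical, not the project's API):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch only: simplified stand-ins for the SeriesPartitionSlot -> TimePartitionSlot
    // -> ConsensusGroupId structures, illustrating why empty sub-tables are skipped.
    public final class NonEmptyPartitionFilterSketch {

      static Map<Integer, Map<Long, List<Integer>>> filterNonEmpty(
          Map<Integer, Map<Long, List<Integer>>> lookedUp) {
        Map<Integer, Map<Long, List<Integer>>> result = new ConcurrentHashMap<>();
        lookedUp.forEach(
            (seriesSlot, timeSlotMap) -> {
              // Mirror of the hunk above: a SeriesPartitionSlot whose lookup matched no
              // TimePartitionSlots is omitted instead of appearing with an empty map.
              if (!timeSlotMap.isEmpty()) {
                result.put(seriesSlot, timeSlotMap);
              }
            });
        return result;
      }
    }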