You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/07/07 00:55:46 UTC

[iotdb] branch master updated: [IOTDB-3637] Optimize getOrCreatePartition process (#6596)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f7464253fb [IOTDB-3637] Optimize getOrCreatePartition process (#6596)
f7464253fb is described below

commit f7464253fbcee139d60675f01769ca1c0f4a2b41
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Thu Jul 7 08:55:40 2022 +0800

    [IOTDB-3637] Optimize getOrCreatePartition process (#6596)
---
 .../iotdb/confignode/manager/PartitionManager.java | 343 +++++++--------------
 .../persistence/partition/PartitionInfo.java       |  31 --
 .../partition/StorageGroupPartitionTable.java      |  54 ----
 .../commons/partition/DataPartitionTable.java      |  10 +-
 4 files changed, 126 insertions(+), 312 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 557d9527bc..f2ac6fc467 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -47,10 +47,8 @@ import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
 import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
 import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
-import org.apache.iotdb.confignode.exception.TimeoutException;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -59,11 +57,10 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -145,40 +142,42 @@ public class PartitionManager {
       return resp;
     }
 
-    // Otherwise, fist ensure that each StorageGroup has at least one SchemaRegion.
-    // This block of code is still parallel and concurrent safe.
-    // Thus, we can prepare the SchemaRegions with maximum efficiency.
-    TSStatus status =
-        initializeRegionsIfNecessary(
-            new ArrayList<>(req.getPartitionSlotsMap().keySet()), TConsensusGroupType.SchemaRegion);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      resp.setStatus(status);
-      return resp;
-    }
-
-    // Next, we serialize the creation process of SchemaPartitions to
+    // We serialize the creation process of SchemaPartitions to
     // ensure that each SchemaPartition is created by a unique CreateSchemaPartitionReq.
-    // Because the number of SchemaPartitions per storage group is limited by the number of
-    // SeriesPartitionSlots,
+    // Because the number of SchemaPartitions per storage group is limited
+    // by the number of SeriesPartitionSlots,
     // the number of serialized CreateSchemaPartitionReqs is acceptable.
     synchronized (this) {
       // Filter unassigned SchemaPartitionSlots
-      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlots =
+      Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap =
           partitionInfo.filterUnassignedSchemaPartitionSlots(req.getPartitionSlotsMap());
-      if (unassignedSchemaPartitionSlots.size() > 0) {
-        // Allocate SchemaPartitions
-        Map<String, SchemaPartitionTable> assignedSchemaPartition =
-            getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlots);
-        // Cache allocating result
-        CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
-        createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
-        getConsensusManager().write(createPlan);
+
+      // Here we ensure that each StorageGroup has at least one SchemaRegion.
+      // And if some StorageGroups own too many slots, extend SchemaRegion for them.
+
+      // Map<StorageGroup, unassigned SeriesPartitionSlot count>
+      Map<String, Integer> unassignedSchemaPartitionSlotsCountMap = new ConcurrentHashMap<>();
+      unassignedSchemaPartitionSlotsMap.forEach(
+          (storageGroup, unassignedSchemaPartitionSlots) ->
+              unassignedSchemaPartitionSlotsCountMap.put(
+                  storageGroup, unassignedSchemaPartitionSlots.size()));
+      TSStatus status =
+          extendRegionsIfNecessary(
+              unassignedSchemaPartitionSlotsCountMap, TConsensusGroupType.SchemaRegion);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // Return an error code if Region extension failed
+        resp.setStatus(status);
+        return resp;
       }
-    }
 
-    // Finally, if some StorageGroups own too many slots, extend SchemaRegion for them.
-    extendRegionsIfNecessary(
-        new ArrayList<>(req.getPartitionSlotsMap().keySet()), TConsensusGroupType.SchemaRegion);
+      // Allocate SchemaPartitions
+      Map<String, SchemaPartitionTable> assignedSchemaPartition =
+          getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+      // Cache allocating result
+      CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
+      createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
+      getConsensusManager().write(createPlan);
+    }
 
     return getSchemaPartition(req);
   }
@@ -194,48 +193,50 @@ public class PartitionManager {
    *     StorageGroup doesn't exist.
    */
   public DataSet getOrCreateDataPartition(GetOrCreateDataPartitionPlan req) {
-    // After all the SchemaPartitions are allocated,
-    // all the read requests about SchemaPartitionTable are parallel.
+    // After all the DataPartitions are allocated,
+    // all the read requests about DataPartitionTable are parallel.
     DataPartitionResp resp = (DataPartitionResp) getDataPartition(req);
     if (resp.isAllPartitionsExist()) {
       return resp;
     }
 
-    // Otherwise, fist ensure that each StorageGroup has at least one DataRegion.
-    // This block of code is still parallel and concurrent safe.
-    // Thus, we can prepare the DataRegions with maximum efficiency.
-    TSStatus status =
-        initializeRegionsIfNecessary(
-            new ArrayList<>(req.getPartitionSlotsMap().keySet()), TConsensusGroupType.DataRegion);
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      resp.setStatus(status);
-      return resp;
-    }
-
-    // Next, we serialize the creation process of DataPartitions to
+    // We serialize the creation process of DataPartitions to
     // ensure that each DataPartition is created by a unique CreateDataPartitionReq.
-    // Because the number of DataPartitions per storage group per day is limited by the number of
-    // SeriesPartitionSlots,
+    // Because the number of DataPartitions per storage group is limited
+    // by the number of SeriesPartitionSlots,
     // the number of serialized CreateDataPartitionReqs is acceptable.
     synchronized (this) {
       // Filter unassigned DataPartitionSlots
       Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
-          unassignedDataPartitionSlots =
+          unassignedDataPartitionSlotsMap =
               partitionInfo.filterUnassignedDataPartitionSlots(req.getPartitionSlotsMap());
-      if (unassignedDataPartitionSlots.size() > 0) {
-        // Allocate DataPartitions
-        Map<String, DataPartitionTable> assignedDataPartition =
-            getLoadManager().allocateDataPartition(unassignedDataPartitionSlots);
-        // Cache allocating result
-        CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
-        createPlan.setAssignedDataPartition(assignedDataPartition);
-        getConsensusManager().write(createPlan);
+
+      // Here we ensure that each StorageGroup has at least one DataRegion.
+      // And if some StorageGroups own too many slots, extend DataRegion for them.
+
+      // Map<StorageGroup, unassigned SeriesPartitionSlot count>
+      Map<String, Integer> unassignedDataPartitionSlotsCountMap = new ConcurrentHashMap<>();
+      unassignedDataPartitionSlotsMap.forEach(
+          (storageGroup, unassignedDataPartitionSlots) ->
+              unassignedDataPartitionSlotsCountMap.put(
+                  storageGroup, unassignedDataPartitionSlots.size()));
+      TSStatus status =
+          extendRegionsIfNecessary(
+              unassignedDataPartitionSlotsCountMap, TConsensusGroupType.DataRegion);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // Return an error code if Region extension failed
+        resp.setStatus(status);
+        return resp;
       }
-    }
 
-    // Finally, if some StorageGroups own too many slots, extend DataRegion for them.
-    extendRegionsIfNecessary(
-        new ArrayList<>(req.getPartitionSlotsMap().keySet()), TConsensusGroupType.DataRegion);
+      // Allocate DataPartitions
+      Map<String, DataPartitionTable> assignedDataPartition =
+          getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
+      // Cache allocating result
+      CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
+      createPlan.setAssignedDataPartition(assignedDataPartition);
+      getConsensusManager().write(createPlan);
+    }
 
     return getDataPartition(req);
   }
@@ -244,185 +245,79 @@ public class PartitionManager {
   // Leader scheduling interfaces
   // ======================================================
 
-  /** Handle the exceptions from initializeRegions */
-  private TSStatus initializeRegionsIfNecessary(
-      List<String> storageGroups, TConsensusGroupType consensusGroupType) {
-    try {
-      initializeRegions(storageGroups, consensusGroupType);
-    } catch (NotEnoughDataNodeException e) {
-      return new TSStatus(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode())
-          .setMessage(
-              "ConfigNode failed to allocate Partition because there are not enough DataNodes");
-    } catch (TimeoutException e) {
-      return new TSStatus(TSStatusCode.TIME_OUT.getStatusCode())
-          .setMessage(
-              "ConfigNode failed to allocate Partition because waiting for another thread's Region allocation timeout.");
-    } catch (StorageGroupNotExistsException e) {
-      return new TSStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode())
-          .setMessage(
-              "ConfigNode failed to allocate DataPartition because some StorageGroup doesn't exist.");
-    }
-    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-  }
-
   /**
-   * Initialize one Region for each StorageGroup who doesn't have any.
+   * Create Regions for StorageGroups that have none and extend those that own too many slots.
    *
-   * @param storageGroups List<StorageGroupName>
+   * @param unassignedPartitionSlotsCountMap Map<StorageGroup, unassigned SeriesPartitionSlot count>
    * @param consensusGroupType SchemaRegion or DataRegion
-   * @throws NotEnoughDataNodeException When the number of online DataNodes are too small to
-   *     allocate Regions
-   * @throws TimeoutException When waiting other threads to allocate Regions for too long
-   * @throws StorageGroupNotExistsException When some StorageGroups don't exist
+   * @return SUCCESS_STATUS when Region extension succeeds; NOT_ENOUGH_DATA_NODE when there are
+   *     not enough DataNodes; STORAGE_GROUP_NOT_EXIST when some StorageGroups don't exist
    */
-  private void initializeRegions(List<String> storageGroups, TConsensusGroupType consensusGroupType)
-      throws NotEnoughDataNodeException, TimeoutException, StorageGroupNotExistsException {
-
-    int leastDataNode = 0;
-    Map<String, Integer> unreadyStorageGroupMap = new HashMap<>();
-    for (String storageGroup : storageGroups) {
-      if (getRegionCount(storageGroup, consensusGroupType) == 0) {
-        // Update leastDataNode
-        TStorageGroupSchema storageGroupSchema =
-            getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup);
-        switch (consensusGroupType) {
-          case SchemaRegion:
-            leastDataNode =
-                Math.max(leastDataNode, storageGroupSchema.getSchemaReplicationFactor());
-            break;
-          case DataRegion:
-            leastDataNode = Math.max(leastDataNode, storageGroupSchema.getDataReplicationFactor());
+  private TSStatus extendRegionsIfNecessary(
+      Map<String, Integer> unassignedPartitionSlotsCountMap,
+      TConsensusGroupType consensusGroupType) {
+    TSStatus result = new TSStatus();
+
+    try {
+      // Map<StorageGroup, Region allotment>
+      Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
+
+      for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
+        float allocatedRegionCount =
+            partitionInfo.getRegionCount(entry.getKey(), consensusGroupType);
+        // The slotCount equals the sum of the assigned and unassigned slot counts
+        float slotCount = partitionInfo.getSlotCount(entry.getKey()) + entry.getValue();
+        float maxRegionCount =
+            getClusterSchemaManager().getMaxRegionGroupCount(entry.getKey(), consensusGroupType);
+        float maxSlotCount =
+            ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum();
+
+        /* Region extension is required in the following two cases  */
+        // 1. No Region has been created for the current StorageGroup yet
+        if (allocatedRegionCount == 0) {
+          // The delta is the smallest integer solution of the inequality:
+          // slotCount / delta <= maxSlotCount / maxRegionCount
+          int delta =
+              Math.min(
+                  (int) maxRegionCount,
+                  Math.max(1, (int) Math.ceil(slotCount * maxRegionCount / maxSlotCount)));
+          allotmentMap.put(entry.getKey(), delta);
+          continue;
         }
 
-        // Recording StorageGroups without Region
-        unreadyStorageGroupMap.put(storageGroup, 1);
+        // 2. The average number of partitions held by each Region would exceed the
+        //    expected average (maxSlotCount / maxRegionCount) once the partition
+        //    allocation is complete
+        if (allocatedRegionCount < maxRegionCount
+            && slotCount / allocatedRegionCount > maxSlotCount / maxRegionCount) {
+          // The delta is the smallest integer solution of the inequality:
+          // slotCount / (allocatedRegionCount + delta) <= maxSlotCount / maxRegionCount
+          int delta =
+              Math.min(
+                  (int) (maxRegionCount - allocatedRegionCount),
+                  Math.max(
+                      1,
+                      (int)
+                          Math.ceil(
+                              slotCount * maxRegionCount / maxSlotCount - allocatedRegionCount)));
+          allotmentMap.put(entry.getKey(), delta);
+        }
       }
-    }
-    if (getNodeManager().getRegisteredDataNodeCount() < leastDataNode) {
-      // Make sure DataNodes enough
-      throw new NotEnoughDataNodeException();
-    }
 
-    doOrWaitRegionCreation(unreadyStorageGroupMap, consensusGroupType);
-  }
+      // TODO: Use procedure to protect the following process
+      // Do Region allocation and creation for StorageGroups based on the allotment
+      getLoadManager().doRegionCreation(allotmentMap, consensusGroupType);
 
-  /** Handle the exceptions from extendRegions */
-  private void extendRegionsIfNecessary(
-      List<String> storageGroups, TConsensusGroupType consensusGroupType) {
-    try {
-      extendRegions(storageGroups, consensusGroupType);
+      result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (NotEnoughDataNodeException e) {
       LOGGER.error("ConfigNode failed to extend Region because there are not enough DataNodes");
-    } catch (TimeoutException e) {
-      LOGGER.error(
-          "ConfigNode failed to extend Region because waiting for another thread's Region allocation timeout.");
+      result.setCode(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode());
     } catch (StorageGroupNotExistsException e) {
       LOGGER.error("ConfigNode failed to extend Region because some StorageGroup doesn't exist.");
+      result.setCode(TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode());
     }
-  }
-
-  /**
-   * Allocate more Regions to StorageGroups who have too many slots.
-   *
-   * @param storageGroups List<StorageGroupName>
-   * @param consensusGroupType SchemaRegion or DataRegion
-   * @throws StorageGroupNotExistsException When some StorageGroups don't exist
-   * @throws NotEnoughDataNodeException When the number of online DataNodes are too small to
-   *     allocate Regions
-   * @throws TimeoutException When waiting other threads to allocate Regions for too long
-   */
-  private void extendRegions(List<String> storageGroups, TConsensusGroupType consensusGroupType)
-      throws StorageGroupNotExistsException, NotEnoughDataNodeException, TimeoutException {
-    // Map<StorageGroup, Region allotment>
-    Map<String, Integer> filledStorageGroupMap = new HashMap<>();
-    for (String storageGroup : storageGroups) {
-      float regionCount = partitionInfo.getRegionCount(storageGroup, consensusGroupType);
-      float slotCount = partitionInfo.getSlotCount(storageGroup);
-      float maxRegionCount =
-          getClusterSchemaManager().getMaxRegionGroupCount(storageGroup, consensusGroupType);
-      float maxSlotCount = ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum();
-
-      // Need extension
-      if (regionCount < maxRegionCount && slotCount / regionCount > maxSlotCount / maxRegionCount) {
-        // The delta is equal to the smallest integer solution that satisfies the inequality:
-        // slotCount / (regionCount + delta) < maxSlotCount / maxRegionCount
-        int delta =
-            Math.min(
-                (int) (maxRegionCount - regionCount),
-                Math.max(
-                    1, (int) Math.ceil(slotCount * maxRegionCount / maxSlotCount - regionCount)));
-        filledStorageGroupMap.put(storageGroup, delta);
-      }
-    }
-
-    doOrWaitRegionCreation(filledStorageGroupMap, consensusGroupType);
-  }
-
-  /**
-   * Do Region creation for those StorageGroups who get the allocation particle, for those who
-   * doesn't, waiting until other threads finished the creation process.
-   *
-   * @param allotmentMap Map<StorageGroup, Region allotment>
-   * @param consensusGroupType SchemaRegion or DataRegion
-   * @throws NotEnoughDataNodeException When the number of online DataNodes are too small to *
-   *     allocate Regions
-   * @throws StorageGroupNotExistsException When some StorageGroups don't exist
-   * @throws TimeoutException When waiting other threads to allocate Regions for too long
-   */
-  private void doOrWaitRegionCreation(
-      Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
-      throws NotEnoughDataNodeException, StorageGroupNotExistsException, TimeoutException {
-    // StorageGroups who get the allocation particle
-    Map<String, Integer> allocateMap = new HashMap<>();
-    // StorageGroups who doesn't get the allocation particle
-    List<String> waitingList = new ArrayList<>();
-    for (String storageGroup : allotmentMap.keySet()) {
-      // Try to get the allocation particle
-      if (partitionInfo.contendRegionAllocationParticle(storageGroup, consensusGroupType)) {
-        // Initialize one Region
-        allocateMap.put(storageGroup, allotmentMap.get(storageGroup));
-      } else {
-        waitingList.add(storageGroup);
-      }
-    }
-
-    // TODO: Use procedure to protect the following process
-    // Do Region allocation and creation for those StorageGroups who get the particle
-    getLoadManager().doRegionCreation(allocateMap, consensusGroupType);
-    // Put back particles after that
-    for (String storageGroup : allocateMap.keySet()) {
-      partitionInfo.putBackRegionAllocationParticle(storageGroup, consensusGroupType);
-    }
-
-    // Waiting Region creation for those StorageGroups who don't get the particle
-    waitRegionCreation(waitingList, consensusGroupType);
-  }
 
-  /** Waiting Region creation for those StorageGroups who don't get the particle */
-  private void waitRegionCreation(List<String> waitingList, TConsensusGroupType consensusGroupType)
-      throws TimeoutException {
-    for (int retry = 0; retry < 100; retry++) {
-      boolean allocationFinished = true;
-      for (String storageGroup : waitingList) {
-        if (!partitionInfo.getRegionAllocationParticle(storageGroup, consensusGroupType)) {
-          // If a StorageGroup's Region allocation particle doesn't return,
-          // the Region creation process is not complete
-          allocationFinished = false;
-          break;
-        }
-      }
-      if (allocationFinished) {
-        return;
-      }
-
-      try {
-        // Sleep 200ms to wait Region allocation
-        TimeUnit.MILLISECONDS.sleep(200);
-      } catch (InterruptedException e) {
-        LOGGER.warn("The PartitionManager is interrupted.", e);
-      }
-    }
-    throw new TimeoutException("");
+    return result;
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 52d3e8b120..90592cdbf2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -531,37 +531,6 @@ public class PartitionInfo implements SnapshotProcessor {
     return storageGroupPartitionTables.get(storageGroup).getSlotsCount();
   }
 
-  /**
-   * Only leader use this interface. Contending the Region allocation particle.
-   *
-   * @param storageGroup StorageGroupName
-   * @param type SchemaRegion or DataRegion
-   * @return True when successfully get the allocation particle, false otherwise
-   */
-  public boolean contendRegionAllocationParticle(String storageGroup, TConsensusGroupType type) {
-    return storageGroupPartitionTables.get(storageGroup).contendRegionAllocationParticle(type);
-  }
-
-  /**
-   * Only leader use this interface. Put back the Region allocation particle.
-   *
-   * @param storageGroup StorageGroupName
-   * @param type SchemaRegion or DataRegion
-   */
-  public void putBackRegionAllocationParticle(String storageGroup, TConsensusGroupType type) {
-    storageGroupPartitionTables.get(storageGroup).putBackRegionAllocationParticle(type);
-  }
-
-  /**
-   * Only leader use this interface. Get the Region allocation particle.
-   *
-   * @param storageGroup StorageGroupName
-   * @param type SchemaRegion or DataRegion
-   */
-  public boolean getRegionAllocationParticle(String storageGroup, TConsensusGroupType type) {
-    return storageGroupPartitionTables.get(storageGroup).getRegionAllocationParticle(type);
-  }
-
   /**
    * Get the DataNodes who contain the specific StorageGroup's Schema or Data
    *
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index e4a6ed4431..d584062ec6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -52,7 +52,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class StorageGroupPartitionTable {
@@ -65,10 +64,6 @@ public class StorageGroupPartitionTable {
   // determines whether a new Region needs to be created
   private final AtomicInteger seriesPartitionSlotsCount;
 
-  // Region allocation particle
-  private final AtomicBoolean schemaRegionParticle;
-  private final AtomicBoolean dataRegionParticle;
-
   // Region
   private final Map<TConsensusGroupId, RegionGroup> regionGroupMap;
   // SchemaPartition
@@ -80,8 +75,6 @@ public class StorageGroupPartitionTable {
     this.storageGroupName = storageGroupName;
     this.seriesPartitionSlotsCount = new AtomicInteger(0);
 
-    this.schemaRegionParticle = new AtomicBoolean(true);
-    this.dataRegionParticle = new AtomicBoolean(true);
     this.regionGroupMap = new ConcurrentHashMap<>();
 
     this.schemaPartitionTable = new SchemaPartitionTable();
@@ -206,53 +199,6 @@ public class StorageGroupPartitionTable {
     return result.getAndIncrement();
   }
 
-  /**
-   * Only leader use this interface. Contending the Region allocation particle.
-   *
-   * @param type SchemaRegion or DataRegion
-   * @return True when successfully get the allocation particle, false otherwise
-   */
-  public boolean contendRegionAllocationParticle(TConsensusGroupType type) {
-    switch (type) {
-      case SchemaRegion:
-        return schemaRegionParticle.getAndSet(false);
-      case DataRegion:
-        return dataRegionParticle.getAndSet(false);
-      default:
-        return false;
-    }
-  }
-
-  /**
-   * Only leader use this interface. Put back the Region allocation particle.
-   *
-   * @param type SchemaRegion or DataRegion
-   */
-  public void putBackRegionAllocationParticle(TConsensusGroupType type) {
-    switch (type) {
-      case SchemaRegion:
-        schemaRegionParticle.set(true);
-      case DataRegion:
-        dataRegionParticle.set(true);
-    }
-  }
-
-  /**
-   * Only leader use this interface. Get the Region allocation particle.
-   *
-   * @param type SchemaRegion or DataRegion
-   */
-  public boolean getRegionAllocationParticle(TConsensusGroupType type) {
-    switch (type) {
-      case SchemaRegion:
-        return schemaRegionParticle.get();
-      case DataRegion:
-        return dataRegionParticle.get();
-      default:
-        return false;
-    }
-  }
-
   public int getSlotsCount() {
     return seriesPartitionSlotsCount.get();
   }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 5c1efe7dce..f9b0e1c7a8 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -79,9 +79,13 @@ public class DataPartitionTable {
                   .getDataPartition(timePartitionSlots, seriesPartitionTable)) {
                 result.set(false);
               }
-              dataPartitionTable
-                  .getDataPartitionMap()
-                  .put(seriesPartitionSlot, seriesPartitionTable);
+
+              if (!seriesPartitionTable.getSeriesPartitionMap().isEmpty()) {
+                // Only return those non-empty DataPartitions
+                dataPartitionTable
+                    .getDataPartitionMap()
+                    .put(seriesPartitionSlot, seriesPartitionTable);
+              }
             } else {
               result.set(false);
             }