You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/08/25 08:49:51 UTC

[iotdb] branch IOTDB-3455 updated: finish multiple data region

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3455
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-3455 by this push:
     new 1739d035ab finish multiple data region
1739d035ab is described below

commit 1739d035abf760d4ed69b197c48ab466ca40ced4
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Thu Aug 25 16:49:42 2022 +0800

    finish multiple data region
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../iotdb/db/localconfignode/LocalConfigNode.java  |  22 ++--
 .../db/localconfignode/LocalDataPartitionInfo.java |  18 +--
 .../localconfignode/LocalDataPartitionTable.java   | 128 +++------------------
 .../plan/scheduler/StandaloneSchedulerTest.java    |   9 +-
 5 files changed, 32 insertions(+), 147 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 379ba1fe6c..a7bb686b98 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -813,7 +813,7 @@ public class IoTDBConfig {
   private int ioTaskQueueSizeForFlushing = 10;
 
   /** the number of data regions per user-defined storage group */
-  private int dataRegionNum = 1;
+  private int dataRegionNum = 3;
 
   /** the interval to log recover progress of each vsg when starting iotdb */
   private long recoveryLogIntervalInMs = 5_000L;
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
index fb02237665..d533abae61 100644
--- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalConfigNode.java
@@ -857,12 +857,10 @@ public class LocalConfigNode {
    * root.sg1. If there's no storage group on the given path, StorageGroupNotSetException will be
    * thrown.
    */
-  public DataRegionId getBelongedDataRegionId(
-      PartialPath path, TTimePartitionSlot timePartitionSlot)
+  public DataRegionId getBelongedDataRegionId(PartialPath path)
       throws MetadataException, DataRegionException {
     PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
-    DataRegionId dataRegionId =
-        dataPartitionTable.getDataRegionId(storageGroup, path, timePartitionSlot);
+    DataRegionId dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
     if (dataRegionId == null) {
       return null;
     }
@@ -877,16 +875,13 @@ public class LocalConfigNode {
   }
 
   // This interface involves storage group and data region auto creation
-  public DataRegionId getBelongedDataRegionIdWithAutoCreate(
-      PartialPath path, TTimePartitionSlot timePartitionSlot)
+  public DataRegionId getBelongedDataRegionIdWithAutoCreate(PartialPath devicePath)
       throws MetadataException, DataRegionException {
-    PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
-    DataRegionId dataRegionId =
-        dataPartitionTable.getDataRegionId(storageGroup, path, timePartitionSlot);
+    PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(devicePath);
+    DataRegionId dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, devicePath);
     if (dataRegionId == null) {
       dataPartitionTable.registerStorageGroup(storageGroup);
-      dataRegionId =
-          dataPartitionTable.allocateDataRegionForNewSlot(storageGroup, path, timePartitionSlot);
+      dataRegionId = dataPartitionTable.allocateDataRegionForNewSlot(storageGroup, devicePath);
     }
     DataRegion dataRegion = storageEngine.getDataRegion(dataRegionId);
     if (dataRegion == null) {
@@ -990,8 +985,7 @@ public class LocalConfigNode {
           continue;
         }
         for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
-          DataRegionId dataRegionId =
-              getBelongedDataRegionId(new PartialPath(deviceId), timePartitionSlot);
+          DataRegionId dataRegionId = getBelongedDataRegionId(new PartialPath(deviceId));
           // dataRegionId is null means the DataRegion is not created,
           // use an empty dataPartitionMap to init DataPartition
           if (dataRegionId != null) {
@@ -1052,7 +1046,7 @@ public class LocalConfigNode {
             dataPartitionQueryParam.getTimePartitionSlotList();
         for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {
           DataRegionId dataRegionId =
-              getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId), timePartitionSlot);
+              getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId));
           Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionToRegionsMap =
               deviceToRegionsMap.getOrDefault(
                   executor.getSeriesPartitionSlot(deviceId), new HashMap<>());
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
index d267938080..04305c4b32 100644
--- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionInfo.java
@@ -19,14 +19,12 @@
 
 package org.apache.iotdb.db.localconfignode;
 
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -62,18 +60,12 @@ public class LocalDataPartitionInfo {
     }
   }
 
-  public DataRegionId getDataRegionId(
-      PartialPath storageGroup, PartialPath path, TTimePartitionSlot timePartitionSlot) {
+  public DataRegionId getDataRegionId(PartialPath storageGroup, PartialPath path) {
     if (!partitionTableMap.containsKey(storageGroup)) {
       return null;
     }
     LocalDataPartitionTable table = partitionTableMap.get(storageGroup);
-    Map<TTimePartitionSlot, DataRegionId> slotRegionMap = new HashMap<>();
-    if (!table.getDataRegionId(path, Collections.singletonList(timePartitionSlot), slotRegionMap)) {
-      return null;
-    } else {
-      return slotRegionMap.get(timePartitionSlot);
-    }
+    return table.getDataRegionId(path);
   }
 
   /**
@@ -82,13 +74,11 @@ public class LocalDataPartitionInfo {
    *
    * @param storageGroup The path for the storage group.
    * @param path The full path for the series.
-   * @param timePartitionSlot The time partition slot to allocate.
    * @return The data region id for the time partition slot.
    */
-  public DataRegionId allocateDataRegionForNewSlot(
-      PartialPath storageGroup, PartialPath path, TTimePartitionSlot timePartitionSlot) {
+  public DataRegionId allocateDataRegionForNewSlot(PartialPath storageGroup, PartialPath path) {
     LocalDataPartitionTable table = partitionTableMap.get(storageGroup);
-    return table.getDataRegionWithAutoExtension(path, timePartitionSlot);
+    return table.getDataRegionWithAutoExtension(path);
   }
 
   public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
diff --git a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
index 3a8af9c407..75c8f6c636 100644
--- a/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/localconfignode/LocalDataPartitionTable.java
@@ -19,44 +19,28 @@
 
 package org.apache.iotdb.db.localconfignode;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.partition.SeriesPartitionTable;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class LocalDataPartitionTable {
   private static final Logger LOG = LoggerFactory.getLogger(LocalDataPartitionTable.class);
 
   private String storageGroupName;
-  private Map<PartialPath, SeriesPartitionTable> partitionTableMap;
-  private List<DataRegionId> regionList = new ArrayList<>();
-  private Map<DataRegionId, AtomicInteger> regionSlotCountMap = new ConcurrentHashMap<>();
-
-  public LocalDataPartitionTable() {}
+  private final int regionNum = IoTDBDescriptor.getInstance().getConfig().getDataRegionNum();
+  private DataRegionId[] regionIds;
 
   public LocalDataPartitionTable(String storageGroupName) {
     this.storageGroupName = storageGroupName;
-    this.partitionTableMap = new ConcurrentHashMap<>();
+    this.regionIds = new DataRegionId[regionNum];
   }
 
   public void init(ByteBuffer buffer) {
@@ -68,30 +52,14 @@ public class LocalDataPartitionTable {
   }
 
   /**
-   * Get the TimePartitionSlot to DataRegionId Map. The result is stored in param slotRegionMap.
+   * Get the id of the data region in which the given path is located.
    *
    * @param path The full path for the series.
-   * @param timePartitionSlots The time partition slots for the series.
-   * @param slotRegionMap The map that store the result.
-   * @return Whether all the partitions exist.
+   * @return The region id for the path.
    */
-  public boolean getDataRegionId(
-      PartialPath path,
-      List<TTimePartitionSlot> timePartitionSlots,
-      Map<TTimePartitionSlot, DataRegionId> slotRegionMap) {
-    if (!partitionTableMap.containsKey(path)) {
-      return false;
-    }
-    SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
-    boolean allPartitionExists =
-        partitionTableMap.get(path).getDataPartition(timePartitionSlots, seriesPartitionTable);
-    for (Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> entry :
-        seriesPartitionTable.getSeriesPartitionMap().entrySet()) {
-      if (entry.getValue().size() > 0) {
-        slotRegionMap.put(entry.getKey(), new DataRegionId(entry.getValue().get(0).getId()));
-      }
-    }
-    return allPartitionExists;
+  public DataRegionId getDataRegionId(PartialPath path) {
+    int idx = Math.abs(path.hashCode() % regionNum); // % before abs: abs(MIN_VALUE) < 0
+    return regionIds[idx]; // null when no region has been allocated for this slot yet
   }
 
   /**
@@ -100,82 +68,20 @@ public class LocalDataPartitionTable {
    * @return data region id in list
    */
   public List<DataRegionId> getAllDataRegionId() {
-    Set<DataRegionId> regionIdSet = new HashSet<>();
-    for (SeriesPartitionTable partitionTable : partitionTableMap.values()) {
-      Map<TTimePartitionSlot, List<TConsensusGroupId>> partitionGroupIdMap =
-          partitionTable.getSeriesPartitionMap();
-      for (List<TConsensusGroupId> ids : partitionGroupIdMap.values()) {
-        ids.forEach(x -> regionIdSet.add(new DataRegionId(x.getId())));
-      }
-    }
-    return new ArrayList<>(regionIdSet);
-  }
-
-  public int getTimeSlotNum() {
-    int slotCount = 0;
-    for (SeriesPartitionTable partitionTable : partitionTableMap.values()) {
-      Map<TTimePartitionSlot, List<TConsensusGroupId>> partitionGroupIdMap =
-          partitionTable.getSeriesPartitionMap();
-      slotCount += partitionGroupIdMap.size();
-    }
-    return slotCount;
-  }
-
-  public DataRegionId getDataRegionWithAutoExtension(
-      PartialPath path, TTimePartitionSlot timePartitionSlot) {
-    DataRegionId regionId;
-    if ((regionId = checkExpansion(path, timePartitionSlot)) != null) {
-      // return the newly allocated region
-      regionSlotCountMap.get(regionId).incrementAndGet();
-      return regionId;
-    }
-
-    // select a region with min time partition slot
-    List<Pair<DataRegionId, AtomicInteger>> slotCountList = new ArrayList<>();
-    regionSlotCountMap.forEach((id, count) -> slotCountList.add(new Pair<>(id, count)));
-    slotCountList.sort(Comparator.comparingInt(o -> o.right.get()));
-    DataRegionId chosenId = slotCountList.get(0).left;
-    regionSlotCountMap.get(chosenId).incrementAndGet();
-    return chosenId;
-  }
-
-  public DataRegionId checkExpansion(PartialPath path, TTimePartitionSlot timePartitionSlot) {
-    if (regionList.size() == 0) {
-      // there is no data region for this storage group, create one
-      return carryOutExpansion(path, timePartitionSlot);
-    }
-
-    // check the load and carry out the expansion if necessary
-    double allocatedSlotNum = getTimeSlotNum();
-    double maxRegionNum = IoTDBDescriptor.getInstance().getConfig().getDataRegionNum();
-    double maxSlotNum = IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum();
-    if (regionList.size() < maxRegionNum
-        && allocatedSlotNum / (double) regionList.size() > maxSlotNum / maxRegionNum) {
-      return carryOutExpansion(path, timePartitionSlot);
-    }
-    return null;
+    return Arrays.asList(regionIds);
   }
 
-  private DataRegionId carryOutExpansion(PartialPath path, TTimePartitionSlot timePartitionSlot) {
-    if (!partitionTableMap.containsKey(path)) {
-      partitionTableMap.put(path, new SeriesPartitionTable());
+  public DataRegionId getDataRegionWithAutoExtension(PartialPath path) {
+    int idx = Math.abs(path.hashCode() % regionNum); // % before abs: abs(MIN_VALUE) < 0
+    if (regionIds[idx] == null) { // slot not allocated yet: create its region id lazily
+      int nextId = DataRegionIdGenerator.getInstance().getNextId();
+      regionIds[idx] = new DataRegionId(nextId);
     }
-    SeriesPartitionTable seriesPartitionTable = partitionTableMap.get(path);
-    Map<TTimePartitionSlot, List<TConsensusGroupId>> allotmentMap = new HashMap<>();
-    int nextRegionId = DataRegionIdGenerator.getInstance().getNextId();
-    allotmentMap.put(
-        timePartitionSlot,
-        Collections.singletonList(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, nextRegionId)));
-    SeriesPartitionTable requestTable = new SeriesPartitionTable(allotmentMap);
-    Map<TConsensusGroupId, AtomicInteger> deltaMap = new HashMap<>();
-    seriesPartitionTable.createDataPartition(requestTable, deltaMap);
-    regionList.add(new DataRegionId(nextRegionId));
-    regionSlotCountMap.put(new DataRegionId(nextRegionId), new AtomicInteger(0));
-    return new DataRegionId(nextRegionId);
+    return regionIds[idx];
   }
 
   public void clear() {
     // TODO: clear the table
+    regionIds = new DataRegionId[regionNum]; // drop all ids but avoid NPE on later lookups
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index 3afef548f5..34e2a9e85f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -72,8 +71,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
-import static org.apache.iotdb.db.localconfignode.LocalConfigNode.STANDALONE_MOCK_TIME_SLOT_START_TIME;
-
 public class StandaloneSchedulerTest {
   private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
 
@@ -409,8 +406,7 @@ public class StandaloneSchedulerTest {
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath(deviceId));
-    configNode.getBelongedDataRegionIdWithAutoCreate(
-        new PartialPath(deviceId), new TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME));
+    configNode.getBelongedDataRegionIdWithAutoCreate(new PartialPath(deviceId));
     MPPQueryContext context =
         new MPPQueryContext(
             "",
@@ -491,8 +487,7 @@ public class StandaloneSchedulerTest {
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     configNode.getBelongedSchemaRegionIdWithAutoCreate(deviceId);
-    configNode.getBelongedDataRegionIdWithAutoCreate(
-        deviceId, new TTimePartitionSlot(STANDALONE_MOCK_TIME_SLOT_START_TIME));
+    configNode.getBelongedDataRegionIdWithAutoCreate(deviceId);
     MPPQueryContext context =
         new MPPQueryContext(
             "",