You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/11 01:19:55 UTC
[iotdb] branch master updated: [IOTDB-4058] DataPartition inheritance policy (#6931)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4c9fa69917 [IOTDB-4058] DataPartition inheritance policy (#6931)
4c9fa69917 is described below
commit 4c9fa69917020ab5ce1d602fff4d0fc375058cfa
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Thu Aug 11 09:19:50 2022 +0800
[IOTDB-4058] DataPartition inheritance policy (#6931)
---
.../resources/conf/iotdb-confignode.properties | 4 +-
.../iotdb/confignode/conf/ConfigNodeConfig.java | 2 +-
.../iotdb/confignode/manager/PartitionManager.java | 33 ++-
.../partition/GreedyPartitionAllocator.java | 25 +-
.../persistence/partition/PartitionInfo.java | 40 +--
.../partition/StorageGroupPartitionTable.java | 16 ++
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 284 +------------------
.../java/org/apache/iotdb/it/env/MppConfig.java | 7 +
.../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +
.../db/it/IoTDBClusterPartitionTableTest.java | 308 +++++++++++++++++++++
.../commons/partition/DataPartitionTable.java | 21 ++
.../commons/partition/SeriesPartitionTable.java | 22 ++
12 files changed, 459 insertions(+), 311 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index db7131d023..19f5417af3 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -129,9 +129,9 @@ target_config_nodes=0.0.0.0:22277
# default_ttl=36000000
-# Time partition interval in seconds
+# Time partition interval in seconds. The default value, 86400, is equal to one day
# Datatype: long
-# time_partition_interval=604800
+# time_partition_interval=86400
# Default number of SchemaRegion replicas
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 65ae278a1a..42f929b997 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -105,7 +105,7 @@ public class ConfigNodeConfig {
IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.TMP_FOLDER_NAME;
/** Time partition interval in seconds */
- private long timePartitionInterval = 604800;
+ private long timePartitionInterval = 86400;
/** Default number of SchemaRegion replicas */
private int schemaReplicationFactor = 1;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 85702318fc..5aeb9885e3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -65,6 +65,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/** The PartitionManager Manages cluster PartitionTable read and write requests. */
@@ -216,9 +217,16 @@ public class PartitionManager {
// Map<StorageGroup, unassigned SeriesPartitionSlot count>
Map<String, Integer> unassignedDataPartitionSlotsCountMap = new ConcurrentHashMap<>();
unassignedDataPartitionSlotsMap.forEach(
- (storageGroup, unassignedDataPartitionSlots) ->
- unassignedDataPartitionSlotsCountMap.put(
- storageGroup, unassignedDataPartitionSlots.size()));
+ (storageGroup, unassignedDataPartitionSlots) -> {
+ AtomicInteger unassignedDataPartitionSlotsCount = new AtomicInteger(0);
+ unassignedDataPartitionSlots
+ .values()
+ .forEach(
+ timePartitionSlots ->
+ unassignedDataPartitionSlotsCount.getAndAdd(timePartitionSlots.size()));
+ unassignedDataPartitionSlotsCountMap.put(
+ storageGroup, unassignedDataPartitionSlotsCount.get());
+ });
TSStatus status =
extendRegionsIfNecessary(
unassignedDataPartitionSlotsCountMap, TConsensusGroupType.DataRegion);
@@ -320,6 +328,25 @@ public class PartitionManager {
return result;
}
+ /**
+ * Only the leader uses this interface. Returns the predecessor of the specified DataPartition,
+ * if it has one, by delegating the lookup to partitionInfo
+ *
+ * @param storageGroup StorageGroupName
+ * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+ * @param timePartitionSlot Corresponding TimePartitionSlot
+ * @param timePartitionInterval Time partition interval
+ * @return The TConsensusGroupId of the DataPartition's predecessor if it exists, null otherwise
+ */
+ public TConsensusGroupId getPrecededDataPartition(
+ String storageGroup,
+ TSeriesPartitionSlot seriesPartitionSlot,
+ TTimePartitionSlot timePartitionSlot,
+ long timePartitionInterval) {
+ return partitionInfo.getPrecededDataPartition(
+ storageGroup, seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
+ }
+
/**
* Get the DataNodes who contain the specific StorageGroup's Schema or Data
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
index f92c02694a..7feeeb2a80 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.SeriesPartitionTable;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.PartitionManager;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -92,12 +93,24 @@ public class GreedyPartitionAllocator implements IPartitionAllocator {
Map<TTimePartitionSlot, List<TConsensusGroupId>> seriesPartitionMap =
new ConcurrentHashMap<>();
for (TTimePartitionSlot timePartitionSlot : seriesPartitionEntry.getValue()) {
- // Greedy allocation
- seriesPartitionMap.put(
- timePartitionSlot,
- Collections.singletonList(regionSlotsCounter.get(0).getRight()));
- // Bubble sort
- bubbleSort(regionSlotsCounter);
+ TConsensusGroupId predecessor =
+ getPartitionManager()
+ .getPrecededDataPartition(
+ storageGroup,
+ seriesPartitionEntry.getKey(),
+ timePartitionSlot,
+ ConfigNodeDescriptor.getInstance().getConf().getTimePartitionInterval());
+ if (predecessor != null) {
+ // For DataPartition allocation, we consider predecessor first
+ seriesPartitionMap.put(timePartitionSlot, Collections.singletonList(predecessor));
+ } else {
+ // Greedy allocation
+ seriesPartitionMap.put(
+ timePartitionSlot,
+ Collections.singletonList(regionSlotsCounter.get(0).getRight()));
+ // Bubble sort
+ bubbleSort(regionSlotsCounter);
+ }
}
dataPartitionMap.put(
seriesPartitionEntry.getKey(), new SeriesPartitionTable(seriesPartitionMap));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 22dae748e5..2e8414cadc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -120,7 +120,7 @@ public class PartitionInfo implements SnapshotProcessor {
Metric.STORAGE_GROUP.toString(),
MetricLevel.CORE,
storageGroupPartitionTables,
- o -> o.size(),
+ ConcurrentHashMap::size,
Tag.NAME.toString(),
"number");
MetricsService.getInstance()
@@ -349,6 +349,29 @@ public class PartitionInfo implements SnapshotProcessor {
dataPartition);
}
+ /**
+ * Returns the predecessor of the specified DataPartition, or null if the StorageGroup is unknown
+ *
+ * @param storageGroup StorageGroupName
+ * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+ * @param timePartitionSlot Corresponding TimePartitionSlot
+ * @param timePartitionInterval Time partition interval
+ * @return The result of the StorageGroup's partition table lookup, null if it has no table
+ */
+ public TConsensusGroupId getPrecededDataPartition(
+ String storageGroup,
+ TSeriesPartitionSlot seriesPartitionSlot,
+ TTimePartitionSlot timePartitionSlot,
+ long timePartitionInterval) {
+ if (storageGroupPartitionTables.containsKey(storageGroup)) {
+ return storageGroupPartitionTables
+ .get(storageGroup)
+ .getPrecededDataPartition(seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
+ } else { // No partition table is registered under this StorageGroup name
+ return null;
+ }
+ }
+
private boolean isStorageGroupExisted(String storageGroup) {
final StorageGroupPartitionTable storageGroupPartitionTable =
storageGroupPartitionTables.get(storageGroup);
@@ -599,21 +622,6 @@ public class PartitionInfo implements SnapshotProcessor {
return storageGroupPartitionTables.get(storageGroup).getSortedRegionGroupSlotsCounter(type);
}
- /**
- * Get total region number
- *
- * @param type SchemaRegion or DataRegion
- * @return the number of SchemaRegion or DataRegion
- */
- public int getTotalRegionCount(TConsensusGroupType type) {
- Set<RegionGroup> regionGroups = new HashSet<>();
- for (Map.Entry<String, StorageGroupPartitionTable> entry :
- storageGroupPartitionTables.entrySet()) {
- regionGroups.addAll(entry.getValue().getRegionGroups(type));
- }
- return regionGroups.size();
- }
-
/**
* Update RegionGroup-related metric
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index 3edab76a40..9b22112529 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -232,6 +232,22 @@ public class StorageGroupPartitionTable {
return dataPartitionTable.getDataPartition(partitionSlots, dataPartition);
}
+ /**
+ * Returns the predecessor of the specified DataPartition by delegating to dataPartitionTable
+ *
+ * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+ * @param timePartitionSlot Corresponding TimePartitionSlot
+ * @param timePartitionInterval Time partition interval
+ * @return The TConsensusGroupId of the DataPartition's predecessor if it exists, null otherwise
+ */
+ public TConsensusGroupId getPrecededDataPartition(
+ TSeriesPartitionSlot seriesPartitionSlot,
+ TTimePartitionSlot timePartitionSlot,
+ long timePartitionInterval) {
+ return dataPartitionTable.getPrecededDataPartition(
+ seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
+ }
+
/**
* Create SchemaPartition within the specific StorageGroup
*
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index 8472d186eb..bce8f77cd4 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -18,16 +18,12 @@
*/
package org.apache.iotdb.confignode.service.thrift;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -45,14 +41,10 @@ import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetDataReplicationFactorReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaReplicationFactorReq;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
@@ -75,9 +67,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -205,7 +195,7 @@ public class ConfigNodeRPCServiceProcessorTest {
Assert.assertEquals(Long.MAX_VALUE, storageGroupSchema.getTTL());
Assert.assertEquals(1, storageGroupSchema.getSchemaReplicationFactor());
Assert.assertEquals(1, storageGroupSchema.getDataReplicationFactor());
- Assert.assertEquals(604800, storageGroupSchema.getTimePartitionInterval());
+ Assert.assertEquals(86400, storageGroupSchema.getTimePartitionInterval());
storageGroupSchema = schemaMap.get(sg1);
Assert.assertNotNull(storageGroupSchema);
Assert.assertEquals(sg1, storageGroupSchema.getName());
@@ -259,278 +249,6 @@ public class ConfigNodeRPCServiceProcessorTest {
return ByteBuffer.wrap(baos.toByteArray());
}
- @Test
- public void testGetAndCreateSchemaPartition()
- throws TException, IOException, IllegalPathException {
- final String sg = "root.sg";
- final String sg0 = "root.sg0";
- final String sg1 = "root.sg1";
-
- final String d00 = sg0 + ".d0.s";
- final String d01 = sg0 + ".d1.s";
- final String d10 = sg1 + ".d0.s";
- final String d11 = sg1 + ".d1.s";
-
- final String allPaths = "root.**";
- final String allSg0 = "root.sg0.**";
- final String allSg1 = "root.sg1.**";
-
- TSStatus status;
- ByteBuffer buffer;
- TSchemaPartitionReq schemaPartitionReq;
- TSchemaPartitionResp schemaPartitionResp;
-
- Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap;
-
- // Set StorageGroups
- status = processor.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(sg0)));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(sg1)));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
- // Test getOrCreateSchemaPartition, the result should be NOT_ENOUGH_DATANODE
- buffer = generatePatternTreeBuffer(new String[] {d00, d01, allSg1});
- schemaPartitionReq = new TSchemaPartitionReq(buffer);
- schemaPartitionResp = processor.getOrCreateSchemaPartition(schemaPartitionReq);
- Assert.assertEquals(
- TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode(),
- schemaPartitionResp.getStatus().getCode());
- Assert.assertNull(schemaPartitionResp.getSchemaRegionMap());
-
- // register DataNodes
- registerDataNodes();
-
- // Test getSchemaPartition, the result should be empty
- buffer = generatePatternTreeBuffer(new String[] {d00, d01, allSg1});
- schemaPartitionReq = new TSchemaPartitionReq(buffer);
- schemaPartitionResp = processor.getSchemaPartition(schemaPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), schemaPartitionResp.getStatus().getCode());
- Assert.assertEquals(0, schemaPartitionResp.getSchemaRegionMapSize());
-
- // Test getOrCreateSchemaPartition, ConfigNode should create SchemaPartitions and return
- buffer = generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
- schemaPartitionReq.setPathPatternTree(buffer);
- schemaPartitionResp = processor.getOrCreateSchemaPartition(schemaPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), schemaPartitionResp.getStatus().getCode());
- Assert.assertEquals(2, schemaPartitionResp.getSchemaRegionMapSize());
- schemaPartitionMap = schemaPartitionResp.getSchemaRegionMap();
- for (int i = 0; i < 2; i++) {
- Assert.assertTrue(schemaPartitionMap.containsKey(sg + i));
- Assert.assertEquals(2, schemaPartitionMap.get(sg + i).size());
- schemaPartitionMap
- .get(sg + i)
- .forEach(
- (tSeriesPartitionSlot, tRegionReplicaSet) -> {
- Assert.assertEquals(1, tRegionReplicaSet.getDataNodeLocationsSize());
- Assert.assertEquals(
- TConsensusGroupType.SchemaRegion, tRegionReplicaSet.getRegionId().getType());
- });
- }
-
- // Test getSchemaPartition, when a device path doesn't match any StorageGroup and including
- // "**",
- // ConfigNode will return all the SchemaPartitions
- buffer = generatePatternTreeBuffer(new String[] {allPaths});
- schemaPartitionReq.setPathPatternTree(buffer);
- schemaPartitionResp = processor.getSchemaPartition(schemaPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), schemaPartitionResp.getStatus().getCode());
- Assert.assertEquals(2, schemaPartitionResp.getSchemaRegionMapSize());
- schemaPartitionMap = schemaPartitionResp.getSchemaRegionMap();
- for (int i = 0; i < 2; i++) {
- Assert.assertTrue(schemaPartitionMap.containsKey(sg + i));
- Assert.assertEquals(2, schemaPartitionMap.get(sg + i).size());
- schemaPartitionMap
- .get(sg + i)
- .forEach(
- (tSeriesPartitionSlot, tRegionReplicaSet) -> {
- Assert.assertEquals(1, tRegionReplicaSet.getDataNodeLocationsSize());
- Assert.assertEquals(
- TConsensusGroupType.SchemaRegion, tRegionReplicaSet.getRegionId().getType());
- });
- }
-
- // Test getSchemaPartition, when a device path matches with a StorageGroup and end with "*",
- // ConfigNode will return all the SchemaPartitions in this StorageGroup
- buffer = generatePatternTreeBuffer(new String[] {allSg0, d11});
- schemaPartitionReq.setPathPatternTree(buffer);
- schemaPartitionResp = processor.getSchemaPartition(schemaPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), schemaPartitionResp.getStatus().getCode());
- Assert.assertEquals(2, schemaPartitionResp.getSchemaRegionMapSize());
- schemaPartitionMap = schemaPartitionResp.getSchemaRegionMap();
- // Check "root.sg0"
- Assert.assertTrue(schemaPartitionMap.containsKey(sg0));
- Assert.assertEquals(2, schemaPartitionMap.get(sg0).size());
- schemaPartitionMap
- .get(sg0)
- .forEach(
- (tSeriesPartitionSlot, tRegionReplicaSet) -> {
- Assert.assertEquals(1, tRegionReplicaSet.getDataNodeLocationsSize());
- Assert.assertEquals(
- TConsensusGroupType.SchemaRegion, tRegionReplicaSet.getRegionId().getType());
- });
- // Check "root.sg1"
- Assert.assertTrue(schemaPartitionMap.containsKey(sg1));
- Assert.assertEquals(1, schemaPartitionMap.get(sg1).size());
- schemaPartitionMap
- .get(sg1)
- .forEach(
- (tSeriesPartitionSlot, tRegionReplicaSet) -> {
- Assert.assertEquals(1, tRegionReplicaSet.getDataNodeLocationsSize());
- Assert.assertEquals(
- TConsensusGroupType.SchemaRegion, tRegionReplicaSet.getRegionId().getType());
- });
- }
-
- private Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- constructPartitionSlotsMap(
- int storageGroupNum, int seriesPartitionSlotNum, long timePartitionSlotNum) {
- final String sg = "root.sg";
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> result = new HashMap<>();
-
- for (int i = 0; i < storageGroupNum; i++) {
- String storageGroup = sg + i;
- result.put(storageGroup, new HashMap<>());
- for (int j = 0; j < seriesPartitionSlotNum; j++) {
- TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j);
- result.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
- for (long k = 0; k < timePartitionSlotNum; k++) {
- TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(k);
- result.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
- }
- }
- }
-
- return result;
- }
-
- private void checkDataPartitionMap(
- int storageGroupNum,
- int seriesPartitionSlotNum,
- long timePartitionSlotNum,
- Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
- dataPartitionMap) {
- final String sg = "root.sg";
- Assert.assertEquals(storageGroupNum, dataPartitionMap.size());
- for (int i = 0; i < storageGroupNum; i++) {
- String storageGroup = sg + i;
- Assert.assertTrue(dataPartitionMap.containsKey(storageGroup));
- Assert.assertEquals(seriesPartitionSlotNum, dataPartitionMap.get(storageGroup).size());
- for (int j = 0; j < seriesPartitionSlotNum; j++) {
- TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j);
- Assert.assertTrue(dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot));
- Assert.assertEquals(
- timePartitionSlotNum,
- dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).size());
- for (long k = 0; k < timePartitionSlotNum; k++) {
- TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(k);
- Assert.assertTrue(
- dataPartitionMap
- .get(storageGroup)
- .get(seriesPartitionSlot)
- .containsKey(timePartitionSlot));
- // One RegionReplicaSet
- Assert.assertEquals(
- 1,
- dataPartitionMap
- .get(storageGroup)
- .get(seriesPartitionSlot)
- .get(timePartitionSlot)
- .size());
- // Is DataRegion
- Assert.assertEquals(
- TConsensusGroupType.DataRegion,
- dataPartitionMap
- .get(storageGroup)
- .get(seriesPartitionSlot)
- .get(timePartitionSlot)
- .get(0)
- .getRegionId()
- .getType());
- // Including one RegionReplica
- Assert.assertEquals(
- 1,
- dataPartitionMap
- .get(storageGroup)
- .get(seriesPartitionSlot)
- .get(timePartitionSlot)
- .get(0)
- .getDataNodeLocationsSize());
- }
- }
- }
- }
-
- @Test
- public void testGetAndCreateDataPartition() throws TException {
- final String sg = "root.sg";
- final int storageGroupNum = 2;
- final int seriesPartitionSlotNum = 4;
- final long timePartitionSlotNum = 6;
-
- TSStatus status;
- TDataPartitionReq dataPartitionReq;
- TDataPartitionResp dataPartitionResp;
-
- // Prepare partitionSlotsMap
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap0 =
- constructPartitionSlotsMap(storageGroupNum, seriesPartitionSlotNum, timePartitionSlotNum);
- Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap1 =
- constructPartitionSlotsMap(
- storageGroupNum * 2, seriesPartitionSlotNum * 2, timePartitionSlotNum * 2);
-
- // set StorageGroups
- for (int i = 0; i < storageGroupNum; i++) {
- TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
- status = processor.setStorageGroup(setReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- }
-
- // Test getOrCreateDataPartition, the result should be NOT_ENOUGH_DATANODE
- dataPartitionReq = new TDataPartitionReq(partitionSlotsMap0);
- dataPartitionResp = processor.getOrCreateDataPartition(dataPartitionReq);
- Assert.assertEquals(
- TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode(), dataPartitionResp.getStatus().getCode());
- Assert.assertNull(dataPartitionResp.getDataPartitionMap());
-
- // register DataNodes
- registerDataNodes();
-
- // Test getDataPartition, the result should be empty
- dataPartitionReq = new TDataPartitionReq(partitionSlotsMap0);
- dataPartitionResp = processor.getDataPartition(dataPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
- Assert.assertEquals(0, dataPartitionResp.getDataPartitionMapSize());
-
- // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
- dataPartitionResp = processor.getOrCreateDataPartition(dataPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
- checkDataPartitionMap(
- storageGroupNum,
- seriesPartitionSlotNum,
- timePartitionSlotNum,
- dataPartitionResp.getDataPartitionMap());
-
- // Test getDataPartition, the result should only contain DataPartition created before
- dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap1);
- dataPartitionResp = processor.getDataPartition(dataPartitionReq);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
- Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
- checkDataPartitionMap(
- storageGroupNum,
- seriesPartitionSlotNum,
- timePartitionSlotNum,
- dataPartitionResp.getDataPartitionMap());
- }
-
@Test
public void testDeleteStorageGroup() throws TException {
TSStatus status;
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index b1c57791b8..a91136de2b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -226,4 +226,11 @@ public class MppConfig implements BaseConfig {
"data_region_consensus_protocol_class", dataRegionConsensusProtocolClass);
return this;
}
+
+ @Override // Writes the interval into confignodeProperties as "time_partition_interval"
+ public BaseConfig setTimePartitionInterval(long timePartitionInterval) {
+ confignodeProperties.setProperty(
+ "time_partition_interval", String.valueOf(timePartitionInterval));
+ return this; // Returns this same config object
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 36cdd9bc49..5cf5f09d6c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -218,4 +218,12 @@ public interface BaseConfig {
default String getDataRegionConsensusProtocolClass() {
return "org.apache.iotdb.consensus.standalone.StandAloneConsensus";
}
+
+ default BaseConfig setTimePartitionInterval(long timePartitionInterval) {
+ return this; // Default is a no-op: the argument is ignored and this config is returned as-is
+ }
+
+ default long getTimePartitionInterval() {
+ return 86400; // Same value as the ConfigNodeConfig default set in this commit (one day)
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBClusterPartitionTableTest.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBClusterPartitionTableTest.java
new file mode 100644
index 0000000000..2ce0442427
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBClusterPartitionTableTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// TODO: @MiniSho Move this test into org.apache.iotdb.db.it.confignode package
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBClusterPartitionTableTest {
+
+ // Configuration values read in setUp() before they are overridden, and written back in
+ // tearDown()
+ protected static String originalConfigNodeConsensusProtocolClass;
+ protected static String originalSchemaRegionConsensusProtocolClass;
+ protected static String originalDataRegionConsensusProtocolClass;
+
+ protected static long originalTimePartitionInterval;
+
+ // Time partition interval applied in setUp(); also the spacing between the start times of the
+ // TTimePartitionSlots generated and checked by this test
+ private static final long testTimePartitionInterval = 86400;
+ // Common StorageGroup name prefix; an index is appended to build each StorageGroup name
+ private static final String sg = "root.sg";
+ // Number of StorageGroups, SeriesPartitionSlots per StorageGroup and TimePartitionSlots per
+ // SeriesPartitionSlot used by the DataPartition test
+ private static final int storageGroupNum = 5;
+ private static final int seriesPartitionSlotsNum = 10;
+ private static final int timePartitionSlotsNum = 100;
+
+ /**
+  * Saves the current consensus protocol classes and time partition interval, overrides them
+  * with the values used by this test, then initializes the test environment.
+  */
+ @Before
+ public void setUp() throws Exception {
+ // Remember the current configuration so that tearDown() can restore it
+ originalConfigNodeConsensusProtocolClass =
+ ConfigFactory.getConfig().getConfigNodeConsesusProtocolClass();
+ originalSchemaRegionConsensusProtocolClass =
+ ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
+ originalDataRegionConsensusProtocolClass =
+ ConfigFactory.getConfig().getDataRegionConsensusProtocolClass();
+ originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
+
+ // Use RatisConsensus for all three consensus protocol classes and the test's own interval
+ ConfigFactory.getConfig()
+ .setConfigNodeConsesusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
+ ConfigFactory.getConfig()
+ .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
+ ConfigFactory.getConfig()
+ .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
+ ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
+
+ // The environment is initialized only after the configuration overrides above
+ EnvFactory.getEnv().initBeforeClass();
+ }
+
+  /**
+   * Cleans up the test environment and writes back the configuration values that were saved
+   * before the test overrode them.
+   *
+   * <p>The restore runs in a {@code finally} block: if the environment cleanup throws, the
+   * exception still propagates, but the overridden configuration does not leak into later tests.
+   */
+  @After
+  public void tearDown() {
+    try {
+      EnvFactory.getEnv().cleanAfterClass();
+    } finally {
+      ConfigFactory.getConfig()
+          .setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
+      ConfigFactory.getConfig()
+          .setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
+      ConfigFactory.getConfig()
+          .setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
+      ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
+    }
+  }
+
+  /**
+   * Builds a PathPatternTree from the given paths and returns its serialized bytes.
+   *
+   * @param paths path patterns to append to the tree, in order
+   * @return a ByteBuffer wrapping the serialized tree
+   */
+  private ByteBuffer generatePatternTreeBuffer(String[] paths)
+      throws IllegalPathException, IOException {
+    PathPatternTree tree = new PathPatternTree();
+    for (int i = 0; i < paths.length; i++) {
+      tree.appendPathPattern(new PartialPath(paths[i]));
+    }
+    tree.constructTree();
+
+    PublicBAOS outputStream = new PublicBAOS();
+    tree.serialize(outputStream);
+    byte[] serializedTree = outputStream.toByteArray();
+    return ByteBuffer.wrap(serializedTree);
+  }
+
+  /**
+   * Exercises getSchemaPartitionTable and getOrCreateSchemaPartitionTable against two
+   * StorageGroups: an empty result before creation, creation on demand, and lookups through
+   * wildcard path patterns afterwards.
+   */
+  @Test
+  public void testGetAndCreateSchemaPartition()
+      throws TException, IOException, IllegalPathException {
+    final String sg0 = "root.sg0";
+    final String sg1 = "root.sg1";
+
+    final String d00 = sg0 + ".d0.s";
+    final String d01 = sg0 + ".d1.s";
+    final String d10 = sg1 + ".d0.s";
+    final String d11 = sg1 + ".d1.s";
+
+    final String allPaths = "root.**";
+    final String allSg0 = "root.sg0.**";
+    final String allSg1 = "root.sg1.**";
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+      TSStatus status;
+      ByteBuffer buffer;
+      TSchemaPartitionReq schemaPartitionReq;
+      TSchemaPartitionTableResp schemaPartitionTableResp;
+      Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable;
+
+      // Set StorageGroups
+      status = client.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(sg0)));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      status = client.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(sg1)));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+      // Test getSchemaPartition, the result should be empty
+      buffer = generatePatternTreeBuffer(new String[] {d00, d01, allSg1});
+      schemaPartitionReq = new TSchemaPartitionReq(buffer);
+      schemaPartitionTableResp = client.getSchemaPartitionTable(schemaPartitionReq);
+      assertSchemaPartitionRespSuccess(schemaPartitionTableResp);
+      Assert.assertEquals(0, schemaPartitionTableResp.getSchemaPartitionTableSize());
+
+      // Test getOrCreateSchemaPartition, ConfigNode should create SchemaPartitions and return
+      buffer = generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
+      schemaPartitionReq.setPathPatternTree(buffer);
+      schemaPartitionTableResp = client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
+      assertSchemaPartitionRespSuccess(schemaPartitionTableResp);
+      Assert.assertEquals(2, schemaPartitionTableResp.getSchemaPartitionTableSize());
+      schemaPartitionTable = schemaPartitionTableResp.getSchemaPartitionTable();
+      assertTwoSchemaPartitionsInEach(schemaPartitionTable, sg0, sg1);
+
+      // Test getSchemaPartition, when a device path doesn't match any StorageGroup and including
+      // "**", ConfigNode will return all the SchemaPartitions
+      buffer = generatePatternTreeBuffer(new String[] {allPaths});
+      schemaPartitionReq.setPathPatternTree(buffer);
+      schemaPartitionTableResp = client.getSchemaPartitionTable(schemaPartitionReq);
+      assertSchemaPartitionRespSuccess(schemaPartitionTableResp);
+      Assert.assertEquals(2, schemaPartitionTableResp.getSchemaPartitionTableSize());
+      schemaPartitionTable = schemaPartitionTableResp.getSchemaPartitionTable();
+      assertTwoSchemaPartitionsInEach(schemaPartitionTable, sg0, sg1);
+
+      // Test getSchemaPartition, when a device path matches with a StorageGroup and ends with
+      // "**", ConfigNode will return all the SchemaPartitions in this StorageGroup
+      buffer = generatePatternTreeBuffer(new String[] {allSg0, d11});
+      schemaPartitionReq.setPathPatternTree(buffer);
+      schemaPartitionTableResp = client.getSchemaPartitionTable(schemaPartitionReq);
+      assertSchemaPartitionRespSuccess(schemaPartitionTableResp);
+      Assert.assertEquals(2, schemaPartitionTableResp.getSchemaPartitionTableSize());
+      schemaPartitionTable = schemaPartitionTableResp.getSchemaPartitionTable();
+      // Check "root.sg0"
+      Assert.assertTrue(schemaPartitionTable.containsKey(sg0));
+      Assert.assertEquals(2, schemaPartitionTable.get(sg0).size());
+      // Check "root.sg1"
+      Assert.assertTrue(schemaPartitionTable.containsKey(sg1));
+      Assert.assertEquals(1, schemaPartitionTable.get(sg1).size());
+    }
+  }
+
+  /** Asserts that the status carried by the given response is SUCCESS_STATUS */
+  private static void assertSchemaPartitionRespSuccess(TSchemaPartitionTableResp resp) {
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
+  }
+
+  /** Asserts that the table has an entry of size two for every given StorageGroup */
+  private static void assertTwoSchemaPartitionsInEach(
+      Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable,
+      String... storageGroups) {
+    for (String storageGroup : storageGroups) {
+      Assert.assertTrue(schemaPartitionTable.containsKey(storageGroup));
+      Assert.assertEquals(2, schemaPartitionTable.get(storageGroup).size());
+    }
+  }
+
+  /**
+   * Builds the request map used by the DataPartition test: storageGroupNum StorageGroups named
+   * sg + index, each with seriesPartitionSlotsNum SeriesPartitionSlots, each of which lists
+   * timePartitionSlotsNum TimePartitionSlots whose start times are consecutive multiples of
+   * testTimePartitionInterval.
+   *
+   * @return StorageGroup name -> SeriesPartitionSlot -> TimePartitionSlots
+   */
+  private Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
+      constructPartitionSlotsMap() {
+    Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> result = new HashMap<>();
+
+    for (int i = 0; i < storageGroupNum; i++) {
+      Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesSlotsMap = new HashMap<>();
+      for (int j = 0; j < seriesPartitionSlotsNum; j++) {
+        // Every SeriesPartitionSlot gets its own list instance
+        List<TTimePartitionSlot> timeSlots = new ArrayList<>(timePartitionSlotsNum);
+        for (long k = 0; k < timePartitionSlotsNum; k++) {
+          timeSlots.add(new TTimePartitionSlot(k * testTimePartitionInterval));
+        }
+        seriesSlotsMap.put(new TSeriesPartitionSlot(j), timeSlots);
+      }
+      result.put(sg + i, seriesSlotsMap);
+    }
+
+    return result;
+  }
+
+  /**
+   * Verifies that the given DataPartition table contains every expected StorageGroup,
+   * SeriesPartitionSlot and TimePartitionSlot, and that within one SeriesPartitionSlot every
+   * TimePartitionSlot maps to the same value as the slot with start time 0.
+   *
+   * @param dataPartitionTable StorageGroup name -> SeriesPartitionSlot -> TimePartitionSlot ->
+   *     TConsensusGroupIds
+   */
+  private void checkDataPartitionMap(
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>>
+          dataPartitionTable) {
+    Assert.assertEquals(storageGroupNum, dataPartitionTable.size());
+    for (int sgIndex = 0; sgIndex < storageGroupNum; sgIndex++) {
+      String storageGroup = sg + sgIndex;
+      Assert.assertTrue(dataPartitionTable.containsKey(storageGroup));
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>> seriesSlotMap =
+          dataPartitionTable.get(storageGroup);
+      Assert.assertEquals(seriesPartitionSlotsNum, seriesSlotMap.size());
+
+      for (int seriesIndex = 0; seriesIndex < seriesPartitionSlotsNum; seriesIndex++) {
+        TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(seriesIndex);
+        Assert.assertTrue(seriesSlotMap.containsKey(seriesPartitionSlot));
+        Map<TTimePartitionSlot, List<TConsensusGroupId>> timeSlotMap =
+            seriesSlotMap.get(seriesPartitionSlot);
+        Assert.assertEquals(timePartitionSlotsNum, timeSlotMap.size());
+
+        for (long timeIndex = 0; timeIndex < timePartitionSlotsNum; timeIndex++) {
+          TTimePartitionSlot currentSlot =
+              new TTimePartitionSlot(timeIndex * testTimePartitionInterval);
+          Assert.assertTrue(timeSlotMap.containsKey(currentSlot));
+          if (timeIndex == 0) {
+            // The first slot is the reference the later slots are compared against
+            continue;
+          }
+          // Check consistency
+          Assert.assertEquals(
+              timeSlotMap.get(new TTimePartitionSlot(0)), timeSlotMap.get(currentSlot));
+        }
+      }
+    }
+  }
+
+  /**
+   * Exercises getDataPartitionTable and getOrCreateDataPartitionTable: an empty result before
+   * creation, creation on demand, and a follow-up lookup of the created DataPartitions.
+   */
+  @Test
+  public void testGetAndCreateDataPartition() throws TException, IOException {
+    final int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+      // Prepare partitionSlotsMap
+      Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+          constructPartitionSlotsMap();
+
+      // Set StorageGroups
+      for (int i = 0; i < storageGroupNum; i++) {
+        TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema(sg + i);
+        TSStatus status = client.setStorageGroup(new TSetStorageGroupReq(storageGroupSchema));
+        Assert.assertEquals(successCode, status.getCode());
+      }
+
+      // Test getDataPartitionTable, the result should be empty
+      TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+      TDataPartitionTableResp emptyResp = client.getDataPartitionTable(dataPartitionReq);
+      Assert.assertEquals(successCode, emptyResp.getStatus().getCode());
+      Assert.assertNotNull(emptyResp.getDataPartitionTable());
+      Assert.assertEquals(0, emptyResp.getDataPartitionTableSize());
+
+      // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
+      TDataPartitionTableResp createdResp = client.getOrCreateDataPartitionTable(dataPartitionReq);
+      Assert.assertEquals(successCode, createdResp.getStatus().getCode());
+      Assert.assertNotNull(createdResp.getDataPartitionTable());
+      checkDataPartitionMap(createdResp.getDataPartitionTable());
+
+      // Test getDataPartition, the result should only contain DataPartition created before
+      dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
+      TDataPartitionTableResp fetchedResp = client.getDataPartitionTable(dataPartitionReq);
+      Assert.assertEquals(successCode, fetchedResp.getStatus().getCode());
+      Assert.assertNotNull(fetchedResp.getDataPartitionTable());
+      checkDataPartitionMap(fetchedResp.getDataPartitionTable());
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 3a67776e78..9322474cb4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -94,6 +94,27 @@ public class DataPartitionTable {
return result.get();
}
+  /**
+   * Returns the predecessor of the specified DataPartition, if one is recorded.
+   *
+   * <p>The SeriesPartitionTable is fetched with a single lookup instead of a containsKey/get
+   * pair, so the result cannot change between the check and the read.
+   *
+   * @param seriesPartitionSlot Corresponding SeriesPartitionSlot
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @param timePartitionInterval Time partition interval
+   * @return The specific DataPartition's predecessor if exists, null otherwise (also null when
+   *     nothing is recorded for the given SeriesPartitionSlot)
+   */
+  public TConsensusGroupId getPrecededDataPartition(
+      TSeriesPartitionSlot seriesPartitionSlot,
+      TTimePartitionSlot timePartitionSlot,
+      long timePartitionInterval) {
+    SeriesPartitionTable seriesPartitionTable = dataPartitionMap.get(seriesPartitionSlot);
+    if (seriesPartitionTable == null) {
+      return null;
+    }
+    return seriesPartitionTable.getPrecededDataPartition(timePartitionSlot, timePartitionInterval);
+  }
+
/**
* Create DataPartition within the specific StorageGroup
*
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 7d76ec942d..ca2d069712 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -86,6 +87,27 @@ public class SeriesPartitionTable {
return result.get();
}
+  /**
+   * Returns the predecessor of the specified DataPartition, if one is recorded.
+   *
+   * <p>The predecessor is looked up under the TimePartitionSlot that starts exactly one {@code
+   * timePartitionInterval} earlier; the first TConsensusGroupId stored for that slot is returned.
+   *
+   * @param timePartitionSlot Corresponding TimePartitionSlot
+   * @param timePartitionInterval Time partition interval
+   * @return The specific DataPartition's predecessor if exists, null otherwise (including when the
+   *     predecessor slot is present but has no TConsensusGroupId)
+   */
+  public TConsensusGroupId getPrecededDataPartition(
+      TTimePartitionSlot timePartitionSlot, long timePartitionInterval) {
+    if (timePartitionSlot.getStartTime() < timePartitionInterval) {
+      // The first DataPartition doesn't have predecessor
+      return null;
+    }
+
+    TTimePartitionSlot predecessorSlot =
+        new TTimePartitionSlot(timePartitionSlot.getStartTime() - timePartitionInterval);
+    List<TConsensusGroupId> predecessorRegions =
+        seriesPartitionMap.getOrDefault(predecessorSlot, Collections.emptyList());
+    // An empty list is treated like a missing slot instead of failing on get(0)
+    return predecessorRegions.isEmpty() ? null : predecessorRegions.get(0);
+  }
+
/**
* Create DataPartition within the specific SeriesPartitionSlot
*