You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yo...@apache.org on 2023/05/30 00:17:40 UTC
[iotdb] branch master updated: [IOTDB-5929] Enable DataPartition inherit policy (#9962)
This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f9cebfcb338 [IOTDB-5929] Enable DataPartition inherit policy (#9962)
f9cebfcb338 is described below
commit f9cebfcb338d4d85ef74e98cf258db8ddee4ac19
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Tue May 30 08:17:34 2023 +0800
[IOTDB-5929] Enable DataPartition inherit policy (#9962)
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 4 +--
.../confignode/conf/SystemPropertiesUtils.java | 5 +++
.../partition/GreedyPartitionAllocator.java | 36 +++++++++++-----------
.../manager/partition/PartitionManager.java | 14 ++++-----
.../partition/DatabasePartitionTable.java | 4 +--
.../persistence/partition/PartitionInfo.java | 15 ++++-----
.../partition/IoTDBPartitionInheritPolicyIT.java | 26 +++++++++++++---
.../resources/conf/iotdb-common.properties | 6 ++--
.../commons/partition/DataPartitionTable.java | 7 +++--
.../commons/partition/SeriesPartitionTable.java | 24 +++++++++------
10 files changed, 85 insertions(+), 56 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 432a6b08542..6bc95fd034a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -111,9 +111,9 @@ public class ConfigNodeConfig {
/**
* DataPartition within the same SeriesPartitionSlot will inherit the allocation result of the
- * previous TimePartitionSlot if set true
+ * predecessor or successor TimePartitionSlot if set true
*/
- private boolean enableDataPartitionInheritPolicy = false;
+ private boolean enableDataPartitionInheritPolicy = true;
/** Max concurrent client number */
private int rpcMaxConcurrentClientNum = 65535;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
index 7d4a7c45d11..6847e7dc937 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java
@@ -220,10 +220,15 @@ public class SystemPropertiesUtils {
// Cluster configuration
systemProperties.setProperty("cluster_name", conf.getClusterName());
+ LOGGER.info("[SystemProperties] store cluster_name: {}", conf.getClusterName());
systemProperties.setProperty("config_node_id", String.valueOf(conf.getConfigNodeId()));
+ LOGGER.info("[SystemProperties] store config_node_id: {}", conf.getConfigNodeId());
systemProperties.setProperty(
"is_seed_config_node",
String.valueOf(ConfigNodeDescriptor.getInstance().isSeedConfigNode()));
+ LOGGER.info(
+ "[SystemProperties] store is_seed_config_node: {}",
+ ConfigNodeDescriptor.getInstance().isSeedConfigNode());
// Startup configuration
systemProperties.setProperty("cn_internal_address", String.valueOf(conf.getInternalAddress()));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
index 996cf571165..3ad5cb0a5f1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
@@ -91,14 +91,14 @@ public class GreedyPartitionAllocator implements IPartitionAllocator {
for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> slotsMapEntry :
unassignedDataPartitionSlotsMap.entrySet()) {
- final String storageGroup = slotsMapEntry.getKey();
+ final String database = slotsMapEntry.getKey();
final Map<TSeriesPartitionSlot, TTimeSlotList> unassignedPartitionSlotsMap =
slotsMapEntry.getValue();
// List<Pair<allocatedSlotsNum, TConsensusGroupId>>
List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
getPartitionManager()
- .getSortedRegionGroupSlotsCounter(storageGroup, TConsensusGroupType.DataRegion);
+ .getSortedRegionGroupSlotsCounter(database, TConsensusGroupType.DataRegion);
DataPartitionTable dataPartitionTable = new DataPartitionTable();
@@ -115,33 +115,33 @@ public class GreedyPartitionAllocator implements IPartitionAllocator {
/* 1. Inherit policy */
if (ENABLE_DATA_PARTITION_INHERIT_POLICY) {
- // Check if the current Partition's predecessor is allocated
- // in the same batch of Partition creation
- TConsensusGroupId predecessor =
- seriesPartitionTable.getPrecededDataPartition(
+ // Check if the current Partition's neighbor(predecessor or successor)
+ // is allocated in the same batch of Partition creation
+ TConsensusGroupId neighbor =
+ seriesPartitionTable.getAdjacentDataPartition(
timePartitionSlot, TIME_PARTITION_INTERVAL);
- if (predecessor != null) {
+ if (neighbor != null) {
seriesPartitionTable
.getSeriesPartitionMap()
- .put(timePartitionSlot, Collections.singletonList(predecessor));
- bubbleSort(predecessor, regionSlotsCounter);
+ .put(timePartitionSlot, Collections.singletonList(neighbor));
+ bubbleSort(neighbor, regionSlotsCounter);
continue;
}
- // Check if the current Partition's predecessor was allocated
- // in the former Partition creation
- predecessor =
+ // Check if the current Partition's neighbor(predecessor or successor)
+ // was allocated in the former Partition creation
+ neighbor =
getPartitionManager()
- .getPrecededDataPartition(
- storageGroup,
+ .getAdjacentDataPartition(
+ database,
seriesPartitionEntry.getKey(),
timePartitionSlot,
TIME_PARTITION_INTERVAL);
- if (predecessor != null) {
+ if (neighbor != null) {
seriesPartitionTable
.getSeriesPartitionMap()
- .put(timePartitionSlot, Collections.singletonList(predecessor));
- bubbleSort(predecessor, regionSlotsCounter);
+ .put(timePartitionSlot, Collections.singletonList(neighbor));
+ bubbleSort(neighbor, regionSlotsCounter);
continue;
}
}
@@ -158,7 +158,7 @@ public class GreedyPartitionAllocator implements IPartitionAllocator {
.getDataPartitionMap()
.put(seriesPartitionEntry.getKey(), seriesPartitionTable);
}
- result.put(storageGroup, dataPartitionTable);
+ result.put(database, dataPartitionTable);
}
return result;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index f28c9b237f6..4b1f962b525 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -539,22 +539,22 @@ public class PartitionManager {
}
/**
- * Only leader use this interface. Checks whether the specified DataPartition has a predecessor
- * and returns if it does
+ * Only leader use this interface. Checks whether the specified DataPartition has a predecessor or
+ * successor and returns if it does
*
- * @param storageGroup StorageGroupName
+ * @param database DatabaseName
* @param seriesPartitionSlot Corresponding SeriesPartitionSlot
* @param timePartitionSlot Corresponding TimePartitionSlot
* @param timePartitionInterval Time partition interval
* @return The specific DataPartition's predecessor or successor if exists, null otherwise
*/
- public TConsensusGroupId getPrecededDataPartition(
- String storageGroup,
+ public TConsensusGroupId getAdjacentDataPartition(
+ String database,
TSeriesPartitionSlot seriesPartitionSlot,
TTimePartitionSlot timePartitionSlot,
long timePartitionInterval) {
- return partitionInfo.getPrecededDataPartition(
- storageGroup, seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
+ return partitionInfo.getAdjacentDataPartition(
+ database, seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index f890d3b24f7..bec09cf81b2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -258,11 +258,11 @@ public class DatabasePartitionTable {
* @param timePartitionInterval Time partition interval
* @return The specific DataPartition's predecessor or successor if exists, null otherwise
*/
- public TConsensusGroupId getPrecededDataPartition(
+ public TConsensusGroupId getAdjacentDataPartition(
TSeriesPartitionSlot seriesPartitionSlot,
TTimePartitionSlot timePartitionSlot,
long timePartitionInterval) {
- return dataPartitionTable.getPrecededDataPartition(
+ return dataPartitionTable.getAdjacentDataPartition(
seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 547802b7b52..6771f17d75b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -374,23 +374,24 @@ public class PartitionInfo implements SnapshotProcessor {
}
/**
- * Checks whether the specified DataPartition has a predecessor and returns if it does
+ * Checks whether the specified DataPartition has a predecessor or successor and returns if it
+ * does
*
- * @param storageGroup StorageGroupName
+ * @param database DatabaseName
* @param seriesPartitionSlot Corresponding SeriesPartitionSlot
* @param timePartitionSlot Corresponding TimePartitionSlot
* @param timePartitionInterval Time partition interval
* @return The specific DataPartition's predecessor or successor if exists, null otherwise
*/
- public TConsensusGroupId getPrecededDataPartition(
- String storageGroup,
+ public TConsensusGroupId getAdjacentDataPartition(
+ String database,
TSeriesPartitionSlot seriesPartitionSlot,
TTimePartitionSlot timePartitionSlot,
long timePartitionInterval) {
- if (databasePartitionTables.containsKey(storageGroup)) {
+ if (databasePartitionTables.containsKey(database)) {
return databasePartitionTables
- .get(storageGroup)
- .getPrecededDataPartition(seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
+ .get(database)
+ .getAdjacentDataPartition(seriesPartitionSlot, timePartitionSlot, timePartitionInterval);
} else {
return null;
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
index a36df635b8b..8b49948f5f7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
@@ -59,9 +59,9 @@ public class IoTDBPartitionInheritPolicyIT {
private static final String sg = "root.sg";
private static final int storageGroupNum = 2;
- private static final int testSeriesPartitionSlotNum = 1000;
+ private static final int testSeriesPartitionSlotNum = 100;
private static final int seriesPartitionBatchSize = 10;
- private static final int testTimePartitionSlotsNum = 10;
+ private static final int testTimePartitionSlotsNum = 100;
private static final int timePartitionBatchSize = 10;
@BeforeClass
@@ -105,7 +105,11 @@ public class IoTDBPartitionInheritPolicyIT {
for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = sg + i;
for (int j = 0; j < testSeriesPartitionSlotNum; j += seriesPartitionBatchSize) {
- for (long k = 0; k < testTimePartitionSlotsNum; k += timePartitionBatchSize) {
+ // Test inherit predecessor or successor
+ boolean isAscending = (j / 10) % 2 == 0;
+ int step = isAscending ? timePartitionBatchSize : -timePartitionBatchSize;
+ int k = isAscending ? 0 : testTimePartitionSlotsNum - timePartitionBatchSize;
+ while (0 <= k && k < testTimePartitionSlotsNum) {
partitionSlotsMap =
ConfigNodeTestUtils.constructPartitionSlotsMap(
storageGroup,
@@ -114,7 +118,6 @@ public class IoTDBPartitionInheritPolicyIT {
k,
k + timePartitionBatchSize,
testTimePartitionInterval);
-
// Let ConfigNode create DataPartition
dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
for (int retry = 0; retry < 5; retry++) {
@@ -124,7 +127,19 @@ public class IoTDBPartitionInheritPolicyIT {
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
dataPartitionTableResp =
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
- if (dataPartitionTableResp != null) {
+ if (dataPartitionTableResp != null
+ && dataPartitionTableResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ ConfigNodeTestUtils.checkDataPartitionTable(
+ storageGroup,
+ j,
+ j + seriesPartitionBatchSize,
+ k,
+ k + timePartitionBatchSize,
+ testTimePartitionInterval,
+ configNodeClient
+ .getDataPartitionTable(dataPartitionReq)
+ .getDataPartitionTable());
break;
}
} catch (Exception e) {
@@ -133,6 +148,7 @@ public class IoTDBPartitionInheritPolicyIT {
TimeUnit.SECONDS.sleep(1);
}
}
+ k += step;
}
}
}
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 0cf9835d04f..04fd6c87254 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -129,10 +129,10 @@ cluster_name=defaultCluster
# Whether to enable the DataPartition inherit policy.
-# DataPartition within the same SeriesPartitionSlot will inherit
-# the allocation result of the previous TimePartitionSlot if set true
+# DataPartition within the same SeriesPartitionSlot will inherit the allocation result of
+# the predecessor or successor TimePartitionSlot if set true
# Datatype: Boolean
-# enable_data_partition_inherit_policy=false
+# enable_data_partition_inherit_policy=true
# The policy of cluster RegionGroups' leader distribution.
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 6195b9b3b4d..979b594a690 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -99,21 +99,22 @@ public class DataPartitionTable {
}
/**
- * Checks whether the specified DataPartition has a predecessor and returns if it does
+ * Checks whether the specified DataPartition has a predecessor or successor and returns if it
+ * does
*
* @param seriesPartitionSlot Corresponding SeriesPartitionSlot
* @param timePartitionSlot Corresponding TimePartitionSlot
* @param timePartitionInterval Time partition interval
* @return The specific DataPartition's predecessor or successor if exists, null otherwise
*/
- public TConsensusGroupId getPrecededDataPartition(
+ public TConsensusGroupId getAdjacentDataPartition(
TSeriesPartitionSlot seriesPartitionSlot,
TTimePartitionSlot timePartitionSlot,
long timePartitionInterval) {
if (dataPartitionMap.containsKey(seriesPartitionSlot)) {
return dataPartitionMap
.get(seriesPartitionSlot)
- .getPrecededDataPartition(timePartitionSlot, timePartitionInterval);
+ .getAdjacentDataPartition(timePartitionSlot, timePartitionInterval);
} else {
return null;
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 9f8b06fe857..47216cc41ce 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -119,24 +119,30 @@ public class SeriesPartitionTable {
}
/**
- * Checks whether the specified DataPartition has a predecessor and returns if it does
+ * Checks whether the specified DataPartition has a predecessor or successor and returns if it
+ * does
*
* @param timePartitionSlot Corresponding TimePartitionSlot
* @param timePartitionInterval Time partition interval
* @return The specific DataPartition's predecessor if exists, otherwise its successor if exists, null otherwise
*/
- public TConsensusGroupId getPrecededDataPartition(
+ public TConsensusGroupId getAdjacentDataPartition(
TTimePartitionSlot timePartitionSlot, long timePartitionInterval) {
- if (timePartitionSlot.getStartTime() < timePartitionInterval) {
- // The first DataPartition doesn't have predecessor
- return null;
- } else {
+ if (timePartitionSlot.getStartTime() >= timePartitionInterval) {
+ // Check predecessor first
TTimePartitionSlot predecessorSlot =
new TTimePartitionSlot(timePartitionSlot.getStartTime() - timePartitionInterval);
- return seriesPartitionMap
- .getOrDefault(predecessorSlot, Collections.singletonList(null))
- .get(0);
+ TConsensusGroupId predecessor =
+ seriesPartitionMap.getOrDefault(predecessorSlot, Collections.singletonList(null)).get(0);
+ if (predecessor != null) {
+ return predecessor;
+ }
}
+
+ // Check successor
+ TTimePartitionSlot successorSlot =
+ new TTimePartitionSlot(timePartitionSlot.getStartTime() + timePartitionInterval);
+ return seriesPartitionMap.getOrDefault(successorSlot, Collections.singletonList(null)).get(0);
}
/**