You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/04/13 03:05:02 UTC
[iotdb] branch master updated: [IOTDB-2866] Support get or create SchemaPartition with PatternTree in config node (#5493)
This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c368467c20 [IOTDB-2866] Support get or create SchemaPartition with PatternTree in config node (#5493)
c368467c20 is described below
commit c368467c202a0a398ba5a5862dc32f8ef96fce9e
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Wed Apr 13 11:04:58 2022 +0800
[IOTDB-2866] Support get or create SchemaPartition with PatternTree in config node (#5493)
[IOTDB-2866] Support get or create SchemaPartition with PatternTree in config node (#5493)
---
.../resources/conf/iotdb-confignode.properties | 13 +-
.../iotdb/confignode/conf/ConfigNodeConf.java | 14 +
.../confignode/conf/ConfigNodeDescriptor.java | 4 +
.../consensus/response/DataPartitionDataSet.java | 23 +-
.../consensus/response/SchemaPartitionDataSet.java | 77 +++---
.../response/StorageGroupSchemaDataSet.java | 12 +-
.../iotdb/confignode/manager/ConfigManager.java | 104 ++++++-
.../iotdb/confignode/manager/ConsensusManager.java | 32 ---
.../apache/iotdb/confignode/manager/Manager.java | 24 +-
.../iotdb/confignode/manager/PartitionManager.java | 130 +++++----
.../iotdb/confignode/manager/RegionManager.java | 4 +
.../confignode/partition/StorageGroupSchema.java | 9 +
.../persistence/PartitionInfoPersistence.java | 60 ++--
.../persistence/RegionInfoPersistence.java | 16 ++
.../iotdb/confignode/physical/PhysicalPlan.java | 2 +-
.../physical/crud/CreateDataPartitionPlan.java | 5 +-
.../physical/crud/CreateRegionsPlan.java | 5 +-
.../physical/crud/CreateSchemaPartitionPlan.java | 70 ++++-
.../crud/GetOrCreateDataPartitionPlan.java | 6 +-
.../crud/GetOrCreateSchemaPartitionPlan.java | 86 +++---
.../confignode/service/executor/PlanExecutor.java | 4 +-
.../server/ConfigNodeRPCServerProcessor.java | 64 +++--
.../confignode/consensus/RatisConsensusDemo.java | 10 +-
.../hash/DeviceGroupHashExecutorManualTest.java | 5 +-
.../physical/SerializeDeserializeUT.java | 39 ++-
.../server/ConfigNodeRPCServerProcessorTest.java | 303 +++++++++++++--------
.../iotdb/commons/partition/DataPartition.java | 4 +-
.../iotdb/commons/partition/SchemaPartition.java | 120 +++++---
.../src/main/thrift/confignode.thrift | 67 +++--
thrift/src/main/thrift/common.thrift | 43 ++-
30 files changed, 882 insertions(+), 473 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 63703ec172..c22da95dfd 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -31,7 +31,7 @@ config_node_rpc_port=22277
# used for communication between config node and config node
# Datatype: int
-# config_node_internal_port=22278
+config_node_internal_port=22278
# this feature is under development, set this as false before it is done.
# Datatype: boolean
@@ -137,6 +137,17 @@ config_node_rpc_port=22277
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# consensus_dir=data/consensus
+####################
+### StorageGroup Configuration
+####################
+
+# Default TTL for storage groups whose TTL has not been set by a statement, in ms.
+# If this property is not set, the built-in default of 36000000 ms is used.
+# Notice: if this property is changed, previously created storage groups whose TTL has not been
+# set will also be affected. Negative values are accepted, which means only future data can be inserted.
+# Datatype: long
+# default_ttl=36000000
+
####################
### Region Configuration
####################
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 09c1a05649..586da08506 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -84,8 +84,14 @@ public class ConfigNodeConf {
private String consensusDir =
ConfigNodeConstant.DATA_DIR + File.separator + ConfigNodeConstant.CONSENSUS_FOLDER;
+ /** Default TTL for storage groups that are not set TTL by statements, in ms. */
+ private long defaultTTL = 36000000;
+
+ /** The number of replicas of each region */
private int regionReplicaCount = 3;
+ /** The number of SchemaRegions of each StorageGroup */
private int schemaRegionCount = 1;
+ /** The number of DataRegions of each StorageGroup */
private int dataRegionCount = 1;
public ConfigNodeConf() {
@@ -252,6 +258,14 @@ public class ConfigNodeConf {
this.dataDirs = dataDirs;
}
+ public long getDefaultTTL() {
+ return defaultTTL;
+ }
+
+ public void setDefaultTTL(long defaultTTL) {
+ this.defaultTTL = defaultTTL;
+ }
+
public int getRegionReplicaCount() {
return regionReplicaCount;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 29123554d7..83f4c539c2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -165,6 +165,10 @@ public class ConfigNodeDescriptor {
conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
+ conf.setDefaultTTL(
+ Long.parseLong(
+ properties.getProperty("default_ttl", String.valueOf(conf.getDefaultTTL()))));
+
conf.setRegionReplicaCount(
Integer.parseInt(
properties.getProperty(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
index 91397f7ade..2b88a8e8ca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -40,6 +41,10 @@ public class DataPartitionDataSet implements DataSet {
private DataPartition dataPartition;
+ public DataPartitionDataSet() {
+ // Empty constructor
+ }
+
public TSStatus getStatus() {
return status;
}
@@ -48,10 +53,6 @@ public class DataPartitionDataSet implements DataSet {
this.status = status;
}
- public DataPartition getDataPartition() {
- return dataPartition;
- }
-
public void setDataPartition(DataPartition dataPartition) {
this.dataPartition = dataPartition;
}
@@ -65,6 +66,7 @@ public class DataPartitionDataSet implements DataSet {
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
dataPartitionMap = new HashMap<>();
resp.setStatus(status);
+
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataPartition
.getDataPartitionMap()
@@ -82,7 +84,6 @@ public class DataPartitionDataSet implements DataSet {
.get(storageGroup)
.putIfAbsent(tSeriesPartitionSlot, new HashMap<>());
- // Extract Map<TimePartitionSlot, List<RegionReplicaSet>>
timePartitionSlotReplicaSetListMap.forEach(
((timePartitionSlot, regionReplicaSets) -> {
// Extract TTimePartitionSlot
@@ -97,14 +98,12 @@ public class DataPartitionDataSet implements DataSet {
regionReplicaSets.forEach(
regionReplicaSet -> {
TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
-
// Set TRegionReplicaSet's RegionId
- tRegionReplicaSet.setRegionId(
- regionReplicaSet.getConsensusGroupId().getId());
-
- // Set TRegionReplicaSet's GroupType
- tRegionReplicaSet.setGroupType("DataRegion");
-
+ ByteBuffer buffer =
+ ByteBuffer.allocate(Byte.BYTES + Integer.BYTES);
+ regionReplicaSet.getConsensusGroupId().serializeImpl(buffer);
+ buffer.flip();
+ tRegionReplicaSet.setRegionId(buffer);
// Set TRegionReplicaSet's EndPoints
List<EndPoint> endPointList = new ArrayList<>();
regionReplicaSet
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
index bef6f975b1..aad817a0e1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
@@ -22,16 +22,18 @@ package org.apache.iotdb.confignode.consensus.response;
import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-/** TODO: Reconstruct this class after PatterTree is moved to node-commons */
public class SchemaPartitionDataSet implements DataSet {
private TSStatus status;
@@ -50,45 +52,54 @@ public class SchemaPartitionDataSet implements DataSet {
this.status = status;
}
- public SchemaPartition getSchemaPartition() {
- return schemaPartition;
- }
-
public void setSchemaPartition(SchemaPartition schemaPartition) {
this.schemaPartition = schemaPartition;
}
public void convertToRpcSchemaPartitionResp(TSchemaPartitionResp resp) {
- Map<String, Map<Integer, TRegionReplicaSet>> schemaRegionMap = new HashMap<>();
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>();
+ resp.setStatus(status);
+
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ schemaPartition
+ .getSchemaPartitionMap()
+ .forEach(
+ (storageGroup, seriesPartitionSlotRegionReplicaSetMap) -> {
+ // Extract StorageGroupName
+ schemaPartitionMap.putIfAbsent(storageGroup, new HashMap<>());
- schemaPartition
- .getSchemaPartitionMap()
- .forEach(
- (storageGroup, seriesPartitionSlotRegionReplicaSetMap) -> {
- // Extract StorageGroupName
- schemaRegionMap.putIfAbsent(storageGroup, new HashMap<>());
+ // Extract Map<SeriesPartitionSlot, RegionReplicaSet>
+ seriesPartitionSlotRegionReplicaSetMap.forEach(
+ ((seriesPartitionSlot, regionReplicaSet) -> {
+ // Extract TSeriesPartitionSlot
+ TSeriesPartitionSlot tSeriesPartitionSlot =
+ new TSeriesPartitionSlot(seriesPartitionSlot.getSlotId());
- // Extract Map<SeriesPartitionSlot, RegionReplicaSet>
- seriesPartitionSlotRegionReplicaSetMap.forEach(
- ((seriesPartitionSlot, regionReplicaSet) -> {
- TRegionReplicaSet regionMessage = new TRegionReplicaSet();
- regionMessage.setRegionId(regionReplicaSet.getConsensusGroupId().getId());
- List<EndPoint> endPointList = new ArrayList<>();
- regionReplicaSet
- .getDataNodeList()
- .forEach(
- dataNodeLocation ->
- endPointList.add(
- new EndPoint(
- dataNodeLocation.getEndPoint().getIp(),
- dataNodeLocation.getEndPoint().getPort())));
- regionMessage.setEndpoint(endPointList);
- schemaRegionMap
- .get(storageGroup)
- .put(seriesPartitionSlot.getSlotId(), regionMessage);
- }));
- });
+ // Extract TRegionReplicaSet
+ TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
+ // Set TRegionReplicaSet's RegionId
+ ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES + Integer.BYTES);
+ regionReplicaSet.getConsensusGroupId().serializeImpl(buffer);
+ buffer.flip();
+ tRegionReplicaSet.setRegionId(buffer);
+ // Set TRegionReplicaSet's EndPoints
+ List<EndPoint> endPointList = new ArrayList<>();
+ regionReplicaSet
+ .getDataNodeList()
+ .forEach(
+ dataNodeLocation ->
+ endPointList.add(
+ new EndPoint(
+ dataNodeLocation.getEndPoint().getIp(),
+ dataNodeLocation.getEndPoint().getPort())));
+ tRegionReplicaSet.setEndpoint(endPointList);
+ schemaPartitionMap
+ .get(storageGroup)
+ .put(tSeriesPartitionSlot, tRegionReplicaSet);
+ }));
+ });
+ }
- // resp.setSchemaRegionMap(schemaRegionMap);
+ resp.setSchemaRegionMap(schemaPartitionMap);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
index 245cd58d91..48e0c181ce 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.confignode.consensus.response;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.partition.StorageGroupSchema;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -53,14 +53,14 @@ public class StorageGroupSchemaDataSet implements DataSet {
this.schemaList = schemaList;
}
- public void convertToRPCStorageGroupMessageResp(TStorageGroupMessageResp resp) {
+ public void convertToRPCStorageGroupSchemaResp(TStorageGroupSchemaResp resp) {
resp.setStatus(status);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- Map<String, TStorageGroupMessage> storageGroupMessageMap = new HashMap<>();
+ Map<String, TStorageGroupSchema> storageGroupMessageMap = new HashMap<>();
for (StorageGroupSchema schema : schemaList) {
- storageGroupMessageMap.put(schema.getName(), new TStorageGroupMessage(schema.getName()));
+ storageGroupMessageMap.put(schema.getName(), new TStorageGroupSchema(schema.getName()));
}
- resp.setStorageGroupMessageMap(storageGroupMessageMap);
+ resp.setStorageGroupSchemaMap(storageGroupMessageMap);
}
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index d84ace336e..1a4fbf07f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -22,12 +22,14 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
@@ -35,9 +37,16 @@ import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.rpc.TSStatusCode;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/** Entry of all management, AssignPartitionManager,AssignRegionManager. */
public class ConfigManager implements Manager {
@@ -123,26 +132,94 @@ public class ConfigManager implements Manager {
}
@Override
- public DataSet getSchemaPartition(PhysicalPlan physicalPlan) {
+ public DataSet getSchemaPartition(PathPatternTree patternTree) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ List<String> devicePaths = patternTree.findAllDevicePaths();
+ List<String> storageGroups = getRegionManager().getStorageGroupNames();
+
+ GetOrCreateSchemaPartitionPlan getSchemaPartitionPlan =
+ new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetSchemaPartition);
+ Map<String, List<SeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
+
+ boolean getAll = false;
+ Set<String> getAllSet = new HashSet<>();
+ for (String devicePath : devicePaths) {
+ boolean matchStorageGroup = false;
+ for (String storageGroup : storageGroups) {
+ if (devicePath.contains(storageGroup)) {
+ matchStorageGroup = true;
+ if (devicePath.contains("*")) {
+ // Get all SchemaPartitions of this StorageGroup if the devicePath contains "*"
+ getAllSet.add(storageGroup);
+ } else {
+ // Get the specific SchemaPartition
+ partitionSlotsMap
+ .computeIfAbsent(storageGroup, key -> new ArrayList<>())
+ .add(getPartitionManager().getSeriesPartitionSlot(devicePath));
+ }
+ break;
+ }
+ }
+ if (!matchStorageGroup && devicePath.contains("**")) {
+ // Get all SchemaPartitions if there exists one devicePath that contains "**"
+ getAll = true;
+ }
+ }
- // TODO: Only leader can query SchemaPartition
+ if (getAll) {
+ partitionSlotsMap = new HashMap<>();
+ } else {
+ for (String storageGroup : getAllSet) {
+ if (partitionSlotsMap.containsKey(storageGroup)) {
+ partitionSlotsMap.replace(storageGroup, new ArrayList<>());
+ } else {
+ partitionSlotsMap.put(storageGroup, new ArrayList<>());
+ }
+ }
+ }
- if (physicalPlan instanceof GetOrCreateSchemaPartitionPlan) {
- return partitionManager.getSchemaPartition((GetOrCreateSchemaPartitionPlan) physicalPlan);
+ getSchemaPartitionPlan.setPartitionSlotsMap(partitionSlotsMap);
+ return partitionManager.getSchemaPartition(getSchemaPartitionPlan);
+ } else {
+ SchemaPartitionDataSet dataSet = new SchemaPartitionDataSet();
+ dataSet.setStatus(status);
+ return dataSet;
}
- return new SchemaPartitionDataSet();
}
@Override
- public DataSet getOrCreateSchemaPartition(PhysicalPlan physicalPlan) {
+ public DataSet getOrCreateSchemaPartition(PathPatternTree patternTree) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ List<String> devicePaths = patternTree.findAllDevicePaths();
+ List<String> storageGroups = getRegionManager().getStorageGroupNames();
- // TODO: Only leader can apply SchemaPartition
+ GetOrCreateSchemaPartitionPlan getOrCreateSchemaPartitionPlan =
+ new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetOrCreateSchemaPartition);
+ Map<String, List<SeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
- if (physicalPlan instanceof GetOrCreateSchemaPartitionPlan) {
- return partitionManager.getOrCreateSchemaPartition(
- (GetOrCreateSchemaPartitionPlan) physicalPlan);
+ for (String devicePath : devicePaths) {
+ if (!devicePath.contains("*")) {
+ // Only check devicePaths that do not contain "*"
+ for (String storageGroup : storageGroups) {
+ if (devicePath.contains(storageGroup)) {
+ partitionSlotsMap
+ .computeIfAbsent(storageGroup, key -> new ArrayList<>())
+ .add(getPartitionManager().getSeriesPartitionSlot(devicePath));
+ break;
+ }
+ }
+ }
+ }
+
+ getOrCreateSchemaPartitionPlan.setPartitionSlotsMap(partitionSlotsMap);
+ return partitionManager.getOrCreateSchemaPartition(getOrCreateSchemaPartitionPlan);
+ } else {
+ SchemaPartitionDataSet dataSet = new SchemaPartitionDataSet();
+ dataSet.setStatus(status);
+ return dataSet;
}
- return new SchemaPartitionDataSet();
}
@Override
@@ -201,6 +278,11 @@ public class ConfigManager implements Manager {
return consensusManager;
}
+ @Override
+ public PartitionManager getPartitionManager() {
+ return partitionManager;
+ }
+
@Override
public TSStatus operatePermission(PhysicalPlan physicalPlan) {
if (physicalPlan instanceof AuthorPlan) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index a7c5205caf..e264556ddd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
-import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
@@ -38,8 +36,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -52,10 +48,7 @@ public class ConsensusManager {
private ConsensusGroupId consensusGroupId;
private IConsensus consensusImpl;
- private SeriesPartitionExecutor executor;
-
public ConsensusManager() throws IOException {
- setSeriesPartitionExecutor();
setConsensusLayer();
}
@@ -63,31 +56,6 @@ public class ConsensusManager {
consensusImpl.stop();
}
- /** Build DeviceGroupHashExecutor */
- private void setSeriesPartitionExecutor() {
- try {
- Class<?> executor = Class.forName(conf.getSeriesPartitionExecutorClass());
- Constructor<?> executorConstructor = executor.getConstructor(int.class);
- this.executor =
- (SeriesPartitionExecutor)
- executorConstructor.newInstance(conf.getSeriesPartitionSlotNum());
- } catch (ClassNotFoundException
- | NoSuchMethodException
- | InstantiationException
- | IllegalAccessException
- | InvocationTargetException e) {
- LOGGER.error(
- "Couldn't Constructor SeriesPartitionExecutor class: {}",
- conf.getSeriesPartitionExecutorClass(),
- e);
- executor = null;
- }
- }
-
- public SeriesPartitionSlot getSeriesPartitionSlot(String device) {
- return executor.getSeriesPartitionSlot(device);
- }
-
/** Build ConfigNodeGroup ConsensusLayer */
private void setConsensusLayer() throws IOException {
// There is only one ConfigNodeGroup
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index 25f40841e2..291713002a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
/**
* a subset of services provided by {@ConfigManager}. For use internally only, passed to Managers,
@@ -36,26 +37,33 @@ public interface Manager {
boolean isStopped();
/**
- * get data node info manager
+ * Get DataNodeManager
*
- * @return DataNodeInfoManager instance
+ * @return DataNodeManager instance
*/
DataNodeManager getDataNodeManager();
/**
- * get consensus manager
+ * Get ConsensusManager
*
* @return ConsensusManager instance
*/
ConsensusManager getConsensusManager();
/**
- * get assign region manager
+ * Get RegionManager
*
- * @return AssignRegionManager instance
+ * @return RegionManager instance
*/
RegionManager getRegionManager();
+ /**
+ * Get PartitionManager
+ *
+ * @return PartitionManager instance
+ */
+ PartitionManager getPartitionManager();
+
/**
* Register DataNode
*
@@ -90,18 +98,16 @@ public interface Manager {
/**
* Get SchemaPartition
*
- * @param physicalPlan SchemaPartitionPlan
* @return SchemaPartitionDataSet
*/
- DataSet getSchemaPartition(PhysicalPlan physicalPlan);
+ DataSet getSchemaPartition(PathPatternTree patternTree);
/**
* Get or create SchemaPartition
*
- * @param physicalPlan SchemaPartitionPlan
* @return SchemaPartitionDataSet
*/
- DataSet getOrCreateSchemaPartition(PhysicalPlan physicalPlan);
+ DataSet getOrCreateSchemaPartition(PathPatternTree patternTree);
/**
* Get DataPartition
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index c2bde8119e..6a6b9ef26e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -21,21 +21,25 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -52,8 +56,11 @@ public class PartitionManager {
private final Manager configNodeManager;
+ private SeriesPartitionExecutor executor;
+
public PartitionManager(Manager configNodeManager) {
this.configNodeManager = configNodeManager;
+ setSeriesPartitionExecutor();
}
private ConsensusManager getConsensusManager() {
@@ -61,9 +68,9 @@ public class PartitionManager {
}
/**
- * TODO: Reconstruct this interface after PatterTree is moved to node-commons Get SchemaPartition
+ * Get SchemaPartition
*
- * @param physicalPlan SchemaPartitionPlan with PatternTree
+ * @param physicalPlan SchemaPartitionPlan with partitionSlotsMap
* @return SchemaPartitionDataSet that contains only existing SchemaPartition
*/
public DataSet getSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
@@ -74,55 +81,58 @@ public class PartitionManager {
}
/**
- * TODO: Reconstruct this interface after PatterTree is moved to node-commons Get SchemaPartition
- * and create a new one if it does not exist
+ * Get SchemaPartition and create a new one if it does not exist
*
- * @param physicalPlan SchemaPartitionPlan with PatternTree
+ * @param physicalPlan SchemaPartitionPlan with partitionSlotsMap
* @return SchemaPartitionDataSet
*/
public DataSet getOrCreateSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
- String storageGroup = physicalPlan.getStorageGroup();
- List<Integer> seriesPartitionSlots = physicalPlan.getSeriesPartitionSlots();
- List<Integer> noAssignedSeriesPartitionSlots =
- partitionInfoPersistence.filterSchemaRegionNoAssignedPartitionSlots(
- storageGroup, seriesPartitionSlots);
-
- if (noAssignedSeriesPartitionSlots.size() > 0) {
- // allocate partition by storage group and device group id
- Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets =
- allocateSchemaPartition(storageGroup, noAssignedSeriesPartitionSlots);
- physicalPlan.setSchemaPartitionReplicaSet(schemaPartitionReplicaSets);
-
- ConsensusWriteResponse consensusWriteResponse = getConsensusManager().write(physicalPlan);
- if (consensusWriteResponse.getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.info("Allocate schema partition to {}.", schemaPartitionReplicaSets);
- }
+ Map<String, List<SeriesPartitionSlot>> noAssignedSchemaPartitionSlots =
+ partitionInfoPersistence.filterNoAssignedSchemaPartitionSlots(
+ physicalPlan.getPartitionSlotsMap());
+
+ if (noAssignedSchemaPartitionSlots.size() > 0) {
+ // Allocate SchemaPartition
+ Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> assignedSchemaPartition =
+ allocateSchemaPartition(noAssignedSchemaPartitionSlots);
+
+ // Persist SchemaPartition
+ CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
+ createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
+ getConsensusManager().write(createPlan);
}
- physicalPlan.setSchemaPartitionReplicaSet(new HashMap<>());
return getSchemaPartition(physicalPlan);
}
/**
* TODO: allocate schema partition by LoadManager
*
- * @param storageGroup StorageGroupName
- * @param noAssignedSeriesPartitionSlots not assigned SeriesPartitionSlots
- * @return assign result
+ * @param noAssignedSchemaPartitionSlotsMap Map<StorageGroupName, List<SeriesPartitionSlot>>
+ * @return assign result, Map<StorageGroupName, Map<SeriesPartitionSlot, RegionReplicaSet>>
*/
- private Map<Integer, RegionReplicaSet> allocateSchemaPartition(
- String storageGroup, List<Integer> noAssignedSeriesPartitionSlots) {
- List<RegionReplicaSet> schemaRegionEndPoints =
- RegionInfoPersistence.getInstance().getSchemaRegionEndPoint(storageGroup);
- Random random = new Random();
- Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets = new HashMap<>();
- for (Integer seriesPartitionSlot : noAssignedSeriesPartitionSlots) {
- RegionReplicaSet schemaRegionReplicaSet =
- schemaRegionEndPoints.get(random.nextInt(schemaRegionEndPoints.size()));
- schemaPartitionReplicaSets.put(seriesPartitionSlot, schemaRegionReplicaSet);
+ private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> allocateSchemaPartition(
+ Map<String, List<SeriesPartitionSlot>> noAssignedSchemaPartitionSlotsMap) {
+ Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> result = new HashMap<>();
+
+ for (String storageGroup : noAssignedSchemaPartitionSlotsMap.keySet()) {
+ List<SeriesPartitionSlot> noAssignedPartitionSlots =
+ noAssignedSchemaPartitionSlotsMap.get(storageGroup);
+ List<RegionReplicaSet> schemaRegionEndPoints =
+ RegionInfoPersistence.getInstance().getSchemaRegionEndPoint(storageGroup);
+ Random random = new Random();
+
+ Map<SeriesPartitionSlot, RegionReplicaSet> allocateResult = new HashMap<>();
+ noAssignedPartitionSlots.forEach(
+ seriesPartitionSlot ->
+ allocateResult.put(
+ seriesPartitionSlot,
+ schemaRegionEndPoints.get(random.nextInt(schemaRegionEndPoints.size()))));
+
+ result.put(storageGroup, allocateResult);
}
- return schemaPartitionReplicaSets;
+
+ return result;
}
/**
@@ -155,15 +165,11 @@ public class PartitionManager {
// Allocate DataPartition
Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
assignedDataPartition = allocateDataPartition(noAssignedDataPartitionSlots);
+
+ // Persist DataPartition
CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
createPlan.setAssignedDataPartition(assignedDataPartition);
-
- // Persistence DataPartition
- ConsensusWriteResponse consensusWriteResponse = getConsensusManager().write(createPlan);
- if (consensusWriteResponse.getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.info("Allocate data partition to {}.", assignedDataPartition);
- }
+ getConsensusManager().write(createPlan);
}
return getDataPartition(physicalPlan);
@@ -209,4 +215,36 @@ public class PartitionManager {
}
return result;
}
+
+ /** Construct SeriesPartitionExecutor by iotdb-confignode.properties */
+ private void setSeriesPartitionExecutor() {
+ ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
+ try {
+ Class<?> executor = Class.forName(conf.getSeriesPartitionExecutorClass());
+ Constructor<?> executorConstructor = executor.getConstructor(int.class);
+ this.executor =
+ (SeriesPartitionExecutor)
+ executorConstructor.newInstance(conf.getSeriesPartitionSlotNum());
+ } catch (ClassNotFoundException
+ | NoSuchMethodException
+ | InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException e) {
+ LOGGER.error(
+ "Couldn't Constructor SeriesPartitionExecutor class: {}",
+ conf.getSeriesPartitionExecutorClass(),
+ e);
+ executor = null;
+ }
+ }
+
+ /**
+ * Get SeriesPartitionSlot
+ *
+ * @param devicePath Full path ending with device name
+ * @return SeriesPartitionSlot
+ */
+ public SeriesPartitionSlot getSeriesPartitionSlot(String devicePath) {
+ return executor.getSeriesPartitionSlot(devicePath);
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
index 86e6f31774..1f76920582 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
@@ -131,4 +131,8 @@ public class RegionManager {
getConsensusManager().read(new QueryStorageGroupSchemaPlan());
return (StorageGroupSchemaDataSet) readResponse.getDataset();
}
+
+ public List<String> getStorageGroupNames() {
+ return regionInfoPersistence.getStorageGroupNames();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java b/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java
index 9bbe657503..e128a2092c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java
@@ -30,6 +30,7 @@ import java.util.Objects;
public class StorageGroupSchema {
private String name;
+ private long TTL;
private final List<ConsensusGroupId> schemaRegionGroupIds;
private final List<ConsensusGroupId> dataRegionGroupIds;
@@ -48,6 +49,14 @@ public class StorageGroupSchema {
return name;
}
+ public long getTTL() {
+ return TTL;
+ }
+
+ public void setTTL(long TTL) {
+ this.TTL = TTL;
+ }
+
public List<ConsensusGroupId> getSchemaRegionGroupIds() {
return schemaRegionGroupIds;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
index 2b8d8a5395..ab0eb0c221 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.consensus.common.DataSet;
@@ -64,45 +65,45 @@ public class PartitionInfoPersistence {
}
/**
- * TODO: Reconstruct this interface after PatterTree is moved to node-commons Get SchemaPartition
+ * Get SchemaPartition
*
- * @param physicalPlan SchemaPartitionPlan with PatternTree
+ * @param physicalPlan SchemaPartitionPlan with partitionSlotsMap
* @return SchemaPartitionDataSet that contains only existing SchemaPartition
*/
public DataSet getSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
SchemaPartitionDataSet schemaPartitionDataSet = new SchemaPartitionDataSet();
schemaPartitionReadWriteLock.readLock().lock();
+
try {
- String storageGroup = physicalPlan.getStorageGroup();
- List<Integer> deviceGroupIDs = physicalPlan.getSeriesPartitionSlots();
- SchemaPartition schemaPartitionInfo = new SchemaPartition();
- schemaPartitionInfo.setSchemaPartitionMap(
- schemaPartition.getSchemaPartition(storageGroup, deviceGroupIDs));
- schemaPartitionDataSet.setSchemaPartition(schemaPartitionInfo);
+ schemaPartitionDataSet.setSchemaPartition(
+ schemaPartition.getSchemaPartition(physicalPlan.getPartitionSlotsMap()));
} finally {
schemaPartitionReadWriteLock.readLock().unlock();
schemaPartitionDataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
+
return schemaPartitionDataSet;
}
/**
- * TODO: Reconstruct this interface after PatterTree is moved to node-commons Get SchemaPartition
- * and create a new one if it does not exist
+ * Create SchemaPartition
*
- * @param physicalPlan SchemaPartitionPlan with PatternTree
- * @return SchemaPartitionDataSet
+ * @param physicalPlan CreateSchemaPartitionPlan with SchemaPartition assigned result
+ * @return TSStatusCode.SUCCESS_STATUS when creation is successful
*/
- public TSStatus createSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
+ public TSStatus createSchemaPartition(CreateSchemaPartitionPlan physicalPlan) {
schemaPartitionReadWriteLock.writeLock().lock();
try {
- // Allocate SchemaPartition by SchemaPartitionPlan
- String storageGroup = physicalPlan.getStorageGroup();
- Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets =
- physicalPlan.getSchemaPartitionReplicaSets();
- schemaPartitionReplicaSets.forEach(
- (key, value) -> schemaPartition.setSchemaRegionReplicaSet(storageGroup, key, value));
+ // Allocate SchemaPartition by CreateSchemaPartitionPlan
+ Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> assignedResult =
+ physicalPlan.getAssignedSchemaPartition();
+ assignedResult.forEach(
+ (storageGroup, partitionSlots) ->
+ partitionSlots.forEach(
+ (seriesPartitionSlot, regionReplicaSet) ->
+ schemaPartition.createSchemaPartition(
+ storageGroup, seriesPartitionSlot, regionReplicaSet)));
} finally {
schemaPartitionReadWriteLock.writeLock().unlock();
}
@@ -110,14 +111,12 @@ public class PartitionInfoPersistence {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
- /** TODO: Reconstruct this interface after PatterTree is moved to node-commons */
- public List<Integer> filterSchemaRegionNoAssignedPartitionSlots(
- String storageGroup, List<Integer> seriesPartitionSlots) {
- List<Integer> result;
+ public Map<String, List<SeriesPartitionSlot>> filterNoAssignedSchemaPartitionSlots(
+ Map<String, List<SeriesPartitionSlot>> partitionSlotsMap) {
+ Map<String, List<SeriesPartitionSlot>> result;
schemaPartitionReadWriteLock.readLock().lock();
try {
- result =
- schemaPartition.filterNoAssignedSeriesPartitionSlot(storageGroup, seriesPartitionSlots);
+ result = schemaPartition.filterNoAssignedSchemaPartitionSlot(partitionSlotsMap);
} finally {
schemaPartitionReadWriteLock.readLock().unlock();
}
@@ -127,13 +126,13 @@ public class PartitionInfoPersistence {
/**
* Get DataPartition
*
- * @param physicalPlan DataPartitionPlan with Map<StorageGroupName, Map<SeriesPartitionSlot,
- * List<TimePartitionSlot>>>
+ * @param physicalPlan DataPartitionPlan with partitionSlotsMap
* @return DataPartitionDataSet that contains only existing DataPartition
*/
public DataSet getDataPartition(GetOrCreateDataPartitionPlan physicalPlan) {
DataPartitionDataSet dataPartitionDataSet = new DataPartitionDataSet();
dataPartitionReadWriteLock.readLock().lock();
+
try {
dataPartitionDataSet.setDataPartition(
dataPartition.getDataPartition(physicalPlan.getPartitionSlotsMap()));
@@ -141,9 +140,16 @@ public class PartitionInfoPersistence {
dataPartitionReadWriteLock.readLock().unlock();
dataPartitionDataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
+
return dataPartitionDataSet;
}
+ /**
+ * Create DataPartition
+ *
+ * @param physicalPlan CreateDataPartitionPlan with DataPartition assigned result
+ * @return TSStatusCode.SUCCESS_STATUS when creation is successful
+ */
public TSStatus createDataPartition(CreateDataPartitionPlan physicalPlan) {
dataPartitionReadWriteLock.writeLock().lock();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
index 620814a7e3..97249e85d9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
@@ -162,6 +162,22 @@ public class RegionInfoPersistence {
return dataRegionEndPoints;
}
+ /**
+ * Get the names of all StorageGroups
+ *
+ * @return List<String>, the names of all StorageGroups
+ */
+ public List<String> getStorageGroupNames() {
+ List<String> storageGroups;
+ regionReadWriteLock.readLock().lock();
+ try {
+ storageGroups = new ArrayList<>(storageGroupsMap.keySet());
+ } finally {
+ regionReadWriteLock.readLock().unlock();
+ }
+ return storageGroups;
+ }
+
public int generateNextRegionGroupId() {
int result;
regionAllocateLock.writeLock().lock();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
index 957e26d52e..7bd81645a5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
@@ -108,7 +108,7 @@ public abstract class PhysicalPlan implements IConsensusRequest {
plan = new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetSchemaPartition);
break;
case CreateSchemaPartition:
- plan = new CreateSchemaPartitionPlan(PhysicalPlanType.CreateSchemaPartition);
+ plan = new CreateSchemaPartitionPlan();
break;
case GetOrCreateSchemaPartition:
plan = new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetOrCreateSchemaPartition);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java
index d2bb602e09..9daf5ef921 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
+/** Create DataPartition by assignedDataPartition */
public class CreateDataPartitionPlan extends PhysicalPlan {
private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
@@ -100,13 +101,11 @@ public class CreateDataPartitionPlan extends PhysicalPlan {
.put(timePartitionSlot, new ArrayList<>());
int regionReplicaSetNum = buffer.getInt();
for (int l = 0; l < regionReplicaSetNum; l++) {
- RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
- regionReplicaSet.deserializeImpl(buffer);
assignedDataPartition
.get(storageGroupName)
.get(seriesPartitionSlot)
.get(timePartitionSlot)
- .add(regionReplicaSet);
+ .add(RegionReplicaSet.deserializeImpl(buffer));
}
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java
index d61a8bb702..56176a70dc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+/** Create regions for specific StorageGroup */
public class CreateRegionsPlan extends PhysicalPlan {
private String storageGroup;
@@ -74,9 +75,7 @@ public class CreateRegionsPlan extends PhysicalPlan {
int length = buffer.getInt();
for (int i = 0; i < length; i++) {
- RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
- regionReplicaSet.deserializeImpl(buffer);
- regionReplicaSets.add(regionReplicaSet);
+ regionReplicaSets.add(RegionReplicaSet.deserializeImpl(buffer));
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java
index 15b5b2ec54..6a93dfdb6b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java
@@ -18,22 +18,82 @@
*/
package org.apache.iotdb.confignode.physical.crud;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
-/** TODO: Reconstruct this interface after PatterTree is moved to node-commons */
+/** Create SchemaPartition by assignedSchemaPartition */
public class CreateSchemaPartitionPlan extends PhysicalPlan {
- public CreateSchemaPartitionPlan(PhysicalPlanType type) {
- super(type);
+ private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> assignedSchemaPartition;
+
+ public CreateSchemaPartitionPlan() {
+ super(PhysicalPlanType.CreateSchemaPartition);
+ }
+
+ public Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> getAssignedSchemaPartition() {
+ return assignedSchemaPartition;
+ }
+
+ public void setAssignedSchemaPartition(
+ Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> assignedSchemaPartition) {
+ this.assignedSchemaPartition = assignedSchemaPartition;
}
@Override
- protected void serializeImpl(ByteBuffer buffer) {}
+ protected void serializeImpl(ByteBuffer buffer) {
+ buffer.putInt(PhysicalPlanType.CreateSchemaPartition.ordinal());
+
+ buffer.putInt(assignedSchemaPartition.size());
+ assignedSchemaPartition.forEach(
+ (storageGroup, partitionSlots) -> {
+ SerializeDeserializeUtil.write(storageGroup, buffer);
+ buffer.putInt(partitionSlots.size());
+ partitionSlots.forEach(
+ (seriesPartitionSlot, regionReplicaSet) -> {
+ seriesPartitionSlot.serializeImpl(buffer);
+ regionReplicaSet.serializeImpl(buffer);
+ });
+ });
+ }
@Override
- protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ assignedSchemaPartition = new HashMap<>();
+
+ int storageGroupNum = buffer.getInt();
+ for (int i = 0; i < storageGroupNum; i++) {
+ String storageGroup = SerializeDeserializeUtil.readString(buffer);
+ assignedSchemaPartition.put(storageGroup, new HashMap<>());
+ int seriesPartitionSlotNum = buffer.getInt();
+ for (int j = 0; j < seriesPartitionSlotNum; j++) {
+ SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot();
+ seriesPartitionSlot.deserializeImpl(buffer);
+ assignedSchemaPartition
+ .get(storageGroup)
+ .put(seriesPartitionSlot, RegionReplicaSet.deserializeImpl(buffer));
+ }
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CreateSchemaPartitionPlan that = (CreateSchemaPartitionPlan) o;
+ return assignedSchemaPartition.equals(that.assignedSchemaPartition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(assignedSchemaPartition);
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java
index 5cd6aec141..7715e7e6da 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.physical.crud;
import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.physical.PhysicalPlanType;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
@@ -32,7 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-/** Query or apply DataPartition by the specific storageGroup and the deviceGroupStartTimeMap. */
+/** Get or create DataPartition by the specific partitionSlotsMap. */
public class GetOrCreateDataPartitionPlan extends PhysicalPlan {
private Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap;
@@ -45,6 +46,7 @@ public class GetOrCreateDataPartitionPlan extends PhysicalPlan {
return partitionSlotsMap;
}
+ @TestOnly
public void setPartitionSlotsMap(
Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap) {
this.partitionSlotsMap = partitionSlotsMap;
@@ -85,7 +87,7 @@ public class GetOrCreateDataPartitionPlan extends PhysicalPlan {
@Override
protected void serializeImpl(ByteBuffer buffer) {
- buffer.putInt(PhysicalPlanType.GetDataPartition.ordinal());
+ buffer.putInt(getType().ordinal());
buffer.putInt(partitionSlotsMap.size());
partitionSlotsMap.forEach(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java
index 642aceed5c..5b6e3d5149 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java
@@ -18,81 +18,79 @@
*/
package org.apache.iotdb.confignode.physical.crud;
-import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.physical.PhysicalPlanType;
import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
-/** Query or apply SchemaPartition by the specific storageGroup and the deviceGroupStartTimeMap. */
+/** Get or create SchemaPartition by the specific partitionSlotsMap. */
public class GetOrCreateSchemaPartitionPlan extends PhysicalPlan {
- private String storageGroup;
- private List<Integer> seriesPartitionSlots;
- private Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets;
+
+ // Map<StorageGroup, List<SeriesPartitionSlot>>
+ // Get all SchemaPartitions when the partitionSlotsMap is empty
+ // Get all existing SchemaPartitions in one StorageGroup when the SeriesPartitionSlot list is empty
+ private Map<String, List<SeriesPartitionSlot>> partitionSlotsMap;
public GetOrCreateSchemaPartitionPlan(PhysicalPlanType physicalPlanType) {
super(physicalPlanType);
}
- public GetOrCreateSchemaPartitionPlan(
- PhysicalPlanType physicalPlanType, String storageGroup, List<Integer> seriesPartitionSlots) {
- this(physicalPlanType);
- this.storageGroup = storageGroup;
- this.seriesPartitionSlots = seriesPartitionSlots;
- }
-
- public void setSchemaPartitionReplicaSet(
- Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets) {
- this.schemaPartitionReplicaSets = deviceGroupIdReplicaSets;
+ public void setPartitionSlotsMap(Map<String, List<SeriesPartitionSlot>> partitionSlotsMap) {
+ this.partitionSlotsMap = partitionSlotsMap;
}
- public Map<Integer, RegionReplicaSet> getSchemaPartitionReplicaSets() {
- return schemaPartitionReplicaSets;
+ public Map<String, List<SeriesPartitionSlot>> getPartitionSlotsMap() {
+ return partitionSlotsMap;
}
@Override
protected void serializeImpl(ByteBuffer buffer) {
- buffer.putInt(PhysicalPlanType.GetDataPartition.ordinal());
- SerializeDeserializeUtil.write(storageGroup, buffer);
- buffer.putInt(seriesPartitionSlots.size());
- seriesPartitionSlots.forEach(id -> SerializeDeserializeUtil.write(id, buffer));
+ buffer.putInt(getType().ordinal());
- buffer.putInt(schemaPartitionReplicaSets.size());
- for (Map.Entry<Integer, RegionReplicaSet> entry : schemaPartitionReplicaSets.entrySet()) {
- buffer.putInt(entry.getKey());
- entry.getValue().serializeImpl(buffer);
- }
+ buffer.putInt(partitionSlotsMap.size());
+ partitionSlotsMap.forEach(
+ (storageGroup, seriesPartitionSlots) -> {
+ SerializeDeserializeUtil.write(storageGroup, buffer);
+ buffer.putInt(seriesPartitionSlots.size());
+ seriesPartitionSlots.forEach(
+ seriesPartitionSlot -> seriesPartitionSlot.serializeImpl(buffer));
+ });
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
- storageGroup = SerializeDeserializeUtil.readString(buffer);
- int idSize = SerializeDeserializeUtil.readInt(buffer);
- for (int i = 0; i < idSize; i++) {
- seriesPartitionSlots.add(SerializeDeserializeUtil.readInt(buffer));
- }
-
- if (schemaPartitionReplicaSets == null) {
- schemaPartitionReplicaSets = new HashMap<>();
- }
- int size = buffer.getInt();
- for (int i = 0; i < size; i++) {
- RegionReplicaSet schemaRegionReplicaSet = new RegionReplicaSet();
- schemaRegionReplicaSet.deserializeImpl(buffer);
- schemaPartitionReplicaSets.put(buffer.getInt(), schemaRegionReplicaSet);
+ partitionSlotsMap = new HashMap<>();
+ int storageGroupNum = buffer.getInt();
+ for (int i = 0; i < storageGroupNum; i++) {
+ String storageGroup = SerializeDeserializeUtil.readString(buffer);
+ partitionSlotsMap.put(storageGroup, new ArrayList<>());
+ int seriesPartitionSlotNum = buffer.getInt();
+ for (int j = 0; j < seriesPartitionSlotNum; j++) {
+ SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot();
+ seriesPartitionSlot.deserializeImpl(buffer);
+ partitionSlotsMap.get(storageGroup).add(seriesPartitionSlot);
+ }
}
}
- public String getStorageGroup() {
- return storageGroup;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ GetOrCreateSchemaPartitionPlan that = (GetOrCreateSchemaPartitionPlan) o;
+ return partitionSlotsMap.equals(that.partitionSlotsMap);
}
- public List<Integer> getSeriesPartitionSlots() {
- return seriesPartitionSlots;
+ @Override
+ public int hashCode() {
+ return Objects.hash(partitionSlotsMap);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
index 53c95585e0..85d11bfa14 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
@@ -93,8 +94,7 @@ public class PlanExecutor {
case CreateRegions:
return regionInfoPersistence.createRegions((CreateRegionsPlan) plan);
case CreateSchemaPartition:
- return partitionInfoPersistence.createSchemaPartition(
- (GetOrCreateSchemaPartitionPlan) plan);
+ return partitionInfoPersistence.createSchemaPartition((CreateSchemaPartitionPlan) plan);
case CreateDataPartition:
return partitionInfoPersistence.createDataPartition((CreateDataPartitionPlan) plan);
case CREATE_USER:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index a4392a1716..6bcb05877e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.confignode.service.thrift.server;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.partition.StorageGroupSchema;
@@ -44,14 +46,17 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.db.auth.AuthException;
+import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
@@ -97,6 +102,9 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
SetStorageGroupPlan plan =
new SetStorageGroupPlan(new StorageGroupSchema(req.getStorageGroup()));
+ // TODO: Set TTL by optional field TSetStorageGroupReq.TTL
+ plan.getSchema().setTTL(ConfigNodeDescriptor.getInstance().getConf().getDefaultTTL());
+
return configManager.setStorageGroup(plan);
}
@@ -107,48 +115,44 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
}
@Override
- public TStorageGroupMessageResp getStorageGroupsMessage() throws TException {
+ public TSStatus setTTL(TSetTTLReq req) throws TException {
+ // TODO: Set TTL
+ return null;
+ }
+
+ @Override
+ public TStorageGroupSchemaResp getStorageGroupsSchema() throws TException {
StorageGroupSchemaDataSet dataSet =
(StorageGroupSchemaDataSet) configManager.getStorageGroupSchema();
- TStorageGroupMessageResp resp = new TStorageGroupMessageResp();
- dataSet.convertToRPCStorageGroupMessageResp(resp);
+ TStorageGroupSchemaResp resp = new TStorageGroupSchemaResp();
+ dataSet.convertToRPCStorageGroupSchemaResp(resp);
return resp;
}
@Override
public TSchemaPartitionResp getSchemaPartition(TSchemaPartitionReq req) throws TException {
- // TODO: Get SchemaPartition by specific PatternTree
-
- // SchemaPartitionPlan querySchemaPartitionPlan =
- // new SchemaPartitionPlan(
- // PhysicalPlanType.QuerySchemaPartition, req.getStorageGroup(),
- // req.getDeviceGroupIDs());
- // DataSet dataSet = configManager.getSchemaPartition(querySchemaPartitionPlan);
- // return ((SchemaPartitionDataSet) dataSet).convertRpcSchemaPartitionInfo();
- return null;
+ PathPatternTree patternTree =
+ PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
+ SchemaPartitionDataSet dataSet =
+ (SchemaPartitionDataSet) configManager.getSchemaPartition(patternTree);
+
+ TSchemaPartitionResp resp = new TSchemaPartitionResp();
+ dataSet.convertToRpcSchemaPartitionResp(resp);
+ return resp;
}
@Override
public TSchemaPartitionResp getOrCreateSchemaPartition(TSchemaPartitionReq req)
throws TException {
- // TODO: Get or create SchemaPartition by specific PatternTree
-
- // SchemaPartitionPlan applySchemaPartitionPlan =
- // new SchemaPartitionPlan(
- // PhysicalPlanType.ApplySchemaPartition,
- // req.getStorageGroup(),
- // req.getSeriesPartitionSlots());
- // SchemaPartitionDataSet dataSet =
- // (SchemaPartitionDataSet) configManager.applySchemaPartition(applySchemaPartitionPlan);
- //
- // TSchemaPartitionResp resp = new TSchemaPartitionResp();
- // resp.setStatus(dataSet.getStatus());
- // if (dataSet.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // dataSet.convertToRpcSchemaPartitionResp(resp);
- // }
- // return resp;
- return null;
+ PathPatternTree patternTree =
+ PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
+ SchemaPartitionDataSet dataSet =
+ (SchemaPartitionDataSet) configManager.getOrCreateSchemaPartition(patternTree);
+
+ TSchemaPartitionResp resp = new TSchemaPartitionResp();
+ dataSet.convertToRpcSchemaPartitionResp(resp);
+ return resp;
}
@Override
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
index 9a5bdf83e3..696d85a538 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -129,11 +129,11 @@ public class RatisConsensusDemo {
TimeUnit.SECONDS.sleep(1);
for (int i = 0; i < 3; i++) {
- TStorageGroupMessageResp msgMap = clients[i].getStorageGroupsMessage();
+ TStorageGroupSchemaResp msgMap = clients[i].getStorageGroupsSchema();
System.out.printf(
"\nQuery StorageGroup message from ConfigNode 0.0.0.0:%d. Result: {\n", 22277 + i * 2);
- for (Map.Entry<String, TStorageGroupMessage> entry :
- msgMap.getStorageGroupMessageMap().entrySet()) {
+ for (Map.Entry<String, TStorageGroupSchema> entry :
+ msgMap.getStorageGroupSchemaMap().entrySet()) {
System.out.printf(" Key(%s)=%s\n", entry.getKey(), entry.getValue().toString());
}
System.out.println("}");
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
index 1b935f79fb..4f668d407b 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
@@ -18,7 +18,8 @@
*/
package org.apache.iotdb.confignode.manager.hash;
-import org.apache.iotdb.confignode.manager.ConsensusManager;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.PartitionManager;
import java.io.IOException;
import java.util.ArrayList;
@@ -60,7 +61,7 @@ public class DeviceGroupHashExecutorManualTest {
}
public void GeneralIndexTest() throws IOException {
- ConsensusManager manager = new ConsensusManager();
+ PartitionManager manager = new PartitionManager(new ConfigManager());
int[] bucket = new int[deviceGroupCount];
Arrays.fill(bucket, 0);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java b/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
index 0e33e3eaf0..0878cf483e 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
@@ -28,7 +28,9 @@ import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.confignode.partition.StorageGroupSchema;
import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
@@ -110,13 +112,42 @@ public class SerializeDeserializeUT {
}
@Test
- public void CreateSchemaPartitionPlanTest() {
- // TODO: Add serialize and deserialize test
+ public void CreateSchemaPartitionPlanTest() throws IOException {
+ String storageGroup = "root.sg0";
+ SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot(10);
+ RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
+ regionReplicaSet.setConsensusGroupId(new SchemaRegionId(0));
+ regionReplicaSet.setDataNodeList(
+ Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
+
+ Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> assignedSchemaPartition =
+ new HashMap<>();
+ assignedSchemaPartition.put(storageGroup, new HashMap<>());
+ assignedSchemaPartition.get(storageGroup).put(seriesPartitionSlot, regionReplicaSet);
+
+ CreateSchemaPartitionPlan plan0 = new CreateSchemaPartitionPlan();
+ plan0.setAssignedSchemaPartition(assignedSchemaPartition);
+ plan0.serialize(buffer);
+ CreateSchemaPartitionPlan plan1 =
+ (CreateSchemaPartitionPlan) PhysicalPlan.Factory.create(buffer);
+ Assert.assertEquals(plan0, plan1);
}
@Test
- public void GetOrCreateSchemaPartitionPlanTest() {
- // TODO: Add serialize and deserialize test
+ public void GetOrCreateSchemaPartitionPlanTest() throws IOException {
+ String storageGroup = "root.sg0";
+ SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot(10);
+
+ Map<String, List<SeriesPartitionSlot>> partitionSlotsMap = new HashMap<>();
+ partitionSlotsMap.put(storageGroup, Collections.singletonList(seriesPartitionSlot));
+
+ GetOrCreateSchemaPartitionPlan plan0 =
+ new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetOrCreateSchemaPartition);
+ plan0.setPartitionSlotsMap(partitionSlotsMap);
+ plan0.serialize(buffer);
+ GetOrCreateSchemaPartitionPlan plan1 =
+ (GetOrCreateSchemaPartitionPlan) PhysicalPlan.Factory.create(buffer);
+ Assert.assertEquals(plan0, plan1);
}
@Test
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index 6db4d61a20..7ed1fe2a2c 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -23,6 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
@@ -35,10 +38,15 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.ratis.util.FileUtils;
import org.apache.thrift.TException;
@@ -49,6 +57,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@@ -167,9 +176,9 @@ public class ConfigNodeRPCServerProcessorTest {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// query StorageGroupSchema
- TStorageGroupMessageResp resp = processor.getStorageGroupsMessage();
+ TStorageGroupSchemaResp resp = processor.getStorageGroupsSchema();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
- Map<String, TStorageGroupMessage> msgMap = resp.getStorageGroupMessageMap();
+ Map<String, TStorageGroupSchema> msgMap = resp.getStorageGroupSchemaMap();
Assert.assertEquals(1, msgMap.size());
Assert.assertNotNull(msgMap.get(sg));
Assert.assertEquals(sg, msgMap.get(sg).getStorageGroup());
@@ -180,58 +189,42 @@ public class ConfigNodeRPCServerProcessorTest {
TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode(), status.getCode());
}
- // TODO: Reuse this test after PatterTree is moved to node-commons
- public void applySchemaPartitionTest() throws TException, IOException {
- TSStatus status;
- final String sg = "root.sg0";
+ /** Generate a PatternTree and serialize it into a ByteBuffer */
+ private ByteBuffer generatePatternTreeBuffer(String[] paths)
+ throws IllegalPathException, IOException {
+ PathPatternTree patternTree = new PathPatternTree();
+ for (String path : paths) {
+ patternTree.appendPath(new PartialPath(path));
+ }
+ patternTree.constructTree();
- // failed because there are not enough DataNodes
- TSetStorageGroupReq setReq = new TSetStorageGroupReq(sg);
- status = processor.setStorageGroup(setReq);
- Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), status.getCode());
- Assert.assertEquals("DataNode is not enough, please register more.", status.getMessage());
+ PublicBAOS baos = new PublicBAOS();
+ patternTree.serialize(baos);
+ return ByteBuffer.wrap(baos.toByteArray());
+ }
- // register DataNodes
- TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
- TDataNodeRegisterReq registerReq1 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
- TDataNodeRegisterReq registerReq2 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
- status = processor.registerDataNode(registerReq0).getStatus();
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.registerDataNode(registerReq1).getStatus();
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.registerDataNode(registerReq2).getStatus();
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ @Test
+ public void getAndCreateSchemaPartitionTest()
+ throws TException, IOException, IllegalPathException {
+ final String sg = "root.sg";
+ final String sg0 = "root.sg0";
+ final String sg1 = "root.sg1";
- // set StorageGroup
- status = processor.setStorageGroup(setReq);
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ final String d00 = sg0 + ".d0.s";
+ final String d01 = sg0 + ".d1.s";
+ final String d10 = sg1 + ".d0.s";
+ final String d11 = sg1 + ".d1.s";
- // applySchemaPartition
- TSchemaPartitionReq getSchemaPartitionReq = new TSchemaPartitionReq();
- List<Integer> deviceGroupIds = new ArrayList<>();
- Integer deviceGroupId = 1000;
- deviceGroupIds.add(deviceGroupId);
- // getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
- // SchemaPartitionInfo schemaPartitionInfo =
- // processor.applySchemaPartition(getSchemaPartitionReq);
- // Assert.assertNotNull(schemaPartitionInfo);
- // Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
- // schemaPartitionInfo
- // .getSchemaRegionDataNodesMap()
- // .get(sg)
- // .forEach((key, value) -> Assert.assertEquals(deviceGroupId, key));
- }
+ final String allPaths = "root.**";
+ final String allSg0 = "root.sg0.**";
+ final String allSg1 = "root.sg1.**";
- // TODO: Reuse this test after PatterTree is moved to node-commons
- public void getSchemaPartitionTest() throws TException, IOException {
TSStatus status;
- final String sg = "root.sg0";
+ ByteBuffer buffer;
+ TSchemaPartitionReq schemaPartitionReq;
+ TSchemaPartitionResp schemaPartitionResp;
- // failed because there are not enough DataNodes
- TSetStorageGroupReq setReq = new TSetStorageGroupReq(sg);
- status = processor.setStorageGroup(setReq);
- Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), status.getCode());
- Assert.assertEquals("DataNode is not enough, please register more.", status.getMessage());
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap;
// register DataNodes
TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
@@ -244,56 +237,132 @@ public class ConfigNodeRPCServerProcessorTest {
status = processor.registerDataNode(registerReq2).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- // set StorageGroup
- status = processor.setStorageGroup(setReq);
+ // Set StorageGroups
+ status = processor.setStorageGroup(new TSetStorageGroupReq(sg0));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ status = processor.setStorageGroup(new TSetStorageGroupReq(sg1));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- // getSchemaPartition
- TSchemaPartitionReq getSchemaPartitionReq = new TSchemaPartitionReq();
- List<Integer> deviceGroupIds = new ArrayList<>();
- Integer deviceGroupId = 1000;
- deviceGroupIds.add(deviceGroupId);
- // getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
- // SchemaPartitionInfo schemaPartitionInfo =
- // processor.getSchemaPartition(getSchemaPartitionReq);
- // Assert.assertNotNull(schemaPartitionInfo);
- // Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
- //
- // // because does not apply schema partition, so schema partition is null
- //
- // Assert.assertNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
- //
- // // applySchemaPartition
- // deviceGroupIds.add(deviceGroupId);
- // getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
- // schemaPartitionInfo = processor.applySchemaPartition(getSchemaPartitionReq);
- // Assert.assertNotNull(schemaPartitionInfo);
- // Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
- // schemaPartitionInfo
- // .getSchemaRegionDataNodesMap()
- // .get(sg)
- // .forEach((key, value) -> Assert.assertEquals(deviceGroupId, key));
- //
- // // getSchemaPartition twice
- // getSchemaPartitionReq = new GetSchemaPartitionReq();
- // deviceGroupIds = new ArrayList<>();
- // deviceGroupIds.add(deviceGroupId);
- // getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
- // schemaPartitionInfo = processor.getSchemaPartition(getSchemaPartitionReq);
- // Assert.assertNotNull(schemaPartitionInfo);
- // Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
- //
- // // because apply schema partition, so schema partition is not null
- // Assert.assertNotNull(
- // schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+ // Test getSchemaPartition, the result should be empty
+ buffer = generatePatternTreeBuffer(new String[] {d00, d01, allSg1});
+ schemaPartitionReq = new TSchemaPartitionReq(buffer);
+ schemaPartitionResp = processor.getSchemaPartition(schemaPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), schemaPartitionResp.getStatus().getCode());
+ Assert.assertEquals(0, schemaPartitionResp.getSchemaRegionMapSize());
+
+ // Test getOrCreateSchemaPartition, ConfigNode should create SchemaPartitions and return them
+ buffer = generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
+ schemaPartitionReq.setPathPatternTree(buffer);
+ schemaPartitionResp = processor.getOrCreateSchemaPartition(schemaPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), schemaPartitionResp.getStatus().getCode());
+ Assert.assertEquals(2, schemaPartitionResp.getSchemaRegionMapSize());
+ schemaPartitionMap = schemaPartitionResp.getSchemaRegionMap();
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(schemaPartitionMap.containsKey(sg + i));
+ Assert.assertEquals(2, schemaPartitionMap.get(sg + i).size());
+ schemaPartitionMap
+ .get(sg + i)
+ .forEach(
+ (tSeriesPartitionSlot, tRegionReplicaSet) -> {
+ Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
+ ConsensusGroupId regionId = null;
+ try {
+ regionId =
+ ConsensusGroupId.Factory.create(
+ ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
+ } catch (IOException ignore) {
+ // Ignore
+ }
+ Assert.assertTrue(regionId instanceof SchemaRegionId);
+ });
+ }
+
+ // Test getSchemaPartition: when a device path doesn't match any StorageGroup and includes
+ // "**",
+ // ConfigNode will return all the SchemaPartitions
+ buffer = generatePatternTreeBuffer(new String[] {allPaths});
+ schemaPartitionReq.setPathPatternTree(buffer);
+ schemaPartitionResp = processor.getSchemaPartition(schemaPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), schemaPartitionResp.getStatus().getCode());
+ Assert.assertEquals(2, schemaPartitionResp.getSchemaRegionMapSize());
+ schemaPartitionMap = schemaPartitionResp.getSchemaRegionMap();
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(schemaPartitionMap.containsKey(sg + i));
+ Assert.assertEquals(2, schemaPartitionMap.get(sg + i).size());
+ schemaPartitionMap
+ .get(sg + i)
+ .forEach(
+ (tSeriesPartitionSlot, tRegionReplicaSet) -> {
+ Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
+ ConsensusGroupId regionId = null;
+ try {
+ regionId =
+ ConsensusGroupId.Factory.create(
+ ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
+ } catch (IOException ignore) {
+ // Ignore
+ }
+ Assert.assertTrue(regionId instanceof SchemaRegionId);
+ });
+ }
+
+ // Test getSchemaPartition: when a device path matches a StorageGroup and ends with "*",
+ // ConfigNode will return all the SchemaPartitions in this StorageGroup
+ buffer = generatePatternTreeBuffer(new String[] {allSg0, d11});
+ schemaPartitionReq.setPathPatternTree(buffer);
+ schemaPartitionResp = processor.getSchemaPartition(schemaPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), schemaPartitionResp.getStatus().getCode());
+ Assert.assertEquals(2, schemaPartitionResp.getSchemaRegionMapSize());
+ schemaPartitionMap = schemaPartitionResp.getSchemaRegionMap();
+ // Check "root.sg0"
+ Assert.assertTrue(schemaPartitionMap.containsKey(sg0));
+ Assert.assertEquals(2, schemaPartitionMap.get(sg0).size());
+ schemaPartitionMap
+ .get(sg0)
+ .forEach(
+ (tSeriesPartitionSlot, tRegionReplicaSet) -> {
+ Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
+ ConsensusGroupId regionId = null;
+ try {
+ regionId =
+ ConsensusGroupId.Factory.create(
+ ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
+ } catch (IOException ignore) {
+ // Ignore
+ }
+ Assert.assertTrue(regionId instanceof SchemaRegionId);
+ });
+ // Check "root.sg1"
+ Assert.assertTrue(schemaPartitionMap.containsKey(sg1));
+ Assert.assertEquals(1, schemaPartitionMap.get(sg1).size());
+ schemaPartitionMap
+ .get(sg1)
+ .forEach(
+ (tSeriesPartitionSlot, tRegionReplicaSet) -> {
+ Assert.assertEquals(3, tRegionReplicaSet.getEndpointSize());
+ ConsensusGroupId regionId = null;
+ try {
+ regionId =
+ ConsensusGroupId.Factory.create(
+ ByteBuffer.wrap(tRegionReplicaSet.getRegionId()));
+ } catch (IOException ignore) {
+ // Ignore
+ }
+ Assert.assertTrue(regionId instanceof SchemaRegionId);
+ });
}
private Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- constructPartitionSlotsMap(int sgNum, int seriesPartitionSlotNum, long timePartitionSlotNum) {
+ constructPartitionSlotsMap(
+ int storageGroupNum, int seriesPartitionSlotNum, long timePartitionSlotNum) {
final String sg = "root.sg";
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> result = new HashMap<>();
- for (int i = 0; i < sgNum; i++) {
+ for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = sg + i;
result.put(storageGroup, new HashMap<>());
for (int j = 0; j < seriesPartitionSlotNum; j++) {
@@ -310,14 +379,14 @@ public class ConfigNodeRPCServerProcessorTest {
}
private void checkDataPartitionMap(
- int sgNum,
+ int storageGroupNum,
int seriesPartitionSlotNum,
long timePartitionSlotNum,
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
dataPartitionMap) {
final String sg = "root.sg";
- Assert.assertEquals(sgNum, dataPartitionMap.size());
- for (int i = 0; i < sgNum; i++) {
+ Assert.assertEquals(storageGroupNum, dataPartitionMap.size());
+ for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = sg + i;
Assert.assertTrue(dataPartitionMap.containsKey(storageGroup));
Assert.assertEquals(seriesPartitionSlotNum, dataPartitionMap.get(storageGroup).size());
@@ -342,6 +411,22 @@ public class ConfigNodeRPCServerProcessorTest {
.get(seriesPartitionSlot)
.get(timePartitionSlot)
.size());
+ // Is DataRegion
+ ConsensusGroupId regionId = null;
+ try {
+ regionId =
+ ConsensusGroupId.Factory.create(
+ ByteBuffer.wrap(
+ dataPartitionMap
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .get(timePartitionSlot)
+ .get(0)
+ .getRegionId()));
+ } catch (IOException ignore) {
+ // Ignore
+ }
+ Assert.assertTrue(regionId instanceof DataRegionId);
// Including three RegionReplica
Assert.assertEquals(
3,
@@ -358,7 +443,14 @@ public class ConfigNodeRPCServerProcessorTest {
@Test
public void getAndCreateDataPartitionTest() throws TException {
+ final String sg = "root.sg";
+ final int storageGroupNum = 2;
+ final int seriesPartitionSlotNum = 4;
+ final long timePartitionSlotNum = 6;
+
TSStatus status;
+ TDataPartitionReq dataPartitionReq;
+ TDataPartitionResp dataPartitionResp;
// register DataNodes
TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
@@ -371,40 +463,35 @@ public class ConfigNodeRPCServerProcessorTest {
status = processor.registerDataNode(registerReq2).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- final String sg = "root.sg";
- final int sgNum = 2;
- final int seriesPartitionSlotNum = 4;
- final long timePartitionSlotNum = 6;
-
// Prepare partitionSlotsMap
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap0 =
- constructPartitionSlotsMap(sgNum, seriesPartitionSlotNum, timePartitionSlotNum);
+ constructPartitionSlotsMap(storageGroupNum, seriesPartitionSlotNum, timePartitionSlotNum);
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap1 =
- constructPartitionSlotsMap(sgNum * 2, seriesPartitionSlotNum * 2, timePartitionSlotNum * 2);
+ constructPartitionSlotsMap(
+ storageGroupNum * 2, seriesPartitionSlotNum * 2, timePartitionSlotNum * 2);
// set StorageGroups
- for (int i = 0; i < sgNum; i++) {
+ for (int i = 0; i < storageGroupNum; i++) {
TSetStorageGroupReq setReq = new TSetStorageGroupReq(sg + i);
status = processor.setStorageGroup(setReq);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
}
// Test getDataPartition, the result should be empty
- TDataPartitionReq dataPartitionReq = new TDataPartitionReq();
- dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap0);
- TDataPartitionResp dataPartitionResp = processor.getDataPartition(dataPartitionReq);
+ dataPartitionReq = new TDataPartitionReq(partitionSlotsMap0);
+ dataPartitionResp = processor.getDataPartition(dataPartitionReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
Assert.assertEquals(0, dataPartitionResp.getDataPartitionMapSize());
- // Test getOrCreateDataPartition, ConfigNode should create DataPartition for PartitionSlots
+ // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return it
dataPartitionResp = processor.getOrCreateDataPartition(dataPartitionReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
checkDataPartitionMap(
- sgNum,
+ storageGroupNum,
seriesPartitionSlotNum,
timePartitionSlotNum,
dataPartitionResp.getDataPartitionMap());
@@ -416,7 +503,7 @@ public class ConfigNodeRPCServerProcessorTest {
TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
checkDataPartitionMap(
- sgNum,
+ storageGroupNum,
seriesPartitionSlotNum,
timePartitionSlotNum,
dataPartitionResp.getDataPartitionMap());
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index e16f878c59..3f677ce2c9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -101,8 +101,8 @@ public class DataPartition {
*
* @param partitionSlotsMap Map<StorageGroupName, Map<SeriesPartitionSlot,
* List<TimePartitionSlot>>>
- * @return Map<StorageGroupName, Map<SeriesPartitionSlot, Map<TimePartitionSlot,
- * List<RegionReplicaSet>>>>
+ * @return Subset of current DataPartition, including Map<StorageGroupName,
+ * Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
*/
public DataPartition getDataPartition(
Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index 9d428ccedf..d4af85106b 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -18,18 +18,23 @@
*/
package org.apache.iotdb.commons.partition;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
public class SchemaPartition {
- // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionPlaceInfo>>
+ // Map<StorageGroup, Map<SeriesPartitionSlot, RegionReplicaSet>>
private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap;
public SchemaPartition() {
- schemaPartitionMap = new HashMap<>();
+ // Empty constructor
+ }
+
+ public SchemaPartition(
+ Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap) {
+ this.schemaPartitionMap = schemaPartitionMap;
}
public Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> getSchemaPartitionMap() {
@@ -41,47 +46,88 @@ public class SchemaPartition {
this.schemaPartitionMap = schemaPartitionMap;
}
- public Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> getSchemaPartition(
- String storageGroup, List<Integer> seriesPartitionSlots) {
- Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> storageGroupMap = new HashMap<>();
- Map<SeriesPartitionSlot, RegionReplicaSet> deviceGroupMap = new HashMap<>();
- seriesPartitionSlots.forEach(
- deviceGroupID -> {
- if (schemaPartitionMap.get(storageGroup) != null
- && schemaPartitionMap
- .get(storageGroup)
- .containsKey(new SeriesPartitionSlot(deviceGroupID))) {
- deviceGroupMap.put(
- new SeriesPartitionSlot(deviceGroupID),
- schemaPartitionMap.get(storageGroup).get(new SeriesPartitionSlot(deviceGroupID)));
- }
- });
- storageGroupMap.put(storageGroup, deviceGroupMap);
- return storageGroupMap;
- }
+ /* Interfaces for ConfigNode */
/**
- * Filter out unassigned SeriesPartitionSlots
+ * Get SchemaPartition by partitionSlotsMap
*
- * @param storageGroup storage group name
- * @param seriesPartitionSlots SeriesPartitionSlotIds
- * @return not assigned seriesPartitionSlots
+ * @param partitionSlotsMap Map<StorageGroup, List<SeriesPartitionSlot>>
+ * @return Subset of current SchemaPartition, including Map<StorageGroup, Map<SeriesPartitionSlot,
+ * RegionReplicaSet>>
*/
- public List<Integer> filterNoAssignedSeriesPartitionSlot(
- String storageGroup, List<Integer> seriesPartitionSlots) {
- if (!schemaPartitionMap.containsKey(storageGroup)) {
- return seriesPartitionSlots;
+ public SchemaPartition getSchemaPartition(
+ Map<String, List<SeriesPartitionSlot>> partitionSlotsMap) {
+ if (partitionSlotsMap.isEmpty()) {
+ // Return all SchemaPartitions when the partitionSlotsMap is empty
+ return new SchemaPartition(new HashMap<>(schemaPartitionMap));
+ } else {
+ Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> result = new HashMap<>();
+
+ partitionSlotsMap.forEach(
+ (storageGroup, seriesPartitionSlots) -> {
+ if (schemaPartitionMap.containsKey(storageGroup)) {
+ if (seriesPartitionSlots.isEmpty()) {
+ // Return all SchemaPartitions in one StorageGroup when the queried
+ // SeriesPartitionSlots is empty
+ result.put(storageGroup, new HashMap<>(schemaPartitionMap.get(storageGroup)));
+ } else {
+ // Return the specific SchemaPartition
+ seriesPartitionSlots.forEach(
+ seriesPartitionSlot -> {
+ if (schemaPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) {
+ result
+ .computeIfAbsent(storageGroup, key -> new HashMap<>())
+ .put(
+ seriesPartitionSlot,
+ schemaPartitionMap.get(storageGroup).get(seriesPartitionSlot));
+ }
+ });
+ }
+ }
+ });
+
+ return new SchemaPartition(result);
}
- return seriesPartitionSlots.stream()
- .filter(
- id -> !schemaPartitionMap.get(storageGroup).containsKey(new SeriesPartitionSlot(id)))
- .collect(Collectors.toList());
}
- public void setSchemaRegionReplicaSet(
- String storageGroup, int deviceGroupId, RegionReplicaSet regionReplicaSet) {
+ /**
+ * Collect the PartitionSlots that have not been assigned yet
+ *
+ * @param partitionSlotsMap Map<StorageGroupName, List<SeriesPartitionSlot>>
+ * @return Map<String, List<SeriesPartitionSlot>>, unassigned PartitionSlots
+ */
+ public Map<String, List<SeriesPartitionSlot>> filterNoAssignedSchemaPartitionSlot(
+ Map<String, List<SeriesPartitionSlot>> partitionSlotsMap) {
+ Map<String, List<SeriesPartitionSlot>> result = new HashMap<>();
+
+ partitionSlotsMap.forEach(
+ (storageGroup, seriesPartitionSlots) -> {
+ // Compare StorageGroup
+ if (!schemaPartitionMap.containsKey(storageGroup)) {
+ result.put(storageGroup, partitionSlotsMap.get(storageGroup));
+ } else {
+ seriesPartitionSlots.forEach(
+ seriesPartitionSlot -> {
+ // Compare SeriesPartitionSlot
+ if (!schemaPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) {
+ result
+ .computeIfAbsent(storageGroup, key -> new ArrayList<>())
+ .add(seriesPartitionSlot);
+ }
+ });
+ }
+ });
+
+ return result;
+ }
+
+ /** Create a SchemaPartition by ConfigNode */
+ public void createSchemaPartition(
+ String storageGroup,
+ SeriesPartitionSlot seriesPartitionSlot,
+ RegionReplicaSet regionReplicaSet) {
schemaPartitionMap
- .computeIfAbsent(storageGroup, value -> new HashMap<>())
- .put(new SeriesPartitionSlot(deviceGroupId), regionReplicaSet);
+ .computeIfAbsent(storageGroup, key -> new HashMap<>())
+ .put(seriesPartitionSlot, regionReplicaSet);
}
}
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 7d3c3b7038..951e863596 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -23,19 +23,22 @@ namespace py iotdb.thrift.confignode
// DataNode
struct TDataNodeRegisterReq {
- 1: required common.EndPoint endPoint
+ 1: required common.EndPoint endPoint
+ // Map<StorageGroupName, TStorageGroupSchema>
+ // DataNode can use statusMap to report its status to the ConfigNode when it restarts
+ 2: optional map<string, TStorageGroupSchema> statusMap
}
struct TGlobalConfig {
- 1: optional string dataNodeConsensusProtocolClass
- 2: optional i32 seriesPartitionSlotNum
- 3: optional string seriesPartitionExecutorClass
+ 1: optional string dataNodeConsensusProtocolClass
+ 2: optional i32 seriesPartitionSlotNum
+ 3: optional string seriesPartitionExecutorClass
}
struct TDataNodeRegisterResp {
- 1: required common.TSStatus status
- 2: optional i32 dataNodeID
- 3: optional TGlobalConfig globalConfig
+ 1: required common.TSStatus status
+ 2: optional i32 dataNodeID
+ 3: optional TGlobalConfig globalConfig
}
struct TDataNodeMessageResp {
@@ -51,39 +54,49 @@ struct TDataNodeMessage {
// StorageGroup
struct TSetStorageGroupReq {
- 1: required string storageGroup
- 2: optional i64 ttl
+ 1: required string storageGroup
+ 2: optional i64 TTL
}
struct TDeleteStorageGroupReq {
- 1: required string storageGroup
+ 1: required string storageGroup
+}
+
+struct TSetTTLReq {
+ 1: required string storageGroup
+ 2: required i64 TTL
}
-struct TStorageGroupMessageResp {
+struct TStorageGroupSchemaResp {
1: required common.TSStatus status
// map<string, StorageGroupMessage>
- 2: optional map<string, TStorageGroupMessage> storageGroupMessageMap
+ 2: optional map<string, TStorageGroupSchema> storageGroupSchemaMap
}
-struct TStorageGroupMessage {
- 1: required string storageGroup
+struct TStorageGroupSchema {
+ 1: required string storageGroup
+ 2: optional i64 TTL
+ // list<DataRegionId>
+ 3: optional list<binary> dataRegionGroupIds
+ // list<SchemaRegionId>
+ 4: optional list<binary> schemaRegionGroupIds
}
// Schema
struct TSchemaPartitionReq {
- 1: required binary pathPatternTree
+ 1: required binary pathPatternTree
}
struct TSchemaPartitionResp {
1: required common.TSStatus status
- // map<StorageGroupName, map<TSeriesPartitionSlot, TRegionReplicaSet>>
+ // map<StorageGroupName, map<TSeriesPartitionSlot, TRegionReplicaSet>>
2: optional map<string, map<common.TSeriesPartitionSlot, common.TRegionReplicaSet>> schemaRegionMap
}
// Data
struct TDataPartitionReq {
- // map<StorageGroupName, map<TSeriesPartitionSlot, list<TTimePartitionSlot>>>
- 1: required map<string, map<common.TSeriesPartitionSlot, list<common.TTimePartitionSlot>>> partitionSlotsMap
+ // map<StorageGroupName, map<TSeriesPartitionSlot, list<TTimePartitionSlot>>>
+ 1: required map<string, map<common.TSeriesPartitionSlot, list<common.TTimePartitionSlot>>> partitionSlotsMap
}
struct TDataPartitionResp {
@@ -94,13 +107,13 @@ struct TDataPartitionResp {
// Authorize
struct TAuthorizerReq {
- 1: required i32 authorType
- 2: required string userName
- 3: required string roleName
- 4: required string password
- 5: required string newPassword
- 6: required set<i32> permissions
- 7: required string nodeName
+ 1: required i32 authorType
+ 2: required string userName
+ 3: required string roleName
+ 4: required string password
+ 5: required string newPassword
+ 6: required set<i32> permissions
+ 7: required string nodeName
}
service ConfigIService {
@@ -117,7 +130,9 @@ service ConfigIService {
common.TSStatus deleteStorageGroup(TDeleteStorageGroupReq req)
- TStorageGroupMessageResp getStorageGroupsMessage()
+ common.TSStatus setTTL(TSetTTLReq req)
+
+ TStorageGroupSchemaResp getStorageGroupsSchema()
/* Schema */
diff --git a/thrift/src/main/thrift/common.thrift b/thrift/src/main/thrift/common.thrift
index 8a8e61999f..444c598e2e 100644
--- a/thrift/src/main/thrift/common.thrift
+++ b/thrift/src/main/thrift/common.thrift
@@ -20,29 +20,28 @@
namespace java org.apache.iotdb.common.rpc.thrift
namespace py iotdb.thrift.common
- struct EndPoint {
- 1: required string ip
- 2: required i32 port
- }
+struct EndPoint {
+ 1: required string ip
+ 2: required i32 port
+}
- // The return status code and message in each response.
- struct TSStatus {
- 1: required i32 code
- 2: optional string message
- 3: optional list<TSStatus> subStatus
- 4: optional EndPoint redirectNode
- }
+// The return status code and message in each response.
+struct TSStatus {
+ 1: required i32 code
+ 2: optional string message
+ 3: optional list<TSStatus> subStatus
+ 4: optional EndPoint redirectNode
+}
- struct TRegionReplicaSet {
- 1: required i32 regionId
- 2: required string groupType
- 3: required list<EndPoint> endpoint
- }
+struct TRegionReplicaSet {
+ 1: required binary regionId
+ 2: required list<EndPoint> endpoint
+}
- struct TSeriesPartitionSlot {
- 1: required i32 slotId
- }
+struct TSeriesPartitionSlot {
+ 1: required i32 slotId
+}
- struct TTimePartitionSlot {
- 1: required i64 startTime
- }
\ No newline at end of file
+struct TTimePartitionSlot {
+ 1: required i64 startTime
+}
\ No newline at end of file