You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/11 07:58:04 UTC
[iotdb] branch master updated: [IOTDB-2687] Base partition policy of data (#5464)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 703a64d2da [IOTDB-2687] Base partition policy of data (#5464)
703a64d2da is described below
commit 703a64d2da7823e755b0b26b2d4cbfb26ad850b6
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Mon Apr 11 15:57:58 2022 +0800
[IOTDB-2687] Base partition policy of data (#5464)
---
confignode/src/assembly/confignode.xml | 9 +-
.../resources/conf/iotdb-confignode.properties | 20 +-
.../iotdb/confignode/conf/ConfigNodeConf.java | 25 +-
.../iotdb/confignode/conf/ConfigNodeConfCheck.java | 29 +-
.../confignode/conf/ConfigNodeDescriptor.java | 8 +-
...aSet.java => DataNodeConfigurationDataSet.java} | 32 +-
.../consensus/response/DataNodesInfoDataSet.java | 12 +-
.../consensus/response/DataPartitionDataSet.java | 104 +++++-
.../consensus/response/SchemaPartitionDataSet.java | 92 ++---
.../response/StorageGroupSchemaDataSet.java | 11 +-
.../iotdb/confignode/manager/ConfigManager.java | 87 +++--
.../iotdb/confignode/manager/ConsensusManager.java | 33 +-
.../iotdb/confignode/manager/DataNodeManager.java | 87 ++---
.../apache/iotdb/confignode/manager/Manager.java | 88 ++---
.../iotdb/confignode/manager/PartitionManager.java | 193 +++++++----
.../iotdb/confignode/manager/RegionManager.java | 126 +++----
.../iotdb/confignode/partition/DataRegionInfo.java | 66 ----
.../confignode/partition/SchemaRegionInfo.java | 59 ----
.../confignode/partition/StorageGroupSchema.java | 77 +++--
.../persistence/DataNodeInfoPersistence.java | 122 ++++---
.../persistence/PartitionInfoPersistence.java | 145 +++++---
.../persistence/RegionInfoPersistence.java | 210 ++++++------
.../iotdb/confignode/physical/PhysicalPlan.java | 32 +-
.../confignode/physical/PhysicalPlanType.java | 12 +-
.../physical/crud/CreateDataPartitionPlan.java | 128 +++++++
.../physical/crud/CreateRegionsPlan.java | 96 ++++++
.../physical/crud/CreateSchemaPartitionPlan.java | 27 +-
.../crud/GetOrCreateDataPartitionPlan.java | 139 ++++++++
.../GetOrCreateSchemaPartitionPlan.java} | 50 +--
.../confignode/physical/sys/DataPartitionPlan.java | 78 -----
.../physical/sys/QueryDataNodeInfoPlan.java | 14 +
.../physical/sys/RegisterDataNodePlan.java | 22 +-
.../physical/sys/SetStorageGroupPlan.java | 43 +--
.../confignode/service/executor/PlanExecutor.java | 27 +-
.../server/ConfigNodeRPCServerProcessor.java | 246 +++++++-------
.../confignode/consensus/RatisConsensusDemo.java | 27 +-
.../manager/ConfigManagerManualTest.java | 29 +-
.../hash/DeviceGroupHashExecutorManualTest.java | 2 +-
.../physical/SerializeDeserializeUT.java | 168 +++++++++
.../server/ConfigNodeRPCServerProcessorTest.java | 377 +++++++++++++++------
.../{partition => cluster}/DataNodeLocation.java | 51 +--
.../iotdb/commons/partition/DataPartition.java | 128 ++++++-
.../iotdb/commons/partition/RegionReplicaSet.java | 1 +
.../iotdb/commons/partition/SchemaPartition.java | 49 ++-
.../commons/partition/SeriesPartitionSlot.java | 35 +-
.../iotdb/commons/partition/TimePartitionSlot.java | 33 ++
.../executor/SeriesPartitionExecutor.java} | 16 +-
.../executor}/hash/APHashExecutor.java | 11 +-
.../executor}/hash/BKDRHashExecutor.java | 11 +-
.../executor}/hash/JSHashExecutor.java | 11 +-
.../executor}/hash/SDBMHashExecutor.java | 11 +-
.../mpp/sql/analyze/FakePartitionFetcherImpl.java | 1 +
.../java/org/apache/iotdb/db/service/DataNode.java | 10 +-
.../db/mpp/sql/plan/DistributionPlannerTest.java | 2 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 7 +-
.../src/main/thrift/confignode.thrift | 137 ++++----
thrift/src/main/thrift/common.thrift | 13 +-
thrift/src/main/thrift/management.thrift | 4 +-
58 files changed, 2348 insertions(+), 1335 deletions(-)
diff --git a/confignode/src/assembly/confignode.xml b/confignode/src/assembly/confignode.xml
index 55707485db..5982b6cdd8 100644
--- a/confignode/src/assembly/confignode.xml
+++ b/confignode/src/assembly/confignode.xml
@@ -33,8 +33,13 @@
</dependencySets>
<fileSets>
<fileSet>
- <directory>src/assembly/resources</directory>
- <outputDirectory>${file.separator}</outputDirectory>
+ <directory>src/assembly/resources/conf</directory>
+ <outputDirectory>conf</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>src/assembly/resources/sbin</directory>
+ <outputDirectory>sbin</outputDirectory>
+ <fileMode>0755</fileMode>
</fileSet>
</fileSets>
</assembly>
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 3aaf61eddc..63703ec172 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -78,23 +78,23 @@ config_node_rpc_port=22277
# data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
####################
-### DeviceGroup Configuration
+### SeriesPartitionSlot Configuration
####################
-# Number of DeviceGroups per StorageGroup
+# Number of SeriesPartitionSlots per StorageGroup
# Datatype: int
-# device_group_count=10000
+# series_partition_slot_num=10000
-# DeviceGroup hash algorithm
+# SeriesPartitionSlot executor class
# These hashing algorithms are currently supported:
-# 1. org.apache.iotdb.commons.hash.BKDRHashExecutor(Default)
-# 2. org.apache.iotdb.commons.hash.APHashExecutor
-# 3. org.apache.iotdb.commons.hash.JSHashExecutor
-# 4. org.apache.iotdb.commons.hash.SDBMHashExecutor
-# Also, if you want to implement your own hash algorithm, you can inherit the DeviceGroupHashExecutor class and
+# 1. BKDRHashExecutor(Default)
+# 2. APHashExecutor
+# 3. JSHashExecutor
+# 4. SDBMHashExecutor
+# Also, if you want to implement your own SeriesPartition executor, you can inherit the SeriesPartitionExecutor class and
# modify this parameter to correspond to your Java class
# Datatype: String
-# device_group_hash_executor_class=org.apache.iotdb.commons.hash.BKDRHashExecutor
+# series_partition_executor_class=org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor
####################
### Directory Configuration
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 0f863f8841..09c1a05649 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -46,11 +46,12 @@ public class ConfigNodeConf {
private Endpoint[] configNodeGroupAddressList =
Collections.singletonList(new Endpoint("0.0.0.0", 22278)).toArray(new Endpoint[0]);
- /** Number of DeviceGroups per StorageGroup */
- private int deviceGroupCount = 10000;
+ /** Number of SeriesPartitionSlots per StorageGroup */
+ private int seriesPartitionSlotNum = 10000;
- /** DeviceGroup hash executor class */
- private String deviceGroupHashExecutorClass = "org.apache.iotdb.commons.hash.BKDRHashExecutor";
+ /** SeriesPartitionSlot executor class */
+ private String seriesPartitionExecutorClass =
+ "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
/** Max concurrent client number */
private int rpcMaxConcurrentClientNum = 65535;
@@ -115,20 +116,20 @@ public class ConfigNodeConf {
return dir;
}
- public int getDeviceGroupCount() {
- return deviceGroupCount;
+ public int getSeriesPartitionSlotNum() {
+ return seriesPartitionSlotNum;
}
- public void setDeviceGroupCount(int deviceGroupCount) {
- this.deviceGroupCount = deviceGroupCount;
+ public void setSeriesPartitionSlotNum(int seriesPartitionSlotNum) {
+ this.seriesPartitionSlotNum = seriesPartitionSlotNum;
}
- public String getDeviceGroupHashExecutorClass() {
- return deviceGroupHashExecutorClass;
+ public String getSeriesPartitionExecutorClass() {
+ return seriesPartitionExecutorClass;
}
- public void setDeviceGroupHashExecutorClass(String deviceGroupHashExecutorClass) {
- this.deviceGroupHashExecutorClass = deviceGroupHashExecutorClass;
+ public void setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) {
+ this.seriesPartitionExecutorClass = seriesPartitionExecutorClass;
}
public int getRpcMaxConcurrentClientNum() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
index 2a8c4957d6..895850c936 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
@@ -103,9 +103,10 @@ public class ConfigNodeConfCheck {
* Therefore, store them in iotdb-confignode-special.properties at the first startup
*/
private void writeSpecialProperties(File specialPropertiesFile) {
- specialProperties.setProperty("device_group_count", String.valueOf(conf.getDeviceGroupCount()));
specialProperties.setProperty(
- "device_group_hash_executor_class", conf.getDeviceGroupHashExecutorClass());
+ "series_partition_slot_num", String.valueOf(conf.getSeriesPartitionSlotNum()));
+ specialProperties.setProperty(
+ "series_partition_executor_class", conf.getSeriesPartitionExecutorClass());
try {
specialProperties.store(new FileOutputStream(specialPropertiesFile), "");
} catch (IOException e) {
@@ -116,26 +117,26 @@ public class ConfigNodeConfCheck {
/** Ensure that special parameters are consistent with each startup except the first one */
private void checkSpecialProperties() throws ConfigurationException {
- int specialDeviceGroupCount =
+ int specialSeriesPartitionSlotNum =
Integer.parseInt(
specialProperties.getProperty(
- "device_group_count", String.valueOf(conf.getDeviceGroupCount())));
- if (specialDeviceGroupCount != conf.getDeviceGroupCount()) {
+ "series_partition_slot_num", String.valueOf(conf.getSeriesPartitionSlotNum())));
+ if (specialSeriesPartitionSlotNum != conf.getSeriesPartitionSlotNum()) {
throw new ConfigurationException(
- "device_group_count",
- String.valueOf(conf.getDeviceGroupCount()),
- String.valueOf(specialDeviceGroupCount));
+ "series_partition_slot_num",
+ String.valueOf(conf.getSeriesPartitionSlotNum()),
+ String.valueOf(specialSeriesPartitionSlotNum));
}
- String specialDeviceGroupHashExecutorClass =
+ String specialSeriesPartitionSlotExecutorClass =
specialProperties.getProperty(
- "device_group_hash_executor_class", conf.getDeviceGroupHashExecutorClass());
+ "series_partition_executor_class", conf.getSeriesPartitionExecutorClass());
if (!Objects.equals(
- specialDeviceGroupHashExecutorClass, conf.getDeviceGroupHashExecutorClass())) {
+ specialSeriesPartitionSlotExecutorClass, conf.getSeriesPartitionExecutorClass())) {
throw new ConfigurationException(
- "device_group_hash_executor_class",
- conf.getDeviceGroupHashExecutorClass(),
- specialDeviceGroupHashExecutorClass);
+ "series_partition_executor_class",
+ conf.getSeriesPartitionExecutorClass(),
+ specialSeriesPartitionSlotExecutorClass);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 4f2590192d..29123554d7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -103,14 +103,14 @@ public class ConfigNodeDescriptor {
Properties properties = new Properties();
properties.load(inputStream);
- conf.setDeviceGroupCount(
+ conf.setSeriesPartitionSlotNum(
Integer.parseInt(
properties.getProperty(
- "device_group_count", String.valueOf(conf.getDeviceGroupCount()))));
+ "series_partition_slot_num", String.valueOf(conf.getSeriesPartitionSlotNum()))));
- conf.setDeviceGroupHashExecutorClass(
+ conf.setSeriesPartitionExecutorClass(
properties.getProperty(
- "device_group_hash_executor_class", conf.getDeviceGroupHashExecutorClass()));
+ "series_partition_executor_class", conf.getSeriesPartitionExecutorClass()));
conf.setRpcAddress(properties.getProperty("config_node_rpc_address", conf.getRpcAddress()));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
similarity index 52%
copy from confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
index 934f05016d..9e5d67f7ea 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeConfigurationDataSet.java
@@ -18,26 +18,36 @@
*/
package org.apache.iotdb.confignode.consensus.response;
-import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.consensus.common.DataSet;
-import java.util.List;
+public class DataNodeConfigurationDataSet implements DataSet {
-public class StorageGroupSchemaDataSet implements DataSet {
+ private TSStatus status;
+ private int dataNodeId;
+ private TGlobalConfig globalConfig;
- private List<StorageGroupSchema> schemaList;
+ public DataNodeConfigurationDataSet() {
+ // Empty constructor
+ }
- public StorageGroupSchemaDataSet() {}
+ public void setStatus(TSStatus status) {
+ this.status = status;
+ }
- public StorageGroupSchemaDataSet(List<StorageGroupSchema> schemaList) {
- this.schemaList = schemaList;
+ public void setDataNodeId(int dataNodeId) {
+ this.dataNodeId = dataNodeId;
}
- public List<StorageGroupSchema> getSchemaList() {
- return schemaList;
+ public void setGlobalConfig(TGlobalConfig globalConfig) {
+ this.globalConfig = globalConfig;
}
- public void setSchemaList(List<StorageGroupSchema> schemaList) {
- this.schemaList = schemaList;
+ public void convertToRpcDataNodeRegisterResp(TDataNodeRegisterResp resp) {
+ resp.setStatus(status);
+ resp.setDataNodeID(dataNodeId);
+ resp.setGlobalConfig(globalConfig);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
index 62d498f09e..11e3d2ce71 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodesInfoDataSet.java
@@ -18,19 +18,29 @@
*/
package org.apache.iotdb.confignode.consensus.response;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.consensus.common.DataSet;
import java.util.List;
public class DataNodesInfoDataSet implements DataSet {
+ private TSStatus status;
private List<DataNodeLocation> dataNodeList;
public DataNodesInfoDataSet() {
// empty constructor
}
+ public void setStatus(TSStatus status) {
+ this.status = status;
+ }
+
+ public TSStatus getStatus() {
+ return status;
+ }
+
public void setDataNodeList(List<DataNodeLocation> dataNodeList) {
this.dataNodeList = dataNodeList;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
index a02f58a53f..222995501c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataPartitionDataSet.java
@@ -19,17 +19,111 @@
package org.apache.iotdb.confignode.consensus.response;
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
import org.apache.iotdb.consensus.common.DataSet;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class DataPartitionDataSet implements DataSet {
- private DataPartition dataPartitionInfo;
- public DataPartition getDataPartitionInfo() {
- return dataPartitionInfo;
+ private TSStatus status;
+
+ private DataPartition dataPartition;
+
+ public TSStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(TSStatus status) {
+ this.status = status;
+ }
+
+ public DataPartition getDataPartition() {
+ return dataPartition;
+ }
+
+ public void setDataPartition(DataPartition dataPartition) {
+ this.dataPartition = dataPartition;
}
- public void setDataPartitionInfos(DataPartition dataPartitionInfo) {
- this.dataPartitionInfo = dataPartitionInfo;
+ /**
+ * Convert DataPartitionDataSet to TDataPartitionResp
+ *
+ * @param resp TDataPartitionResp
+ */
+ public void convertToRpcDataPartitionResp(TDataPartitionResp resp) {
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+ dataPartitionMap = new HashMap<>();
+
+ dataPartition
+ .getDataPartitionMap()
+ .forEach(
+ ((storageGroup, seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap) -> {
+ // Extract StorageGroupName
+ dataPartitionMap.putIfAbsent(storageGroup, new HashMap<>());
+
+ seriesPartitionSlotTimePartitionSlotRegionReplicaSetListMap.forEach(
+ ((seriesPartitionSlot, timePartitionSlotReplicaSetListMap) -> {
+ // Extract TSeriesPartitionSlot
+ TSeriesPartitionSlot tSeriesPartitionSlot =
+ new TSeriesPartitionSlot(seriesPartitionSlot.getSlotId());
+ dataPartitionMap
+ .get(storageGroup)
+ .putIfAbsent(tSeriesPartitionSlot, new HashMap<>());
+
+ // Extract Map<TimePartitionSlot, List<RegionReplicaSet>>
+ timePartitionSlotReplicaSetListMap.forEach(
+ ((timePartitionSlot, regionReplicaSets) -> {
+ // Extract TTimePartitionSlot
+ TTimePartitionSlot tTimePartitionSlot =
+ new TTimePartitionSlot(timePartitionSlot.getStartTime());
+ dataPartitionMap
+ .get(storageGroup)
+ .get(tSeriesPartitionSlot)
+ .putIfAbsent(tTimePartitionSlot, new ArrayList<>());
+
+ // Extract TRegionReplicaSets
+ regionReplicaSets.forEach(
+ regionReplicaSet -> {
+ TRegionReplicaSet tRegionReplicaSet = new TRegionReplicaSet();
+
+ // Set TRegionReplicaSet's RegionId
+ tRegionReplicaSet.setRegionId(regionReplicaSet.getId().getId());
+
+ // Set TRegionReplicaSet's GroupType
+ tRegionReplicaSet.setGroupType("DataRegion");
+
+ // Set TRegionReplicaSet's EndPoints
+ List<EndPoint> endPointList = new ArrayList<>();
+ regionReplicaSet
+ .getDataNodeList()
+ .forEach(
+ dataNodeLocation ->
+ endPointList.add(
+ new EndPoint(
+ dataNodeLocation.getEndPoint().getIp(),
+ dataNodeLocation.getEndPoint().getPort())));
+ tRegionReplicaSet.setEndpoint(endPointList);
+
+ dataPartitionMap
+ .get(storageGroup)
+ .get(tSeriesPartitionSlot)
+ .get(tTimePartitionSlot)
+ .add(tRegionReplicaSet);
+ });
+ }));
+ }));
+ }));
+
+ resp.setDataPartitionMap(dataPartitionMap);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
index 407e61a5cf..5c3e95ea3a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/SchemaPartitionDataSet.java
@@ -20,8 +20,10 @@
package org.apache.iotdb.confignode.consensus.response;
import org.apache.iotdb.common.rpc.thrift.EndPoint;
-import org.apache.iotdb.common.rpc.thrift.RegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
import org.apache.iotdb.consensus.common.DataSet;
import java.util.ArrayList;
@@ -29,54 +31,64 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+/** TODO: Reconstruct this class after PatterTree is moved to node-commons */
public class SchemaPartitionDataSet implements DataSet {
- private SchemaPartition schemaPartitionInfo;
- public SchemaPartition getSchemaPartitionInfo() {
- return schemaPartitionInfo;
+ private TSStatus status;
+
+ private SchemaPartition schemaPartition;
+
+ public SchemaPartitionDataSet() {
+ // empty constructor
}
- public void setSchemaPartitionInfo(SchemaPartition schemaPartitionInfos) {
- this.schemaPartitionInfo = schemaPartitionInfos;
+ public TSStatus getStatus() {
+ return status;
}
- public static org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo
- convertRpcSchemaPartition(SchemaPartition schemaPartitionInfo) {
- org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo rpcSchemaPartitionInfo =
- new org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo();
+ public void setStatus(TSStatus status) {
+ this.status = status;
+ }
- Map<String, Map<Integer, RegionReplicaSet>> schemaRegionReplicaSets = new HashMap<>();
+ public SchemaPartition getSchemaPartition() {
+ return schemaPartition;
+ }
- schemaPartitionInfo.getSchemaPartition().entrySet().stream()
+ public void setSchemaPartition(SchemaPartition schemaPartition) {
+ this.schemaPartition = schemaPartition;
+ }
+
+ public void convertToRpcSchemaPartitionResp(TSchemaPartitionResp resp) {
+ Map<String, Map<Integer, TRegionReplicaSet>> schemaRegionMap = new HashMap<>();
+
+ schemaPartition
+ .getSchemaPartitionMap()
.forEach(
- entity -> {
- schemaRegionReplicaSets.putIfAbsent(entity.getKey(), new HashMap<>());
- entity
- .getValue()
- .entrySet()
- .forEach(
- replica -> {
- RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
- regionReplicaSet.setRegionId(replica.getValue().getId().getId());
- List<EndPoint> endPoints = new ArrayList<>();
- replica
- .getValue()
- .getDataNodeList()
- .forEach(
- dataNode -> {
- EndPoint endPoint =
- new EndPoint(
- dataNode.getEndPoint().getIp(),
- dataNode.getEndPoint().getPort());
- endPoints.add(endPoint);
- });
- regionReplicaSet.setEndpoint(endPoints);
- schemaRegionReplicaSets
- .get(entity.getKey())
- .put(replica.getKey().getDeviceGroupId(), regionReplicaSet);
- });
+ (storageGroup, seriesPartitionSlotRegionReplicaSetMap) -> {
+ // Extract StorageGroupName
+ schemaRegionMap.putIfAbsent(storageGroup, new HashMap<>());
+
+ // Extract Map<SeriesPartitionSlot, RegionReplicaSet>
+ seriesPartitionSlotRegionReplicaSetMap.forEach(
+ ((seriesPartitionSlot, regionReplicaSet) -> {
+ TRegionReplicaSet regionMessage = new TRegionReplicaSet();
+ regionMessage.setRegionId(regionReplicaSet.getId().getId());
+ List<EndPoint> endPointList = new ArrayList<>();
+ regionReplicaSet
+ .getDataNodeList()
+ .forEach(
+ dataNodeLocation ->
+ endPointList.add(
+ new EndPoint(
+ dataNodeLocation.getEndPoint().getIp(),
+ dataNodeLocation.getEndPoint().getPort())));
+ regionMessage.setEndpoint(endPointList);
+ schemaRegionMap
+ .get(storageGroup)
+ .put(seriesPartitionSlot.getSlotId(), regionMessage);
+ }));
});
- rpcSchemaPartitionInfo.setSchemaRegionDataNodesMap(schemaRegionReplicaSets);
- return rpcSchemaPartitionInfo;
+
+ // resp.setSchemaRegionMap(schemaRegionMap);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
index 934f05016d..700d4acfb5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/StorageGroupSchemaDataSet.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.consensus.response;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.partition.StorageGroupSchema;
import org.apache.iotdb.consensus.common.DataSet;
@@ -25,12 +26,18 @@ import java.util.List;
public class StorageGroupSchemaDataSet implements DataSet {
+ private TSStatus status;
+
private List<StorageGroupSchema> schemaList;
public StorageGroupSchemaDataSet() {}
- public StorageGroupSchemaDataSet(List<StorageGroupSchema> schemaList) {
- this.schemaList = schemaList;
+ public TSStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(TSStatus status) {
+ this.status = status;
}
public List<StorageGroupSchema> getSchemaList() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a4fc5cac15..8457275a88 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -20,17 +20,17 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.conf.ConfigNodeConf;
-import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
+import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
-import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -38,7 +38,7 @@ import java.io.IOException;
/** Entry of all management, AssignPartitionManager,AssignRegionManager. */
public class ConfigManager implements Manager {
- private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
+
private static final TSStatus ERROR_TSSTATUS =
new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
@@ -74,15 +74,21 @@ public class ConfigManager implements Manager {
}
@Override
- public TSStatus registerDataNode(PhysicalPlan physicalPlan) {
+ public DataSet registerDataNode(PhysicalPlan physicalPlan) {
+
+ // TODO: Only leader can register DataNode
+
if (physicalPlan instanceof RegisterDataNodePlan) {
return dataNodeManager.registerDataNode((RegisterDataNodePlan) physicalPlan);
}
- return ERROR_TSSTATUS;
+ return new DataNodeConfigurationDataSet();
}
@Override
public DataSet getDataNodeInfo(PhysicalPlan physicalPlan) {
+
+ // TODO: Only leader can get DataNodeInfo
+
if (physicalPlan instanceof QueryDataNodeInfoPlan) {
return dataNodeManager.getDataNodeInfo((QueryDataNodeInfoPlan) physicalPlan);
}
@@ -91,11 +97,17 @@ public class ConfigManager implements Manager {
@Override
public DataSet getStorageGroupSchema() {
+
+ // TODO: Only leader can get StorageGroupSchema
+
return regionManager.getStorageGroupSchema();
}
@Override
public TSStatus setStorageGroup(PhysicalPlan physicalPlan) {
+
+ // TODO: Only leader can set StorageGroup
+
if (physicalPlan instanceof SetStorageGroupPlan) {
return regionManager.setStorageGroup((SetStorageGroupPlan) physicalPlan);
}
@@ -103,51 +115,58 @@ public class ConfigManager implements Manager {
}
@Override
- public DataNodeManager getDataNodeManager() {
- return dataNodeManager;
- }
+ public DataSet getSchemaPartition(PhysicalPlan physicalPlan) {
- @Override
- public DataSet getDataPartition(PhysicalPlan physicalPlan) {
- if (physicalPlan instanceof DataPartitionPlan) {
- return partitionManager.getDataPartition((DataPartitionPlan) physicalPlan);
+ // TODO: Only leader can query SchemaPartition
+
+ if (physicalPlan instanceof GetOrCreateSchemaPartitionPlan) {
+ return partitionManager.getSchemaPartition((GetOrCreateSchemaPartitionPlan) physicalPlan);
}
- return new DataNodesInfoDataSet();
+ return new SchemaPartitionDataSet();
}
@Override
- public DataSet getSchemaPartition(PhysicalPlan physicalPlan) {
- if (physicalPlan instanceof SchemaPartitionPlan) {
- return partitionManager.getSchemaPartition((SchemaPartitionPlan) physicalPlan);
+ public DataSet getOrCreateSchemaPartition(PhysicalPlan physicalPlan) {
+
+ // TODO: Only leader can apply SchemaPartition
+
+ if (physicalPlan instanceof GetOrCreateSchemaPartitionPlan) {
+ return partitionManager.getOrCreateSchemaPartition(
+ (GetOrCreateSchemaPartitionPlan) physicalPlan);
}
- return new DataNodesInfoDataSet();
+ return new SchemaPartitionDataSet();
}
@Override
- public RegionManager getRegionManager() {
- return regionManager;
+ public DataSet getDataPartition(PhysicalPlan physicalPlan) {
+
+ // TODO: Only leader can query DataPartition
+
+ if (physicalPlan instanceof GetOrCreateDataPartitionPlan) {
+ return partitionManager.getDataPartition((GetOrCreateDataPartitionPlan) physicalPlan);
+ }
+ return new DataPartitionDataSet();
}
@Override
- public DataSet applySchemaPartition(PhysicalPlan physicalPlan) {
- if (physicalPlan instanceof SchemaPartitionPlan) {
- return partitionManager.applySchemaPartition((SchemaPartitionPlan) physicalPlan);
+ public DataSet getOrCreateDataPartition(PhysicalPlan physicalPlan) {
+
+ // TODO: only leader can apply DataPartition
+
+ if (physicalPlan instanceof GetOrCreateDataPartitionPlan) {
+ return partitionManager.getOrCreateDataPartition((GetOrCreateDataPartitionPlan) physicalPlan);
}
- return new DataNodesInfoDataSet();
+ return new DataPartitionDataSet();
}
@Override
- public DataSet applyDataPartition(PhysicalPlan physicalPlan) {
- if (physicalPlan instanceof DataPartitionPlan) {
- return partitionManager.applyDataPartition((DataPartitionPlan) physicalPlan);
- }
- return new DataNodesInfoDataSet();
+ public DataNodeManager getDataNodeManager() {
+ return dataNodeManager;
}
@Override
- public DeviceGroupHashInfo getDeviceGroupHashInfo() {
- return new DeviceGroupHashInfo(
- conf.getDeviceGroupCount(), conf.getDeviceGroupHashExecutorClass());
+ public RegionManager getRegionManager() {
+ return regionManager;
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index 08a5e32f99..51d2bf7898 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -21,7 +21,8 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
-import org.apache.iotdb.commons.hash.DeviceGroupHashExecutor;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.statemachine.PartitionRegionStateMachine;
@@ -48,14 +49,13 @@ public class ConsensusManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsensusManager.class);
private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
- private IConsensus consensusImpl;
-
private ConsensusGroupId consensusGroupId;
+ private IConsensus consensusImpl;
- private DeviceGroupHashExecutor hashExecutor;
+ private SeriesPartitionExecutor executor;
public ConsensusManager() throws IOException {
- setHashExecutor();
+ setSeriesPartitionExecutor();
setConsensusLayer();
}
@@ -64,27 +64,28 @@ public class ConsensusManager {
}
/** Build DeviceGroupHashExecutor */
- private void setHashExecutor() {
+ private void setSeriesPartitionExecutor() {
try {
- Class<?> executor = Class.forName(conf.getDeviceGroupHashExecutorClass());
+ Class<?> executor = Class.forName(conf.getSeriesPartitionExecutorClass());
Constructor<?> executorConstructor = executor.getConstructor(int.class);
- hashExecutor =
- (DeviceGroupHashExecutor) executorConstructor.newInstance(conf.getDeviceGroupCount());
+ this.executor =
+ (SeriesPartitionExecutor)
+ executorConstructor.newInstance(conf.getSeriesPartitionSlotNum());
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
| IllegalAccessException
| InvocationTargetException e) {
LOGGER.error(
- "Couldn't Constructor DeviceGroupHashExecutor class: {}",
- conf.getDeviceGroupHashExecutorClass(),
+ "Couldn't Constructor SeriesPartitionExecutor class: {}",
+ conf.getSeriesPartitionExecutorClass(),
e);
- hashExecutor = null;
+ executor = null;
}
}
- public int getDeviceGroupID(String device) {
- return hashExecutor.getDeviceGroupID(device);
+ public SeriesPartitionSlot getSeriesPartitionSlot(String device) {
+ return executor.getSeriesPartitionSlot(device);
}
/** Build ConfigNodeGroup ConsensusLayer */
@@ -128,5 +129,9 @@ public class ConsensusManager {
return consensusImpl.read(consensusGroupId, plan);
}
+ public boolean isLeader() {
+ return consensusImpl.isLeader(consensusGroupId);
+ }
+
// TODO: Interfaces for LoadBalancer control
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
index ab62b694d3..9e0d5a7633 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/DataNodeManager.java
@@ -19,11 +19,15 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
+import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -31,70 +35,77 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
/** Manager server info of data node, add node or remove node */
public class DataNodeManager {
+
private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeManager.class);
- private DataNodeInfoPersistence dataNodeInfo = DataNodeInfoPersistence.getInstance();
+ private static final DataNodeInfoPersistence dataNodeInfoPersistence =
+ DataNodeInfoPersistence.getInstance();
- private Manager configManager;
+ private final Manager configManager;
/** TODO:do some operate after add node or remove node */
- private List<ChangeServerListener> listeners = new CopyOnWriteArrayList<>();
-
- private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
-
- private int nextDataNodeId;
+ private final List<ChangeServerListener> listeners = new CopyOnWriteArrayList<>();
public DataNodeManager(Manager configManager) {
this.configManager = configManager;
- this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
+ }
+
+ private void setGlobalConfig(DataNodeConfigurationDataSet dataSet) {
+ // Set TGlobalConfig
+ TGlobalConfig globalConfig = new TGlobalConfig();
+ globalConfig.setDataNodeConsensusProtocolClass(
+ ConfigNodeDescriptor.getInstance().getConf().getDataNodeConsensusProtocolClass());
+ globalConfig.setSeriesPartitionSlotNum(
+ ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum());
+ globalConfig.setSeriesPartitionExecutorClass(
+ ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass());
+ dataSet.setGlobalConfig(globalConfig);
}
/**
- * register dta node info when data node start
+ * Register DataNode
*
* @param plan RegisterDataNodePlan
- * @return success if data node regist first
+ * @return DataNodeConfigurationDataSet. The TSStatus will be set to SUCCESS_STATUS when register
+ * success, and DATANODE_ALREADY_REGISTERED when the DataNode is already exist.
*/
- public TSStatus registerDataNode(RegisterDataNodePlan plan) {
- TSStatus result;
- DataNodeLocation info = plan.getInfo();
- dataNodeInfoReadWriteLock.writeLock().lock();
- try {
- if (dataNodeInfo.containsValue(info)) {
- // TODO: optimize
- result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- result.setMessage(String.valueOf(dataNodeInfo.getDataNodeInfo(info)));
- } else {
- info.setDataNodeID(nextDataNodeId);
- ConsensusWriteResponse consensusWriteResponse = getConsensusManager().write(plan);
- nextDataNodeId += 1;
- return consensusWriteResponse.getStatus();
- }
- } finally {
- dataNodeInfoReadWriteLock.writeLock().unlock();
+ public DataSet registerDataNode(RegisterDataNodePlan plan) {
+ DataNodeConfigurationDataSet dataSet = new DataNodeConfigurationDataSet();
+
+ if (DataNodeInfoPersistence.getInstance().containsValue(plan.getInfo())) {
+ dataSet.setStatus(new TSStatus(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()));
+ } else {
+ plan.getInfo().setDataNodeId(DataNodeInfoPersistence.getInstance().generateNextDataNodeId());
+ ConsensusWriteResponse resp = getConsensusManager().write(plan);
+ dataSet.setStatus(resp.getStatus());
}
- return result;
+
+ dataSet.setDataNodeId(plan.getInfo().getDataNodeId());
+ setGlobalConfig(dataSet);
+ return dataSet;
}
/**
- * get dta node info
+ * Get DataNode info
*
* @param plan QueryDataNodeInfoPlan
- * @return all data node info if dataNodeId of plan is -1
+ * @return The specific DataNode's info or all DataNode info if dataNodeId in
+ * QueryDataNodeInfoPlan is -1
*/
public DataNodesInfoDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
return (DataNodesInfoDataSet) getConsensusManager().read(plan).getDataset();
}
- public Set<Integer> getDataNodeId() {
- return dataNodeInfo.getDataNodeIds();
+ public int getOnlineDataNodeCount() {
+ return dataNodeInfoPersistence.getOnlineDataNodeCount();
+ }
+
+ public List<DataNodeLocation> getOnlineDataNodes() {
+ return dataNodeInfoPersistence.getOnlineDataNodes();
}
private ConsensusManager getConsensusManager() {
@@ -114,10 +125,6 @@ public class DataNodeManager {
listeners.stream().forEach(serverListener -> serverListener.waiting());
}
- public Map<Integer, DataNodeLocation> getOnlineDataNodes() {
- return dataNodeInfo.getOnlineDataNodes();
- }
-
private class ServerStartListenerThread extends Thread implements ChangeServerListener {
private boolean changed = false;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
index 7c6dda9532..25f40841e2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/Manager.java
@@ -20,11 +20,10 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
import org.apache.iotdb.consensus.common.DataSet;
/**
- * a subset of services provided by {@ConfigManager}. For use internally only, pased to Managers,
+ * a subset of services provided by {@ConfigManager}. For use internally only, passed to Managers,
* services.
*/
public interface Manager {
@@ -34,88 +33,91 @@ public interface Manager {
*
* @return true if service stopped
*/
- public boolean isStopped();
+ boolean isStopped();
/**
- * register data node
+ * get data node info manager
*
- * @param physicalPlan physical plan
- * @return status
+ * @return DataNodeInfoManager instance
*/
- public TSStatus registerDataNode(PhysicalPlan physicalPlan);
+ DataNodeManager getDataNodeManager();
/**
- * get data node info
+ * get consensus manager
*
- * @param physicalPlan physical plan
- * @return data set
+ * @return ConsensusManager instance
*/
- DataSet getDataNodeInfo(PhysicalPlan physicalPlan);
+ ConsensusManager getConsensusManager();
/**
- * get storage group schema
+ * get assign region manager
*
- * @return data set
+ * @return AssignRegionManager instance
*/
- DataSet getStorageGroupSchema();
+ RegionManager getRegionManager();
/**
- * set storage group
+ * Register DataNode
*
- * @param physicalPlan physical plan
- * @return status
+ * @param physicalPlan RegisterDataNodePlan
+ * @return DataNodeConfigurationDataSet
*/
- TSStatus setStorageGroup(PhysicalPlan physicalPlan);
+ DataSet registerDataNode(PhysicalPlan physicalPlan);
/**
- * get data node info manager
+ * Get DataNode info
*
- * @return DataNodeInfoManager instance
+ * @param physicalPlan QueryDataNodeInfoPlan
+ * @return DataNodesInfoDataSet
*/
- DataNodeManager getDataNodeManager();
+ DataSet getDataNodeInfo(PhysicalPlan physicalPlan);
/**
- * get data partition
+ * Get StorageGroupSchemas
*
- * @param physicalPlan physical plan
- * @return data set
+ * @return StorageGroupSchemaDataSet
*/
- DataSet getDataPartition(PhysicalPlan physicalPlan);
+ DataSet getStorageGroupSchema();
/**
- * get schema partition
+ * Set StorageGroup
*
- * @param physicalPlan physical plan
- * @return data set
+ * @param physicalPlan SetStorageGroupPlan
+ * @return status
*/
- DataSet getSchemaPartition(PhysicalPlan physicalPlan);
+ TSStatus setStorageGroup(PhysicalPlan physicalPlan);
/**
- * get assign region manager
+ * Get SchemaPartition
*
- * @return AssignRegionManager instance
+ * @param physicalPlan SchemaPartitionPlan
+ * @return SchemaPartitionDataSet
*/
- RegionManager getRegionManager();
+ DataSet getSchemaPartition(PhysicalPlan physicalPlan);
/**
- * apply schema partition
+ * Get or create SchemaPartition
*
- * @param physicalPlan physical plan
- * @return data set
+ * @param physicalPlan SchemaPartitionPlan
+ * @return SchemaPartitionDataSet
*/
- DataSet applySchemaPartition(PhysicalPlan physicalPlan);
+ DataSet getOrCreateSchemaPartition(PhysicalPlan physicalPlan);
/**
- * apply data partition
+ * Get DataPartition
*
- * @param physicalPlan physical plan
- * @return data set
+ * @param physicalPlan DataPartitionPlan
+ * @return DataPartitionDataSet
*/
- DataSet applyDataPartition(PhysicalPlan physicalPlan);
-
- DeviceGroupHashInfo getDeviceGroupHashInfo();
+ DataSet getDataPartition(PhysicalPlan physicalPlan);
- ConsensusManager getConsensusManager();
+ /**
+ * Get or create DataPartition
+ *
+ * @param physicalPlan DataPartitionPlan
+ * @return DataPartitionDataSet
+ */
+ DataSet getOrCreateDataPartition(PhysicalPlan physicalPlan);
/**
* operate permission
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 09a82f12ec..c2bde8119e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -18,128 +18,195 @@
*/
package org.apache.iotdb.confignode.manager;
-import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
-import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
/** manage data partition and schema partition */
public class PartitionManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class);
-
- /** schema partition read write lock */
- private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
- /** data partition read write lock */
- private final ReentrantReadWriteLock dataPartitionReadWriteLock;
+ private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class);
- // TODO: Serialize and Deserialize
- private final DataPartition dataPartition;
+ private static final PartitionInfoPersistence partitionInfoPersistence =
+ PartitionInfoPersistence.getInstance();
private final Manager configNodeManager;
public PartitionManager(Manager configNodeManager) {
- this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
- this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
this.configNodeManager = configNodeManager;
- this.dataPartition = new DataPartition();
+ }
+
+ private ConsensusManager getConsensusManager() {
+ return configNodeManager.getConsensusManager();
}
/**
- * Get schema partition
+ * TODO: Reconstruct this interface after PatterTree is moved to node-commons Get SchemaPartition
*
- * @param physicalPlan storageGroup and deviceGroupIDs
- * @return Empty Data Set if does not exist
+ * @param physicalPlan SchemaPartitionPlan with PatternTree
+ * @return SchemaPartitionDataSet that contains only existing SchemaPartition
*/
- public DataSet getSchemaPartition(SchemaPartitionPlan physicalPlan) {
+ public DataSet getSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
SchemaPartitionDataSet schemaPartitionDataSet;
- schemaPartitionReadWriteLock.readLock().lock();
- try {
- ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
- schemaPartitionDataSet = (SchemaPartitionDataSet) consensusReadResponse.getDataset();
- } finally {
- schemaPartitionReadWriteLock.readLock().unlock();
- }
+ ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
+ schemaPartitionDataSet = (SchemaPartitionDataSet) consensusReadResponse.getDataset();
return schemaPartitionDataSet;
}
/**
- * If does not exist, apply a new schema partition
+ * TODO: Reconstruct this interface after PatterTree is moved to node-commons Get SchemaPartition
+ * and create a new one if it does not exist
*
- * @param physicalPlan storage group and device group id
- * @return Schema Partition data set
+ * @param physicalPlan SchemaPartitionPlan with PatternTree
+ * @return SchemaPartitionDataSet
*/
- public DataSet applySchemaPartition(SchemaPartitionPlan physicalPlan) {
+ public DataSet getOrCreateSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
String storageGroup = physicalPlan.getStorageGroup();
- List<Integer> deviceGroupIDs = physicalPlan.getDeviceGroupIDs();
- List<Integer> noAssignDeviceGroupId =
- PartitionInfoPersistence.getInstance()
- .filterSchemaRegionNoAssignDeviceGroupId(storageGroup, deviceGroupIDs);
-
- // allocate partition by storage group and device group id
- schemaPartitionReadWriteLock.writeLock().lock();
- try {
- Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets =
- allocateSchemaPartition(storageGroup, noAssignDeviceGroupId);
- physicalPlan.setDeviceGroupIdReplicaSet(deviceGroupIdReplicaSets);
- getConsensusManager().write(physicalPlan);
- LOGGER.info("Allocate schema partition to {}.", deviceGroupIdReplicaSets);
- } finally {
- schemaPartitionReadWriteLock.writeLock().unlock();
+ List<Integer> seriesPartitionSlots = physicalPlan.getSeriesPartitionSlots();
+ List<Integer> noAssignedSeriesPartitionSlots =
+ partitionInfoPersistence.filterSchemaRegionNoAssignedPartitionSlots(
+ storageGroup, seriesPartitionSlots);
+
+ if (noAssignedSeriesPartitionSlots.size() > 0) {
+ // allocate partition by storage group and device group id
+ Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets =
+ allocateSchemaPartition(storageGroup, noAssignedSeriesPartitionSlots);
+ physicalPlan.setSchemaPartitionReplicaSet(schemaPartitionReplicaSets);
+
+ ConsensusWriteResponse consensusWriteResponse = getConsensusManager().write(physicalPlan);
+ if (consensusWriteResponse.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.info("Allocate schema partition to {}.", schemaPartitionReplicaSets);
+ }
}
+ physicalPlan.setSchemaPartitionReplicaSet(new HashMap<>());
return getSchemaPartition(physicalPlan);
}
/**
- * TODO: allocate schema partition by balancer
+ * TODO: allocate schema partition by LoadManager
*
- * @param storageGroup storage group
- * @param deviceGroupIDs device group id list
+ * @param storageGroup StorageGroupName
+ * @param noAssignedSeriesPartitionSlots not assigned SeriesPartitionSlots
+ * @return assign result
*/
private Map<Integer, RegionReplicaSet> allocateSchemaPartition(
- String storageGroup, List<Integer> deviceGroupIDs) {
+ String storageGroup, List<Integer> noAssignedSeriesPartitionSlots) {
List<RegionReplicaSet> schemaRegionEndPoints =
- RegionInfoPersistence.getInstance().getSchemaRegionEndPoint();
+ RegionInfoPersistence.getInstance().getSchemaRegionEndPoint(storageGroup);
Random random = new Random();
- Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets = new HashMap<>();
- for (int i = 0; i < deviceGroupIDs.size(); i++) {
+ Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets = new HashMap<>();
+ for (Integer seriesPartitionSlot : noAssignedSeriesPartitionSlots) {
RegionReplicaSet schemaRegionReplicaSet =
schemaRegionEndPoints.get(random.nextInt(schemaRegionEndPoints.size()));
- deviceGroupIdReplicaSets.put(deviceGroupIDs.get(i), schemaRegionReplicaSet);
+ schemaPartitionReplicaSets.put(seriesPartitionSlot, schemaRegionReplicaSet);
}
- return deviceGroupIdReplicaSets;
+ return schemaPartitionReplicaSets;
}
- private ConsensusManager getConsensusManager() {
- return configNodeManager.getConsensusManager();
+ /**
+ * Get DataPartition
+ *
+ * @param physicalPlan DataPartitionPlan with Map<StorageGroupName, Map<SeriesPartitionSlot,
+ * List<TimePartitionSlot>>>
+ * @return DataPartitionDataSet that contains only existing DataPartition
+ */
+ public DataSet getDataPartition(GetOrCreateDataPartitionPlan physicalPlan) {
+ DataPartitionDataSet dataPartitionDataSet;
+ ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan);
+ dataPartitionDataSet = (DataPartitionDataSet) consensusReadResponse.getDataset();
+ return dataPartitionDataSet;
}
/**
- * TODO:allocate schema partition by balancer
+ * Get DataPartition and create a new one if it does not exist
*
- * @param physicalPlan physical plan
- * @return data set
+ * @param physicalPlan DataPartitionPlan with Map<StorageGroupName, Map<SeriesPartitionSlot,
+ * List<TimePartitionSlot>>>
+ * @return DataPartitionDataSet
*/
- public DataSet applyDataPartition(DataPartitionPlan physicalPlan) {
- return null;
+ public DataSet getOrCreateDataPartition(GetOrCreateDataPartitionPlan physicalPlan) {
+ Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> noAssignedDataPartitionSlots =
+ partitionInfoPersistence.filterNoAssignedDataPartitionSlots(
+ physicalPlan.getPartitionSlotsMap());
+
+ if (noAssignedDataPartitionSlots.size() > 0) {
+ // Allocate DataPartition
+ Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+ assignedDataPartition = allocateDataPartition(noAssignedDataPartitionSlots);
+ CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
+ createPlan.setAssignedDataPartition(assignedDataPartition);
+
+ // Persistence DataPartition
+ ConsensusWriteResponse consensusWriteResponse = getConsensusManager().write(createPlan);
+ if (consensusWriteResponse.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.info("Allocate data partition to {}.", assignedDataPartition);
+ }
+ }
+
+ return getDataPartition(physicalPlan);
}
- public DataSet getDataPartition(DataPartitionPlan physicalPlan) {
- return null;
+ /**
+ * TODO: allocate by LoadManager
+ *
+ * @param noAssignedDataPartitionSlotsMap Map<StorageGroupName, Map<SeriesPartitionSlot,
+ * List<TimePartitionSlot>>>
+ * @return assign result, Map<StorageGroupName, Map<SeriesPartitionSlot, Map<TimePartitionSlot,
+ * List<RegionReplicaSet>>>>
+ */
+ private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+ allocateDataPartition(
+ Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>>
+ noAssignedDataPartitionSlotsMap) {
+
+ Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>> result =
+ new HashMap<>();
+
+ for (String storageGroup : noAssignedDataPartitionSlotsMap.keySet()) {
+ Map<SeriesPartitionSlot, List<TimePartitionSlot>> noAssignedPartitionSlotsMap =
+ noAssignedDataPartitionSlotsMap.get(storageGroup);
+ List<RegionReplicaSet> dataRegionEndPoints =
+ RegionInfoPersistence.getInstance().getDataRegionEndPoint(storageGroup);
+ Random random = new Random();
+
+ Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>> allocateResult =
+ new HashMap<>();
+ for (Map.Entry<SeriesPartitionSlot, List<TimePartitionSlot>> seriesPartitionEntry :
+ noAssignedPartitionSlotsMap.entrySet()) {
+ allocateResult.put(seriesPartitionEntry.getKey(), new HashMap<>());
+ for (TimePartitionSlot timePartitionSlot : seriesPartitionEntry.getValue()) {
+ allocateResult
+ .get(seriesPartitionEntry.getKey())
+ .computeIfAbsent(timePartitionSlot, key -> new ArrayList<>())
+ .add(dataRegionEndPoints.get(random.nextInt(dataRegionEndPoints.size())));
+ }
+ }
+
+ result.put(storageGroup, allocateResult);
+ }
+ return result;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
index 55017784b8..f45591cffb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/RegionManager.java
@@ -20,44 +20,39 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.GroupType;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.confignode.conf.ConfigNodeConf;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
-import org.apache.iotdb.confignode.partition.DataRegionInfo;
-import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
-import org.apache.iotdb.confignode.partition.StorageGroupSchema;
import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
+import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.rpc.TSStatusCode;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
/** manage data partition and schema partition */
public class RegionManager {
+
private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
private static final int regionReplicaCount = conf.getRegionReplicaCount();
private static final int schemaRegionCount = conf.getSchemaRegionCount();
private static final int dataRegionCount = conf.getDataRegionCount();
- /** partition read write lock */
- private final ReentrantReadWriteLock partitionReadWriteLock;
-
- // TODO: Serialize and Deserialize
- private int nextSchemaRegionGroup = 0;
- // TODO: Serialize and Deserialize
- private int nextDataRegionGroup = 0;
-
- private RegionInfoPersistence regionInfoPersistence = RegionInfoPersistence.getInstance();
+ private static final RegionInfoPersistence regionInfoPersistence =
+ RegionInfoPersistence.getInstance();
private final Manager configNodeManager;
public RegionManager(Manager configNodeManager) {
- this.partitionReadWriteLock = new ReentrantReadWriteLock();
this.configNodeManager = configNodeManager;
}
@@ -66,41 +61,39 @@ public class RegionManager {
}
/**
- * 1. region allocation 2. add to storage group map
+ * Set StorageGroup and allocate the default amount Regions
*
* @param plan SetStorageGroupPlan
- * @return TSStatusCode.SUCCESS_STATUS if region allocate
+ * @return SUCCESS_STATUS if the StorageGroup is set and region allocation successful.
+ * NOT_ENOUGH_DATA_NODE if there are not enough DataNode for Region allocation.
+ * STORAGE_GROUP_ALREADY_EXISTS if the StorageGroup is already set.
*/
public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
TSStatus result;
- partitionReadWriteLock.writeLock().lock();
- try {
- if (configNodeManager.getDataNodeManager().getDataNodeId().size() < regionReplicaCount) {
- result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- result.setMessage("DataNode is not enough, please register more.");
+ if (configNodeManager.getDataNodeManager().getOnlineDataNodeCount() < regionReplicaCount) {
+ result = new TSStatus(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode());
+ result.setMessage("DataNode is not enough, please register more.");
+ } else {
+ if (regionInfoPersistence.containsStorageGroup(plan.getSchema().getName())) {
+ result = new TSStatus(TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode());
+ result.setMessage(
+ String.format("StorageGroup %s is already set.", plan.getSchema().getName()));
} else {
- if (regionInfoPersistence.containsStorageGroup(plan.getSchema().getName())) {
- result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- result.setMessage(
- String.format("StorageGroup %s is already set.", plan.getSchema().getName()));
- } else {
- String storageGroupName = plan.getSchema().getName();
- StorageGroupSchema storageGroupSchema = new StorageGroupSchema(storageGroupName);
-
- // allocate schema region
- SchemaRegionInfo schemaRegionInfo = schemaRegionAllocation(storageGroupSchema);
- plan.setSchemaRegionInfo(schemaRegionInfo);
-
- // allocate data region
- DataRegionInfo dataRegionInfo = dataRegionAllocation(storageGroupSchema);
- plan.setDataRegionInfo(dataRegionInfo);
-
- // write consensus
- result = getConsensusManager().write(plan).getStatus();
- }
+ CreateRegionsPlan createPlan = new CreateRegionsPlan();
+ createPlan.setStorageGroup(plan.getSchema().getName());
+
+ // allocate schema region
+ allocateRegions(GroupType.SchemaRegion, createPlan);
+ // allocate data region
+ allocateRegions(GroupType.DataRegion, createPlan);
+
+ // set StorageGroup
+ getConsensusManager().write(plan);
+
+ // create Region
+ // TODO: Send create Region to DataNode
+ result = getConsensusManager().write(createPlan).getStatus();
}
- } finally {
- partitionReadWriteLock.writeLock().unlock();
}
return result;
}
@@ -109,42 +102,31 @@ public class RegionManager {
return configNodeManager.getDataNodeManager();
}
- private SchemaRegionInfo schemaRegionAllocation(StorageGroupSchema storageGroupSchema) {
+ private void allocateRegions(GroupType type, CreateRegionsPlan plan) {
- SchemaRegionInfo schemaRegionInfo = new SchemaRegionInfo();
// TODO: Use CopySet algorithm to optimize region allocation policy
- for (int i = 0; i < schemaRegionCount; i++) {
- List<Integer> dataNodeList = new ArrayList<>(getDataNodeInfoManager().getDataNodeId());
- Collections.shuffle(dataNodeList);
- schemaRegionInfo.addSchemaRegion(
- nextSchemaRegionGroup, dataNodeList.subList(0, regionReplicaCount));
- storageGroupSchema.addSchemaRegionGroup(nextSchemaRegionGroup);
- nextSchemaRegionGroup += 1;
- }
- return schemaRegionInfo;
- }
- /**
- * TODO: Only perform in leader node, @rongzhao
- *
- * @param storageGroupSchema
- */
- private DataRegionInfo dataRegionAllocation(StorageGroupSchema storageGroupSchema) {
- // TODO: Use CopySet algorithm to optimize region allocation policy
- DataRegionInfo dataRegionInfo = new DataRegionInfo();
- for (int i = 0; i < dataRegionCount; i++) {
- List<Integer> dataNodeList = new ArrayList<>(getDataNodeInfoManager().getDataNodeId());
- Collections.shuffle(dataNodeList);
- dataRegionInfo.createDataRegion(
- nextDataRegionGroup, dataNodeList.subList(0, regionReplicaCount));
- storageGroupSchema.addDataRegionGroup(nextDataRegionGroup);
- nextDataRegionGroup += 1;
+ int regionCount = type.equals(GroupType.SchemaRegion) ? schemaRegionCount : dataRegionCount;
+ List<DataNodeLocation> onlineDataNodes = getDataNodeInfoManager().getOnlineDataNodes();
+ for (int i = 0; i < regionCount; i++) {
+ Collections.shuffle(onlineDataNodes);
+
+ RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
+ ConsensusGroupId consensusGroupId = null;
+ switch (type) {
+ case SchemaRegion:
+ consensusGroupId = new SchemaRegionId(regionInfoPersistence.generateNextRegionGroupId());
+ break;
+ case DataRegion:
+ consensusGroupId = new DataRegionId(regionInfoPersistence.generateNextRegionGroupId());
+ }
+ regionReplicaSet.setId(consensusGroupId);
+ regionReplicaSet.setDataNodeList(onlineDataNodes.subList(0, regionReplicaCount));
+ plan.addRegion(regionReplicaSet);
}
- return dataRegionInfo;
}
public StorageGroupSchemaDataSet getStorageGroupSchema() {
-
ConsensusReadResponse readResponse =
getConsensusManager().read(new QueryStorageGroupSchemaPlan());
return (StorageGroupSchemaDataSet) readResponse.getDataset();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java
deleted file mode 100644
index 9441d1452b..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/partition/DataRegionInfo.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.partition;
-
-import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class DataRegionInfo {
-
- // TODO: Serialize and Deserialize
- // Map<DataRegionID, List<DataNodeID>>
- private Map<Integer, List<Integer>> dataRegionDataNodesMap;
-
- // Map<StorageGroup, Map<DeviceGroupID, DataPartitionRule>>
- private final Map<String, Map<Integer, DataPartitionRule>> dataPartitionRuleTable;
-
- public DataRegionInfo() {
- this.dataRegionDataNodesMap = new HashMap<>();
- this.dataPartitionRuleTable = new HashMap<>();
- }
-
- public Map<Integer, List<Integer>> getDataRegionDataNodesMap() {
- return dataRegionDataNodesMap;
- }
-
- public void createDataRegion(int dataRegionGroup, List<Integer> dataNodeList) {
- dataRegionDataNodesMap.put(dataRegionGroup, dataNodeList);
- }
-
- public List<Integer> getDataRegionLocation(int dataRegionGroup) {
- return dataRegionDataNodesMap.get(dataRegionGroup);
- }
-
- public void updateDataPartitionRule(
- String StorageGroup, int deviceGroup, DataPartitionRule rule) {
- // TODO: Data partition policy by @YongzaoDan
- }
-
- public void serializeImpl(ByteBuffer buffer) {
- SerializeDeserializeUtil.writeIntMapLists(dataRegionDataNodesMap, buffer);
- }
-
- public void deserializeImpl(ByteBuffer buffer) {
- dataRegionDataNodesMap = SerializeDeserializeUtil.readIntMapLists(buffer);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java
deleted file mode 100644
index 9bde9be5e7..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/partition/SchemaRegionInfo.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.partition;
-
-import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class SchemaRegionInfo {
-
- // TODO: Serialize and Deserialize
- // Map<SchemaRegionID, List<DataNodeID>>
- private Map<Integer, List<Integer>> schemaRegionDataNodesMap;
-
- public SchemaRegionInfo() {
- this.schemaRegionDataNodesMap = new HashMap<>();
- }
-
- public void addSchemaRegion(int schemaRegion, List<Integer> dataNode) {
- if (!schemaRegionDataNodesMap.containsKey(schemaRegion)) {
- schemaRegionDataNodesMap.put(schemaRegion, dataNode);
- }
- }
-
- public List<Integer> getSchemaRegionLocation(int schemaRegionGroup) {
- return schemaRegionDataNodesMap.get(schemaRegionGroup);
- }
-
- public Map<Integer, List<Integer>> getSchemaRegionDataNodesMap() {
- return schemaRegionDataNodesMap;
- }
-
- public void serializeImpl(ByteBuffer buffer) {
- SerializeDeserializeUtil.writeIntMapLists(schemaRegionDataNodesMap, buffer);
- }
-
- public void deserializeImpl(ByteBuffer buffer) {
- schemaRegionDataNodesMap = SerializeDeserializeUtil.readIntMapLists(buffer);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java b/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java
index fc4153c054..9bbe657503 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/partition/StorageGroupSchema.java
@@ -18,22 +18,29 @@
*/
package org.apache.iotdb.confignode.partition;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
public class StorageGroupSchema {
private String name;
- private List<Integer> schemaRegionGroupIDs;
- private List<Integer> dataRegionGroupIDs;
+ private final List<ConsensusGroupId> schemaRegionGroupIds;
+ private final List<ConsensusGroupId> dataRegionGroupIds;
public StorageGroupSchema() {
- // empty constructor
+ schemaRegionGroupIds = new ArrayList<>();
+ dataRegionGroupIds = new ArrayList<>();
}
public StorageGroupSchema(String name) {
+ this();
this.name = name;
}
@@ -41,37 +48,65 @@ public class StorageGroupSchema {
return name;
}
- public List<Integer> getSchemaRegionGroupIDs() {
- return schemaRegionGroupIDs;
+ public List<ConsensusGroupId> getSchemaRegionGroupIds() {
+ return schemaRegionGroupIds;
}
- public void addSchemaRegionGroup(int id) {
- if (schemaRegionGroupIDs == null) {
- schemaRegionGroupIDs = new ArrayList<>();
- }
- schemaRegionGroupIDs.add(id);
+ public void addSchemaRegionGroup(ConsensusGroupId id) {
+ schemaRegionGroupIds.add(id);
}
- public List<Integer> getDataRegionGroupIDs() {
- return dataRegionGroupIDs;
+ public List<ConsensusGroupId> getDataRegionGroupIds() {
+ return dataRegionGroupIds;
}
- public void addDataRegionGroup(int id) {
- if (dataRegionGroupIDs == null) {
- dataRegionGroupIDs = new ArrayList<>();
- }
- dataRegionGroupIDs.add(id);
+ public void addDataRegionGroup(ConsensusGroupId id) {
+ dataRegionGroupIds.add(id);
}
public void serialize(ByteBuffer buffer) {
buffer.putInt(name.length());
buffer.put(name.getBytes());
+
+ buffer.putInt(schemaRegionGroupIds.size());
+ for (ConsensusGroupId schemaRegionGroupId : schemaRegionGroupIds) {
+ schemaRegionGroupId.serializeImpl(buffer);
+ }
+
+ buffer.putInt(dataRegionGroupIds.size());
+ for (ConsensusGroupId dataRegionGroupId : dataRegionGroupIds) {
+ dataRegionGroupId.serializeImpl(buffer);
+ }
}
- public void deserialize(ByteBuffer buffer) {
+ public void deserialize(ByteBuffer buffer) throws IOException {
+ name = SerializeDeserializeUtil.readString(buffer);
+
int length = buffer.getInt();
- byte[] byteName = new byte[length];
- buffer.get(byteName, 0, length);
- name = new String(byteName, 0, length);
+ for (int i = 0; i < length; i++) {
+ ConsensusGroupId schemaRegionId = ConsensusGroupId.Factory.create(buffer);
+ schemaRegionGroupIds.add(schemaRegionId);
+ }
+
+ length = buffer.getInt();
+ for (int i = 0; i < length; i++) {
+ ConsensusGroupId dataRegionId = ConsensusGroupId.Factory.create(buffer);
+ dataRegionGroupIds.add(dataRegionId);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ StorageGroupSchema that = (StorageGroupSchema) o;
+ return name.equals(that.name)
+ && schemaRegionGroupIds.equals(that.schemaRegionGroupIds)
+ && dataRegionGroupIds.equals(that.dataRegionGroupIds);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, schemaRegionGroupIds, dataRegionGroupIds);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
index 0071c94368..2b1ad0f5f6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/DataNodeInfoPersistence.java
@@ -19,19 +19,17 @@
package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -39,65 +37,61 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DataNodeInfoPersistence {
- private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInfoPersistence.class);
+
+ private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
+
+ // TODO: serialize and deserialize
+ private int nextDataNodeId = 0;
/** online data nodes */
+ // TODO: serialize and deserialize
private final ConcurrentNavigableMap<Integer, DataNodeLocation> onlineDataNodes =
new ConcurrentSkipListMap();
/** For remove node or draining node */
- private Set<DataNodeLocation> drainingDataNodes = new HashSet<>();
-
- private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
+ private final Set<DataNodeLocation> drainingDataNodes = new HashSet<>();
private DataNodeInfoPersistence() {
this.dataNodeInfoReadWriteLock = new ReentrantReadWriteLock();
}
- public ConcurrentNavigableMap<Integer, DataNodeLocation> getOnlineDataNodes() {
- return onlineDataNodes;
- }
-
public boolean containsValue(DataNodeLocation info) {
- return onlineDataNodes.containsValue(info);
+ boolean result = false;
+ dataNodeInfoReadWriteLock.readLock().lock();
+
+ try {
+ for (Map.Entry<Integer, DataNodeLocation> entry : onlineDataNodes.entrySet()) {
+ if (entry.getValue().getEndPoint().equals(info.getEndPoint())) {
+ result = true;
+ info.setDataNodeId(entry.getKey());
+ break;
+ }
+ }
+ } finally {
+ dataNodeInfoReadWriteLock.readLock().unlock();
+ }
+
+ return result;
}
public void put(int dataNodeID, DataNodeLocation info) {
onlineDataNodes.put(dataNodeID, info);
}
- public int getDataNodeInfo(DataNodeLocation info) {
- // TODO: optimize
- for (Map.Entry<Integer, DataNodeLocation> entry : onlineDataNodes.entrySet()) {
- if (entry.getValue().equals(info)) {
- return info.getDataNodeID();
- }
- }
- return -1;
- }
-
/**
- * register dta node info when data node start
+ * Persist DataNode info
*
* @param plan RegisterDataNodePlan
- * @return success if data node regist first
+ * @return SUCCESS_STATUS
*/
public TSStatus registerDataNode(RegisterDataNodePlan plan) {
TSStatus result;
DataNodeLocation info = plan.getInfo();
dataNodeInfoReadWriteLock.writeLock().lock();
try {
- if (onlineDataNodes.containsValue(info)) {
- result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- result.setMessage(
- String.format(
- "DataNode %s is already registered.", plan.getInfo().getEndPoint().toString()));
- } else {
- onlineDataNodes.put(info.getDataNodeID(), info);
- result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- result.setMessage(String.valueOf(info.getDataNodeID()));
- LOGGER.info("Register data node success, data node is {}", plan);
- }
+ nextDataNodeId = Math.max(nextDataNodeId, info.getDataNodeId());
+ onlineDataNodes.put(info.getDataNodeId(), info);
+ result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
dataNodeInfoReadWriteLock.writeLock().unlock();
}
@@ -105,13 +99,16 @@ public class DataNodeInfoPersistence {
}
/**
- * get dta node info
+ * Get DataNode info
*
* @param plan QueryDataNodeInfoPlan
- * @return all data node info if dataNodeId of plan is -1
+ * @return The specific DataNode's info or all DataNode info if dataNodeId in
+ * QueryDataNodeInfoPlan is -1
*/
public DataNodesInfoDataSet getDataNodeInfo(QueryDataNodeInfoPlan plan) {
DataNodesInfoDataSet result = new DataNodesInfoDataSet();
+ result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+
int dataNodeId = plan.getDataNodeID();
dataNodeInfoReadWriteLock.readLock().lock();
try {
@@ -127,46 +124,45 @@ public class DataNodeInfoPersistence {
return result;
}
- public Set<Integer> getDataNodeIds() {
- return onlineDataNodes.keySet();
+ public int getOnlineDataNodeCount() {
+ int result;
+ dataNodeInfoReadWriteLock.readLock().lock();
+ try {
+ result = onlineDataNodes.size();
+ } finally {
+ dataNodeInfoReadWriteLock.readLock().unlock();
+ }
+ return result;
}
- /**
- * Add schema region group
- *
- * @param dataNodeId data node id
- * @param schemaRegionGroup schema region group
- */
- public void addSchemaRegionGroup(int dataNodeId, int schemaRegionGroup) {
- dataNodeInfoReadWriteLock.writeLock().lock();
+ public List<DataNodeLocation> getOnlineDataNodes() {
+ List<DataNodeLocation> result;
+ dataNodeInfoReadWriteLock.readLock().lock();
try {
- if (onlineDataNodes.containsKey(dataNodeId)) {
- onlineDataNodes.get(dataNodeId).addSchemaRegionGroup(schemaRegionGroup);
- }
+ result = new ArrayList<>(onlineDataNodes.values());
} finally {
- dataNodeInfoReadWriteLock.writeLock().unlock();
+ dataNodeInfoReadWriteLock.readLock().unlock();
}
+ return result;
}
- /**
- * Add data region group
- *
- * @param dataNodeId data node id
- * @param dataRegionGroup data region group
- */
- public void addDataRegionGroup(int dataNodeId, int dataRegionGroup) {
- dataNodeInfoReadWriteLock.writeLock().lock();
+ public int generateNextDataNodeId() {
+ int result;
+
try {
- if (onlineDataNodes.containsKey(dataNodeId)) {
- onlineDataNodes.get(dataNodeId).addSchemaRegionGroup(dataRegionGroup);
- }
+ dataNodeInfoReadWriteLock.writeLock().lock();
+ result = nextDataNodeId;
+ nextDataNodeId += 1;
} finally {
dataNodeInfoReadWriteLock.writeLock().unlock();
}
+
+ return result;
}
@TestOnly
public void clear() {
+ nextDataNodeId = 0;
onlineDataNodes.clear();
drainingDataNodes.clear();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
index 221ceb2993..2b8d8a5395 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfoPersistence.java
@@ -19,25 +19,28 @@
package org.apache.iotdb.confignode.persistence;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
-import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.rpc.TSStatusCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** manage data partition and schema partition */
public class PartitionInfoPersistence {
- private static final Logger LOGGER = LoggerFactory.getLogger(PartitionInfoPersistence.class);
/** schema partition read write lock */
private final ReentrantReadWriteLock schemaPartitionReadWriteLock;
@@ -55,86 +58,136 @@ public class PartitionInfoPersistence {
this.schemaPartitionReadWriteLock = new ReentrantReadWriteLock();
this.dataPartitionReadWriteLock = new ReentrantReadWriteLock();
this.schemaPartition = new SchemaPartition();
+ this.schemaPartition.setSchemaPartitionMap(new HashMap<>());
this.dataPartition = new DataPartition();
+ this.dataPartition.setDataPartitionMap(new HashMap<>());
}
/**
- * Get schema partition
+ * TODO: Reconstruct this interface after PatterTree is moved to node-commons Get SchemaPartition
*
- * @param physicalPlan storageGroup and deviceGroupIDs
- * @return Empty Data Set if does not exist
+ * @param physicalPlan SchemaPartitionPlan with PatternTree
+ * @return SchemaPartitionDataSet that contains only existing SchemaPartition
*/
- public DataSet getSchemaPartition(SchemaPartitionPlan physicalPlan) {
+ public DataSet getSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
SchemaPartitionDataSet schemaPartitionDataSet = new SchemaPartitionDataSet();
schemaPartitionReadWriteLock.readLock().lock();
try {
String storageGroup = physicalPlan.getStorageGroup();
- List<Integer> deviceGroupIDs = physicalPlan.getDeviceGroupIDs();
+ List<Integer> deviceGroupIDs = physicalPlan.getSeriesPartitionSlots();
SchemaPartition schemaPartitionInfo = new SchemaPartition();
- schemaPartitionInfo.setSchemaPartition(
+ schemaPartitionInfo.setSchemaPartitionMap(
schemaPartition.getSchemaPartition(storageGroup, deviceGroupIDs));
- schemaPartitionDataSet.setSchemaPartitionInfo(schemaPartitionInfo);
+ schemaPartitionDataSet.setSchemaPartition(schemaPartitionInfo);
} finally {
schemaPartitionReadWriteLock.readLock().unlock();
+ schemaPartitionDataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}
return schemaPartitionDataSet;
}
/**
- * If does not exist, apply a new schema partition
+ * TODO: Reconstruct this interface after PatterTree is moved to node-commons Get SchemaPartition
+ * and create a new one if it does not exist
*
- * @param physicalPlan storage group and device group id
- * @return Schema Partition data set
+ * @param physicalPlan SchemaPartitionPlan with PatternTree
+ * @return SchemaPartitionDataSet
*/
- public DataSet applySchemaPartition(SchemaPartitionPlan physicalPlan) {
- String storageGroup = physicalPlan.getStorageGroup();
- List<Integer> deviceGroupIDs = physicalPlan.getDeviceGroupIDs();
- List<Integer> noAssignDeviceGroupId =
- schemaPartition.filterNoAssignDeviceGroupId(storageGroup, deviceGroupIDs);
-
- // allocate partition by storage group and device group id
- Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets =
- physicalPlan.getDeviceGroupIdReplicaSets();
+ public TSStatus createSchemaPartition(GetOrCreateSchemaPartitionPlan physicalPlan) {
schemaPartitionReadWriteLock.writeLock().lock();
- try {
- deviceGroupIdReplicaSets
- .entrySet()
- .forEach(
- entity -> {
- schemaPartition.setSchemaRegionReplicaSet(
- storageGroup, entity.getKey(), entity.getValue());
- });
+ try {
+ // Allocate SchemaPartition by SchemaPartitionPlan
+ String storageGroup = physicalPlan.getStorageGroup();
+ Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets =
+ physicalPlan.getSchemaPartitionReplicaSets();
+ schemaPartitionReplicaSets.forEach(
+ (key, value) -> schemaPartition.setSchemaRegionReplicaSet(storageGroup, key, value));
} finally {
schemaPartitionReadWriteLock.writeLock().unlock();
}
- return getSchemaPartition(physicalPlan);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ /** TODO: Reconstruct this interface after PatterTree is moved to node-commons */
+ public List<Integer> filterSchemaRegionNoAssignedPartitionSlots(
+ String storageGroup, List<Integer> seriesPartitionSlots) {
+ List<Integer> result;
+ schemaPartitionReadWriteLock.readLock().lock();
+ try {
+ result =
+ schemaPartition.filterNoAssignedSeriesPartitionSlot(storageGroup, seriesPartitionSlots);
+ } finally {
+ schemaPartitionReadWriteLock.readLock().unlock();
+ }
+ return result;
}
/**
- * TODO:allocate schema partition by balancer
+ * Get DataPartition
*
- * @param physicalPlan physical plan
- * @return data set
+ * @param physicalPlan DataPartitionPlan with Map<StorageGroupName, Map<SeriesPartitionSlot,
+ * List<TimePartitionSlot>>>
+ * @return DataPartitionDataSet that contains only existing DataPartition
*/
- public DataSet applyDataPartition(DataPartitionPlan physicalPlan) {
- return null;
+ public DataSet getDataPartition(GetOrCreateDataPartitionPlan physicalPlan) {
+ DataPartitionDataSet dataPartitionDataSet = new DataPartitionDataSet();
+ dataPartitionReadWriteLock.readLock().lock();
+ try {
+ dataPartitionDataSet.setDataPartition(
+ dataPartition.getDataPartition(physicalPlan.getPartitionSlotsMap()));
+ } finally {
+ dataPartitionReadWriteLock.readLock().unlock();
+ dataPartitionDataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+ return dataPartitionDataSet;
}
- public DataSet getDataPartition(DataPartitionPlan physicalPlan) {
- return null;
+ public TSStatus createDataPartition(CreateDataPartitionPlan physicalPlan) {
+ dataPartitionReadWriteLock.writeLock().lock();
+
+ try {
+ // Allocate DataPartition by CreateDataPartitionPlan
+ Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+ assignedResult = physicalPlan.getAssignedDataPartition();
+ assignedResult.forEach(
+ (storageGroup, seriesPartitionTimePartitionSlots) ->
+ seriesPartitionTimePartitionSlots.forEach(
+ ((seriesPartitionSlot, timePartitionSlotRegionReplicaSets) ->
+ timePartitionSlotRegionReplicaSets.forEach(
+ ((timePartitionSlot, regionReplicaSets) ->
+ regionReplicaSets.forEach(
+ regionReplicaSet ->
+ dataPartition.createDataPartition(
+ storageGroup,
+ seriesPartitionSlot,
+ timePartitionSlot,
+ regionReplicaSet)))))));
+ } finally {
+ dataPartitionReadWriteLock.writeLock().unlock();
+ }
+
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
- public List<Integer> filterSchemaRegionNoAssignDeviceGroupId(
- String storageGroup, List<Integer> deviceGroupIDs) {
- return schemaPartition.filterNoAssignDeviceGroupId(storageGroup, deviceGroupIDs);
+ public Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>>
+ filterNoAssignedDataPartitionSlots(
+ Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap) {
+ Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> result;
+ dataPartitionReadWriteLock.readLock().lock();
+ try {
+ result = dataPartition.filterNoAssignedDataPartitionSlots(partitionSlotsMap);
+ } finally {
+ dataPartitionReadWriteLock.readLock().unlock();
+ }
+ return result;
}
@TestOnly
public void clear() {
- if (schemaPartition.getSchemaPartition() != null) {
- schemaPartition.getSchemaPartition().clear();
+ if (schemaPartition.getSchemaPartitionMap() != null) {
+ schemaPartition.getSchemaPartitionMap().clear();
}
if (dataPartition.getDataPartitionMap() != null) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
index 712c64e496..e7f429b566 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/RegionInfoPersistence.java
@@ -20,15 +20,14 @@
package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
-import org.apache.iotdb.confignode.partition.DataRegionInfo;
-import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -37,148 +36,159 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
/** manage data partition and schema partition */
public class RegionInfoPersistence {
- /** partition read write lock */
- private final ReentrantReadWriteLock partitionReadWriteLock;
-
// TODO: Serialize and Deserialize
- // storageGroupName -> StorageGroupSchema
+ // Map<StorageGroupName, StorageGroupSchema>
private final Map<String, StorageGroupSchema> storageGroupsMap;
+ // Region allocate lock
+ private final ReentrantReadWriteLock regionAllocateLock;
// TODO: Serialize and Deserialize
- private int nextSchemaRegionGroup = 0;
- // TODO: Serialize and Deserialize
- private int nextDataRegionGroup = 0;
+ private int nextRegionGroupId = 0;
- // TODO: Serialize and Deserialize
- private final SchemaRegionInfo schemaRegion;
-
- // TODO: Serialize and Deserialize
- private final DataRegionInfo dataRegion;
+ // Region read write lock
+ private final ReentrantReadWriteLock regionReadWriteLock;
+ // Map<ConsensusGroupId, RegionReplicaSet>
+ private final Map<ConsensusGroupId, RegionReplicaSet> regionMap;
public RegionInfoPersistence() {
- this.partitionReadWriteLock = new ReentrantReadWriteLock();
+ this.regionAllocateLock = new ReentrantReadWriteLock();
+ this.regionReadWriteLock = new ReentrantReadWriteLock();
this.storageGroupsMap = new HashMap<>();
- this.schemaRegion = new SchemaRegionInfo();
- this.dataRegion = new DataRegionInfo();
+ this.regionMap = new HashMap<>();
}
/**
- * 1. region allocation 2. add to storage group map
+ * Persistence new StorageGroupSchema
*
* @param plan SetStorageGroupPlan
- * @return TSStatusCode.SUCCESS_STATUS if region allocate
+ * @return SUCCESS_STATUS
*/
public TSStatus setStorageGroup(SetStorageGroupPlan plan) {
TSStatus result;
- partitionReadWriteLock.writeLock().lock();
+ regionReadWriteLock.writeLock().lock();
try {
- if (storageGroupsMap.containsKey(plan.getSchema().getName())) {
- result = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- result.setMessage(
- String.format("StorageGroup %s is already set.", plan.getSchema().getName()));
- } else {
- StorageGroupSchema schema = new StorageGroupSchema(plan.getSchema().getName());
- storageGroupsMap.put(schema.getName(), schema);
-
- plan.getSchemaRegionInfo()
- .getSchemaRegionDataNodesMap()
- .entrySet()
- .forEach(
- entity -> {
- schemaRegion.addSchemaRegion(entity.getKey(), entity.getValue());
- entity
- .getValue()
- .forEach(
- dataNodeId -> {
- DataNodeInfoPersistence.getInstance()
- .addSchemaRegionGroup(dataNodeId, entity.getKey());
- });
- });
-
- plan.getDataRegionInfo()
- .getDataRegionDataNodesMap()
- .entrySet()
- .forEach(
- entity -> {
- dataRegion.createDataRegion(entity.getKey(), entity.getValue());
- entity
- .getValue()
- .forEach(
- dataNodeId -> {
- DataNodeInfoPersistence.getInstance()
- .addDataRegionGroup(dataNodeId, entity.getKey());
- });
- });
-
- result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- }
+ StorageGroupSchema schema = plan.getSchema();
+ storageGroupsMap.put(schema.getName(), schema);
+ result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
- partitionReadWriteLock.writeLock().unlock();
+ regionReadWriteLock.writeLock().unlock();
}
return result;
}
public StorageGroupSchemaDataSet getStorageGroupSchema() {
StorageGroupSchemaDataSet result = new StorageGroupSchemaDataSet();
- partitionReadWriteLock.readLock().lock();
+ regionReadWriteLock.readLock().lock();
try {
result.setSchemaList(new ArrayList<>(storageGroupsMap.values()));
} finally {
- partitionReadWriteLock.readLock().unlock();
+ regionReadWriteLock.readLock().unlock();
+ result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+ return result;
+ }
+
+ /**
+ * Persistence allocation result of new Regions
+ *
+ * @param plan CreateRegionsPlan
+ * @return SUCCESS_STATUS
+ */
+ public TSStatus createRegions(CreateRegionsPlan plan) {
+ TSStatus result;
+ regionReadWriteLock.writeLock().lock();
+ regionAllocateLock.writeLock().lock();
+ try {
+ StorageGroupSchema schema = storageGroupsMap.get(plan.getStorageGroup());
+
+ for (RegionReplicaSet regionReplicaSet : plan.getRegionReplicaSets()) {
+ nextRegionGroupId = Math.max(nextRegionGroupId, regionReplicaSet.getId().getId());
+ regionMap.put(regionReplicaSet.getId(), regionReplicaSet);
+ if (regionReplicaSet.getId() instanceof DataRegionId) {
+ schema.addDataRegionGroup(regionReplicaSet.getId());
+ } else if (regionReplicaSet.getId() instanceof SchemaRegionId) {
+ schema.addSchemaRegionGroup(regionReplicaSet.getId());
+ }
+ }
+
+ result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ } finally {
+ regionAllocateLock.writeLock().unlock();
+ regionReadWriteLock.writeLock().unlock();
}
return result;
}
- /** @return key is schema region id, value is endpoint list */
- public List<RegionReplicaSet> getSchemaRegionEndPoint() {
+ /** @return The SchemaRegion ReplicaSets in the specific StorageGroup */
+ public List<RegionReplicaSet> getSchemaRegionEndPoint(String storageGroup) {
List<RegionReplicaSet> schemaRegionEndPoints = new ArrayList<>();
+ regionReadWriteLock.readLock().lock();
+ try {
+ if (storageGroupsMap.containsKey(storageGroup)) {
+ List<ConsensusGroupId> schemaRegionIds =
+ storageGroupsMap.get(storageGroup).getSchemaRegionGroupIds();
+ for (ConsensusGroupId consensusGroupId : schemaRegionIds) {
+ schemaRegionEndPoints.add(regionMap.get(consensusGroupId));
+ }
+ }
+ } finally {
+ regionReadWriteLock.readLock().unlock();
+ }
- schemaRegion
- .getSchemaRegionDataNodesMap()
- .entrySet()
- .forEach(
- entity -> {
- RegionReplicaSet schemaRegionReplicaSet = new RegionReplicaSet();
- List<Endpoint> endPoints = new ArrayList<>();
- entity
- .getValue()
- .forEach(
- dataNodeId -> {
- if (DataNodeInfoPersistence.getInstance()
- .getOnlineDataNodes()
- .containsKey(dataNodeId)) {
- endPoints.add(
- DataNodeInfoPersistence.getInstance()
- .getOnlineDataNodes()
- .get(dataNodeId)
- .getEndPoint());
- }
- });
- schemaRegionReplicaSet.setId(new SchemaRegionId(entity.getKey()));
- // TODO: (xingtanzjr) We cannot get the dataNodeId here, use 0 as the placeholder
- schemaRegionReplicaSet.setDataNodeList(
- endPoints.stream()
- .map(endpoint -> new DataNodeLocation(0, endpoint))
- .collect(Collectors.toList()));
- schemaRegionEndPoints.add(schemaRegionReplicaSet);
- });
return schemaRegionEndPoints;
}
+ /** @return The DataRegion ReplicaSets in the specific StorageGroup */
+ public List<RegionReplicaSet> getDataRegionEndPoint(String storageGroup) {
+ List<RegionReplicaSet> dataRegionEndPoints = new ArrayList<>();
+ regionReadWriteLock.readLock().lock();
+ try {
+ if (storageGroupsMap.containsKey(storageGroup)) {
+ List<ConsensusGroupId> dataRegionIds =
+ storageGroupsMap.get(storageGroup).getDataRegionGroupIds();
+ for (ConsensusGroupId consensusGroupId : dataRegionIds) {
+ dataRegionEndPoints.add(regionMap.get(consensusGroupId));
+ }
+ }
+ } finally {
+ regionReadWriteLock.readLock().unlock();
+ }
+
+ return dataRegionEndPoints;
+ }
+
+ public int generateNextRegionGroupId() {
+ int result;
+ regionAllocateLock.writeLock().lock();
+ try {
+ result = nextRegionGroupId;
+ nextRegionGroupId += 1;
+ } finally {
+ regionAllocateLock.writeLock().unlock();
+ }
+ return result;
+ }
+
public boolean containsStorageGroup(String storageName) {
- return storageGroupsMap.containsKey(storageName);
+ boolean result;
+ regionReadWriteLock.readLock().lock();
+ try {
+ result = storageGroupsMap.containsKey(storageName);
+ } finally {
+ regionReadWriteLock.readLock().unlock();
+ }
+ return result;
}
@TestOnly
public void clear() {
+ nextRegionGroupId = 0;
storageGroupsMap.clear();
- schemaRegion.getSchemaRegionDataNodesMap().clear();
- dataRegion.getDataRegionDataNodesMap().clear();
+ regionMap.clear();
}
private static class RegionInfoPersistenceHolder {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
index 1c71cf70ff..957e26d52e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlan.java
@@ -18,12 +18,15 @@
*/
package org.apache.iotdb.confignode.physical;
+import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateSchemaPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
-import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.QueryStorageGroupSchemaPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -98,17 +101,26 @@ public abstract class PhysicalPlan implements IConsensusRequest {
case QueryStorageGroupSchema:
plan = new QueryStorageGroupSchemaPlan();
break;
- case QueryDataPartition:
- plan = new DataPartitionPlan(PhysicalPlanType.QueryDataPartition);
+ case CreateRegions:
+ plan = new CreateRegionsPlan();
break;
- case ApplyDataPartition:
- plan = new DataPartitionPlan(PhysicalPlanType.ApplyDataPartition);
+ case GetSchemaPartition:
+ plan = new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetSchemaPartition);
break;
- case QuerySchemaPartition:
- plan = new SchemaPartitionPlan(PhysicalPlanType.QuerySchemaPartition);
+ case CreateSchemaPartition:
+ plan = new CreateSchemaPartitionPlan(PhysicalPlanType.CreateSchemaPartition);
break;
- case ApplySchemaPartition:
- plan = new SchemaPartitionPlan(PhysicalPlanType.ApplySchemaPartition);
+ case GetOrCreateSchemaPartition:
+ plan = new GetOrCreateSchemaPartitionPlan(PhysicalPlanType.GetOrCreateSchemaPartition);
+ break;
+ case GetDataPartition:
+ plan = new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetDataPartition);
+ break;
+ case CreateDataPartition:
+ plan = new CreateDataPartitionPlan();
+ break;
+ case GetOrCreateDataPartition:
+ plan = new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetOrCreateDataPartition);
break;
case LIST_USER:
case LIST_ROLE:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
index 540bf0bdd6..f047dd9723 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/PhysicalPlanType.java
@@ -24,11 +24,13 @@ public enum PhysicalPlanType {
SetStorageGroup,
DeleteStorageGroup,
QueryStorageGroupSchema,
- CreateRegion,
- QueryDataPartition,
- ApplyDataPartition,
- QuerySchemaPartition,
- ApplySchemaPartition,
+ CreateRegions,
+ GetSchemaPartition,
+ CreateSchemaPartition,
+ GetOrCreateSchemaPartition,
+ GetDataPartition,
+ CreateDataPartition,
+ GetOrCreateDataPartition,
AUTHOR,
CREATE_USER,
CREATE_ROLE,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java
new file mode 100644
index 0000000000..d2bb602e09
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateDataPartitionPlan.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical.crud;
+
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class CreateDataPartitionPlan extends PhysicalPlan {
+
+ private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+ assignedDataPartition;
+
+ public CreateDataPartitionPlan() {
+ super(PhysicalPlanType.CreateDataPartition);
+ }
+
+ public Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+ getAssignedDataPartition() {
+ return assignedDataPartition;
+ }
+
+ public void setAssignedDataPartition(
+ Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+ assignedDataPartition) {
+ this.assignedDataPartition = assignedDataPartition;
+ }
+
+ @Override
+ protected void serializeImpl(ByteBuffer buffer) {
+ buffer.putInt(PhysicalPlanType.CreateDataPartition.ordinal());
+
+ buffer.putInt(assignedDataPartition.size());
+ for (Map.Entry<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+ seriesPartitionTimePartitionEntry : assignedDataPartition.entrySet()) {
+ SerializeDeserializeUtil.write(seriesPartitionTimePartitionEntry.getKey(), buffer);
+ buffer.putInt(seriesPartitionTimePartitionEntry.getValue().size());
+ for (Map.Entry<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>
+ timePartitionEntry : seriesPartitionTimePartitionEntry.getValue().entrySet()) {
+ timePartitionEntry.getKey().serializeImpl(buffer);
+ buffer.putInt(timePartitionEntry.getValue().size());
+ for (Map.Entry<TimePartitionSlot, List<RegionReplicaSet>> regionReplicaSetEntry :
+ timePartitionEntry.getValue().entrySet()) {
+ regionReplicaSetEntry.getKey().serializeImpl(buffer);
+ buffer.putInt(regionReplicaSetEntry.getValue().size());
+ for (RegionReplicaSet regionReplicaSet : regionReplicaSetEntry.getValue()) {
+ regionReplicaSet.serializeImpl(buffer);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ assignedDataPartition = new HashMap<>();
+ int storageGroupNum = buffer.getInt();
+ for (int i = 0; i < storageGroupNum; i++) {
+ String storageGroupName = SerializeDeserializeUtil.readString(buffer);
+ assignedDataPartition.put(storageGroupName, new HashMap<>());
+ int seriesPartitionSlotNum = buffer.getInt();
+ for (int j = 0; j < seriesPartitionSlotNum; j++) {
+ SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot();
+ seriesPartitionSlot.deserializeImpl(buffer);
+ assignedDataPartition.get(storageGroupName).put(seriesPartitionSlot, new HashMap<>());
+ int timePartitionSlotNum = buffer.getInt();
+ for (int k = 0; k < timePartitionSlotNum; k++) {
+ TimePartitionSlot timePartitionSlot = new TimePartitionSlot();
+ timePartitionSlot.deserializeImpl(buffer);
+ assignedDataPartition
+ .get(storageGroupName)
+ .get(seriesPartitionSlot)
+ .put(timePartitionSlot, new ArrayList<>());
+ int regionReplicaSetNum = buffer.getInt();
+ for (int l = 0; l < regionReplicaSetNum; l++) {
+ RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
+ regionReplicaSet.deserializeImpl(buffer);
+ assignedDataPartition
+ .get(storageGroupName)
+ .get(seriesPartitionSlot)
+ .get(timePartitionSlot)
+ .add(regionReplicaSet);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CreateDataPartitionPlan that = (CreateDataPartitionPlan) o;
+ return assignedDataPartition.equals(that.assignedDataPartition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(assignedDataPartition);
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java
new file mode 100644
index 0000000000..d61a8bb702
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateRegionsPlan.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical.crud;
+
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class CreateRegionsPlan extends PhysicalPlan {
+
+ private String storageGroup;
+
+ private final List<RegionReplicaSet> regionReplicaSets;
+
+ public CreateRegionsPlan() {
+ super(PhysicalPlanType.CreateRegions);
+ this.regionReplicaSets = new ArrayList<>();
+ }
+
+ public String getStorageGroup() {
+ return storageGroup;
+ }
+
+ public void setStorageGroup(String storageGroup) {
+ this.storageGroup = storageGroup;
+ }
+
+ public void addRegion(RegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSets.add(regionReplicaSet);
+ }
+
+ public List<RegionReplicaSet> getRegionReplicaSets() {
+ return regionReplicaSets;
+ }
+
+ @Override
+ protected void serializeImpl(ByteBuffer buffer) {
+ buffer.putInt(PhysicalPlanType.CreateRegions.ordinal());
+
+ SerializeDeserializeUtil.write(storageGroup, buffer);
+
+ buffer.putInt(regionReplicaSets.size());
+ for (RegionReplicaSet regionReplicaSet : regionReplicaSets) {
+ regionReplicaSet.serializeImpl(buffer);
+ }
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+ storageGroup = SerializeDeserializeUtil.readString(buffer);
+
+ int length = buffer.getInt();
+ for (int i = 0; i < length; i++) {
+ RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
+ regionReplicaSet.deserializeImpl(buffer);
+ regionReplicaSets.add(regionReplicaSet);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CreateRegionsPlan that = (CreateRegionsPlan) o;
+ return storageGroup.equals(that.storageGroup)
+ && regionReplicaSets.equals(that.regionReplicaSets);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(storageGroup, regionReplicaSets);
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java
similarity index 58%
copy from node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java
index 6521d2890b..15b5b2ec54 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/CreateSchemaPartitionPlan.java
@@ -16,25 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.confignode.physical.crud;
-public class JSHashExecutor extends DeviceGroupHashExecutor {
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
- private static final int base = 1315423911;
+import java.io.IOException;
+import java.nio.ByteBuffer;
- public JSHashExecutor(int deviceGroupCount) {
- super(deviceGroupCount);
+/** TODO: Reconstruct this interface after PatterTree is moved to node-commons */
+public class CreateSchemaPartitionPlan extends PhysicalPlan {
+
+ public CreateSchemaPartitionPlan(PhysicalPlanType type) {
+ super(type);
}
@Override
- public int getDeviceGroupID(String device) {
- int hash = base;
-
- for (int i = 0; i < device.length(); i++) {
- hash ^= ((hash << 5) + (int) device.charAt(i) + (hash >> 2));
- }
- hash &= Integer.MAX_VALUE;
+ protected void serializeImpl(ByteBuffer buffer) {}
- return hash % deviceGroupCount;
- }
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java
new file mode 100644
index 0000000000..5cd6aec141
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateDataPartitionPlan.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical.crud;
+
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** Query or apply DataPartition by the specific storageGroup and the deviceGroupStartTimeMap. */
+public class GetOrCreateDataPartitionPlan extends PhysicalPlan {
+
+ private Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap;
+
+ public GetOrCreateDataPartitionPlan(PhysicalPlanType physicalPlanType) {
+ super(physicalPlanType);
+ }
+
+ public Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> getPartitionSlotsMap() {
+ return partitionSlotsMap;
+ }
+
+ public void setPartitionSlotsMap(
+ Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap) {
+ this.partitionSlotsMap = partitionSlotsMap;
+ }
+
+ /**
+ * Convert TDataPartitionReq to DataPartitionPlan
+ *
+ * @param req TDataPartitionReq
+ */
+ public void convertFromRpcTDataPartitionReq(TDataPartitionReq req) {
+ partitionSlotsMap = new HashMap<>();
+ req.getPartitionSlotsMap()
+ .forEach(
+ ((storageGroup, tSeriesPartitionTimePartitionSlots) -> {
+ // Extract StorageGroupName
+ partitionSlotsMap.putIfAbsent(storageGroup, new HashMap<>());
+
+ tSeriesPartitionTimePartitionSlots.forEach(
+ ((tSeriesPartitionSlot, tTimePartitionSlots) -> {
+ // Extract SeriesPartitionSlot
+ SeriesPartitionSlot seriesPartitionSlot =
+ new SeriesPartitionSlot(tSeriesPartitionSlot.getSlotId());
+ partitionSlotsMap
+ .get(storageGroup)
+ .putIfAbsent(seriesPartitionSlot, new ArrayList<>());
+
+ // Extract TimePartitionSlots
+ tTimePartitionSlots.forEach(
+ tTimePartitionSlot ->
+ partitionSlotsMap
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .add(new TimePartitionSlot(tTimePartitionSlot.getStartTime())));
+ }));
+ }));
+ }
+
+ @Override
+ protected void serializeImpl(ByteBuffer buffer) {
+ buffer.putInt(PhysicalPlanType.GetDataPartition.ordinal());
+
+ buffer.putInt(partitionSlotsMap.size());
+ partitionSlotsMap.forEach(
+ ((storageGroup, seriesPartitionTimePartitionSlots) -> {
+ SerializeDeserializeUtil.write(storageGroup, buffer);
+ buffer.putInt(seriesPartitionTimePartitionSlots.size());
+ seriesPartitionTimePartitionSlots.forEach(
+ ((seriesPartitionSlot, timePartitionSlots) -> {
+ seriesPartitionSlot.serializeImpl(buffer);
+ buffer.putInt(timePartitionSlots.size());
+ timePartitionSlots.forEach(
+ timePartitionSlot -> timePartitionSlot.serializeImpl(buffer));
+ }));
+ }));
+ }
+
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) {
+ partitionSlotsMap = new HashMap<>();
+ int storageGroupNum = buffer.getInt();
+ for (int i = 0; i < storageGroupNum; i++) {
+ String storageGroup = SerializeDeserializeUtil.readString(buffer);
+ partitionSlotsMap.put(storageGroup, new HashMap<>());
+ int seriesPartitionSlotNum = buffer.getInt();
+ for (int j = 0; j < seriesPartitionSlotNum; j++) {
+ SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot();
+ seriesPartitionSlot.deserializeImpl(buffer);
+ partitionSlotsMap.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
+ int timePartitionSlotNum = buffer.getInt();
+ for (int k = 0; k < timePartitionSlotNum; k++) {
+ TimePartitionSlot timePartitionSlot = new TimePartitionSlot();
+ timePartitionSlot.deserializeImpl(buffer);
+ partitionSlotsMap.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ GetOrCreateDataPartitionPlan that = (GetOrCreateDataPartitionPlan) o;
+ return partitionSlotsMap.equals(that.partitionSlotsMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partitionSlotsMap);
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java
similarity index 58%
rename from confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java
index 67fbc2353a..642aceed5c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SchemaPartitionPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/crud/GetOrCreateSchemaPartitionPlan.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.physical.sys;
+package org.apache.iotdb.confignode.physical.crud;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
@@ -29,40 +29,41 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-/** Get DataNodeInfo by the specific DataNode's id. And return all when dataNodeID is set to -1. */
-public class SchemaPartitionPlan extends PhysicalPlan {
+/** Query or apply SchemaPartition by the specific storageGroup and the deviceGroupStartTimeMap. */
+public class GetOrCreateSchemaPartitionPlan extends PhysicalPlan {
private String storageGroup;
- private List<Integer> deviceGroupIDs;
- private Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets;
+ private List<Integer> seriesPartitionSlots;
+ private Map<Integer, RegionReplicaSet> schemaPartitionReplicaSets;
- public SchemaPartitionPlan(PhysicalPlanType physicalPlanType) {
+ public GetOrCreateSchemaPartitionPlan(PhysicalPlanType physicalPlanType) {
super(physicalPlanType);
}
- public SchemaPartitionPlan(
- PhysicalPlanType physicalPlanType, String storageGroup, List<Integer> deviceGroupIDs) {
+ public GetOrCreateSchemaPartitionPlan(
+ PhysicalPlanType physicalPlanType, String storageGroup, List<Integer> seriesPartitionSlots) {
this(physicalPlanType);
this.storageGroup = storageGroup;
- this.deviceGroupIDs = deviceGroupIDs;
+ this.seriesPartitionSlots = seriesPartitionSlots;
}
- public void setDeviceGroupIdReplicaSet(Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets) {
- this.deviceGroupIdReplicaSets = deviceGroupIdReplicaSets;
+ public void setSchemaPartitionReplicaSet(
+ Map<Integer, RegionReplicaSet> deviceGroupIdReplicaSets) {
+ this.schemaPartitionReplicaSets = deviceGroupIdReplicaSets;
}
- public Map<Integer, RegionReplicaSet> getDeviceGroupIdReplicaSets() {
- return deviceGroupIdReplicaSets;
+ public Map<Integer, RegionReplicaSet> getSchemaPartitionReplicaSets() {
+ return schemaPartitionReplicaSets;
}
@Override
protected void serializeImpl(ByteBuffer buffer) {
- buffer.putInt(PhysicalPlanType.QueryDataPartition.ordinal());
+ buffer.putInt(PhysicalPlanType.GetDataPartition.ordinal());
SerializeDeserializeUtil.write(storageGroup, buffer);
- buffer.putInt(deviceGroupIDs.size());
- deviceGroupIDs.forEach(id -> SerializeDeserializeUtil.write(id, buffer));
+ buffer.putInt(seriesPartitionSlots.size());
+ seriesPartitionSlots.forEach(id -> SerializeDeserializeUtil.write(id, buffer));
- buffer.putInt(deviceGroupIdReplicaSets.size());
- for (Map.Entry<Integer, RegionReplicaSet> entry : deviceGroupIdReplicaSets.entrySet()) {
+ buffer.putInt(schemaPartitionReplicaSets.size());
+ for (Map.Entry<Integer, RegionReplicaSet> entry : schemaPartitionReplicaSets.entrySet()) {
buffer.putInt(entry.getKey());
entry.getValue().serializeImpl(buffer);
}
@@ -73,18 +74,17 @@ public class SchemaPartitionPlan extends PhysicalPlan {
storageGroup = SerializeDeserializeUtil.readString(buffer);
int idSize = SerializeDeserializeUtil.readInt(buffer);
for (int i = 0; i < idSize; i++) {
- deviceGroupIDs.add(SerializeDeserializeUtil.readInt(buffer));
+ seriesPartitionSlots.add(SerializeDeserializeUtil.readInt(buffer));
}
- if (deviceGroupIdReplicaSets == null) {
- deviceGroupIdReplicaSets = new HashMap<>();
+ if (schemaPartitionReplicaSets == null) {
+ schemaPartitionReplicaSets = new HashMap<>();
}
int size = buffer.getInt();
-
for (int i = 0; i < size; i++) {
RegionReplicaSet schemaRegionReplicaSet = new RegionReplicaSet();
schemaRegionReplicaSet.deserializeImpl(buffer);
- deviceGroupIdReplicaSets.put(buffer.getInt(), schemaRegionReplicaSet);
+ schemaPartitionReplicaSets.put(buffer.getInt(), schemaRegionReplicaSet);
}
}
@@ -92,7 +92,7 @@ public class SchemaPartitionPlan extends PhysicalPlan {
return storageGroup;
}
- public List<Integer> getDeviceGroupIDs() {
- return deviceGroupIDs;
+ public List<Integer> getSeriesPartitionSlots() {
+ return seriesPartitionSlots;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java
deleted file mode 100644
index 556944ecea..0000000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/DataPartitionPlan.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.physical.sys;
-
-import org.apache.iotdb.confignode.physical.PhysicalPlan;
-import org.apache.iotdb.confignode.physical.PhysicalPlanType;
-import org.apache.iotdb.confignode.util.SerializeDeserializeUtil;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-/**
- * query DataPartition or apply DataPartition by the specific storageGroup and
- * deviceGroupStartTimeMap.
- */
-public class DataPartitionPlan extends PhysicalPlan {
- private String storageGroup;
- private Map<Integer, List<Integer>> deviceGroupStartTimeMap;
-
- public DataPartitionPlan(PhysicalPlanType physicalPlanType) {
- super(physicalPlanType);
- }
-
- public DataPartitionPlan(
- PhysicalPlanType physicalPlanType,
- String storageGroup,
- Map<Integer, List<Integer>> deviceGroupStartTimeMap) {
- this(physicalPlanType);
- this.storageGroup = storageGroup;
- this.deviceGroupStartTimeMap = deviceGroupStartTimeMap;
- }
-
- public String getStorageGroup() {
- return storageGroup;
- }
-
- public void setStorageGroup(String storageGroup) {
- this.storageGroup = storageGroup;
- }
-
- public Map<Integer, List<Integer>> getDeviceGroupIDs() {
- return deviceGroupStartTimeMap;
- }
-
- public void setDeviceGroupIDs(Map<Integer, List<Integer>> deviceGroupIDs) {
- this.deviceGroupStartTimeMap = deviceGroupIDs;
- }
-
- @Override
- protected void serializeImpl(ByteBuffer buffer) {
- buffer.putInt(PhysicalPlanType.QueryDataPartition.ordinal());
- SerializeDeserializeUtil.write(storageGroup, buffer);
- SerializeDeserializeUtil.writeIntMapLists(deviceGroupStartTimeMap, buffer);
- }
-
- @Override
- protected void deserializeImpl(ByteBuffer buffer) {
- storageGroup = SerializeDeserializeUtil.readString(buffer);
- deviceGroupStartTimeMap = SerializeDeserializeUtil.readIntMapLists(buffer);
- }
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryDataNodeInfoPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryDataNodeInfoPlan.java
index 95a2c8df8a..503fae05ca 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryDataNodeInfoPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/QueryDataNodeInfoPlan.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.physical.PhysicalPlanType;
import java.nio.ByteBuffer;
+import java.util.Objects;
/** Get DataNodeInfo by the specific DataNode's id. And return all when dataNodeID is set to -1. */
public class QueryDataNodeInfoPlan extends PhysicalPlan {
@@ -51,4 +52,17 @@ public class QueryDataNodeInfoPlan extends PhysicalPlan {
protected void deserializeImpl(ByteBuffer buffer) {
this.dataNodeID = buffer.getInt();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ QueryDataNodeInfoPlan that = (QueryDataNodeInfoPlan) o;
+ return dataNodeID == that.dataNodeID;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dataNodeID);
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
index eb5cc83a74..c4c3fd5d52 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/RegisterDataNodePlan.java
@@ -18,12 +18,13 @@
*/
package org.apache.iotdb.confignode.physical.sys;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.cluster.Endpoint;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.physical.PhysicalPlanType;
import java.nio.ByteBuffer;
+import java.util.Objects;
public class RegisterDataNodePlan extends PhysicalPlan {
@@ -33,9 +34,9 @@ public class RegisterDataNodePlan extends PhysicalPlan {
super(PhysicalPlanType.RegisterDataNode);
}
- public RegisterDataNodePlan(int dataNodeID, Endpoint endpoint) {
+ public RegisterDataNodePlan(DataNodeLocation info) {
this();
- this.info = new DataNodeLocation(dataNodeID, endpoint);
+ this.info = info;
}
public DataNodeLocation getInfo() {
@@ -45,7 +46,7 @@ public class RegisterDataNodePlan extends PhysicalPlan {
@Override
protected void serializeImpl(ByteBuffer buffer) {
buffer.putInt(PhysicalPlanType.RegisterDataNode.ordinal());
- buffer.putInt(info.getDataNodeID());
+ buffer.putInt(info.getDataNodeId());
buffer.putInt(info.getEndPoint().getIp().length());
buffer.put(info.getEndPoint().getIp().getBytes());
buffer.putInt(info.getEndPoint().getPort());
@@ -62,4 +63,17 @@ public class RegisterDataNodePlan extends PhysicalPlan {
this.info = new DataNodeLocation(dataNodeID, new Endpoint(ip, port));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RegisterDataNodePlan plan = (RegisterDataNodePlan) o;
+ return info.equals(plan.info);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(info);
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
index bf7a7e9ac0..865829592d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/physical/sys/SetStorageGroupPlan.java
@@ -18,22 +18,18 @@
*/
package org.apache.iotdb.confignode.physical.sys;
-import org.apache.iotdb.confignode.partition.DataRegionInfo;
-import org.apache.iotdb.confignode.partition.SchemaRegionInfo;
import org.apache.iotdb.confignode.partition.StorageGroupSchema;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Objects;
public class SetStorageGroupPlan extends PhysicalPlan {
private StorageGroupSchema schema;
- private SchemaRegionInfo schemaRegionInfo;
-
- private DataRegionInfo dataRegionInfo;
-
public SetStorageGroupPlan() {
super(PhysicalPlanType.SetStorageGroup);
this.schema = new StorageGroupSchema();
@@ -52,34 +48,27 @@ public class SetStorageGroupPlan extends PhysicalPlan {
this.schema = schema;
}
- public SchemaRegionInfo getSchemaRegionInfo() {
- return schemaRegionInfo;
- }
-
- public void setSchemaRegionInfo(SchemaRegionInfo schemaRegionInfo) {
- this.schemaRegionInfo = schemaRegionInfo;
- }
-
- public DataRegionInfo getDataRegionInfo() {
- return dataRegionInfo;
- }
-
- public void setDataRegionInfo(DataRegionInfo dataRegionInfo) {
- this.dataRegionInfo = dataRegionInfo;
- }
-
@Override
protected void serializeImpl(ByteBuffer buffer) {
buffer.putInt(PhysicalPlanType.SetStorageGroup.ordinal());
schema.serialize(buffer);
- schemaRegionInfo.serializeImpl(buffer);
- dataRegionInfo.serializeImpl(buffer);
}
@Override
- protected void deserializeImpl(ByteBuffer buffer) {
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
schema.deserialize(buffer);
- schemaRegionInfo.deserializeImpl(buffer);
- dataRegionInfo.deserializeImpl(buffer);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ SetStorageGroupPlan that = (SetStorageGroupPlan) o;
+ return schema.equals(that.schema);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schema);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
index b32f40edf0..53c95585e0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/executor/PlanExecutor.java
@@ -25,11 +25,13 @@ import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
import org.apache.iotdb.confignode.physical.PhysicalPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
-import org.apache.iotdb.confignode.physical.sys.DataPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.auth.AuthException;
@@ -58,14 +60,12 @@ public class PlanExecutor {
return dataNodeInfoPersistence.getDataNodeInfo((QueryDataNodeInfoPlan) plan);
case QueryStorageGroupSchema:
return regionInfoPersistence.getStorageGroupSchema();
- case QueryDataPartition:
- return partitionInfoPersistence.getDataPartition((DataPartitionPlan) plan);
- case QuerySchemaPartition:
- return partitionInfoPersistence.getSchemaPartition((SchemaPartitionPlan) plan);
- case ApplySchemaPartition:
- return partitionInfoPersistence.applySchemaPartition((SchemaPartitionPlan) plan);
- case ApplyDataPartition:
- return partitionInfoPersistence.applyDataPartition((DataPartitionPlan) plan);
+ case GetDataPartition:
+ case GetOrCreateDataPartition:
+ return partitionInfoPersistence.getDataPartition((GetOrCreateDataPartitionPlan) plan);
+ case GetSchemaPartition:
+ case GetOrCreateSchemaPartition:
+ return partitionInfoPersistence.getSchemaPartition((GetOrCreateSchemaPartitionPlan) plan);
case LIST_USER:
return authorInfoPersistence.executeListUser((AuthorPlan) plan);
case LIST_ROLE:
@@ -90,6 +90,13 @@ public class PlanExecutor {
return dataNodeInfoPersistence.registerDataNode((RegisterDataNodePlan) plan);
case SetStorageGroup:
return regionInfoPersistence.setStorageGroup((SetStorageGroupPlan) plan);
+ case CreateRegions:
+ return regionInfoPersistence.createRegions((CreateRegionsPlan) plan);
+ case CreateSchemaPartition:
+ return partitionInfoPersistence.createSchemaPartition(
+ (GetOrCreateSchemaPartitionPlan) plan);
+ case CreateDataPartition:
+ return partitionInfoPersistence.createDataPartition((CreateDataPartitionPlan) plan);
case CREATE_USER:
case CREATE_ROLE:
case DROP_USER:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
index 6a7f829627..1e17b4d76e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessor.java
@@ -20,39 +20,34 @@ package org.apache.iotdb.confignode.service.thrift.server;
import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.cluster.Endpoint;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
+import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
-import org.apache.iotdb.confignode.consensus.response.SchemaPartitionDataSet;
+import org.apache.iotdb.confignode.consensus.response.DataPartitionDataSet;
import org.apache.iotdb.confignode.consensus.response.StorageGroupSchemaDataSet;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.partition.StorageGroupSchema;
import org.apache.iotdb.confignode.physical.PhysicalPlanType;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.AuthorPlan;
import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
-import org.apache.iotdb.confignode.physical.sys.SchemaPartitionPlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
-import org.apache.iotdb.confignode.rpc.thrift.AuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.DataPartitionInfo;
-import org.apache.iotdb.confignode.rpc.thrift.DataPartitionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.DeleteStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
-import org.apache.iotdb.confignode.rpc.thrift.FetchDataPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.FetchPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.FetchSchemaPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.GetDataPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.GetSchemaPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.PartitionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo;
-import org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
-import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -66,6 +61,7 @@ import java.util.Map;
/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
+
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeRPCServerProcessor.class);
private final ConfigManager configManager;
@@ -79,51 +75,48 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
}
@Override
- public DataNodeRegisterResp registerDataNode(DataNodeRegisterReq req) throws TException {
- // TODO: handle exception in consensusLayer
+ public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) throws TException {
RegisterDataNodePlan plan =
new RegisterDataNodePlan(
- -1, new Endpoint(req.getEndPoint().getIp(), req.getEndPoint().getPort()));
- TSStatus status = configManager.registerDataNode(plan);
- DataNodeRegisterResp result = new DataNodeRegisterResp();
- result.setRegisterResult(status);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- result.setDataNodeID(Integer.parseInt(status.getMessage()));
- LOGGER.info(
- "Register DataNode successful. DataNodeID: {}, {}",
- status.getMessage(),
- req.getEndPoint().toString());
- } else {
- LOGGER.error("Register DataNode failed. {}", status.getMessage());
- }
- return result;
+ new DataNodeLocation(
+ -1, new Endpoint(req.getEndPoint().getIp(), req.getEndPoint().getPort())));
+ DataNodeConfigurationDataSet dataSet =
+ (DataNodeConfigurationDataSet) configManager.registerDataNode(plan);
+ TDataNodeRegisterResp resp = new TDataNodeRegisterResp();
+ dataSet.convertToRpcDataNodeRegisterResp(resp);
+ LOGGER.info(
+ "Register DataNode successful. DataNodeID: {}, {}",
+ resp.getDataNodeID(),
+ req.getEndPoint().toString());
+ return resp;
}
@Override
- public Map<Integer, DataNodeMessage> getDataNodesMessage(int dataNodeID) throws TException {
+ public TDataNodeMessageResp getDataNodesMessage(int dataNodeID) throws TException {
QueryDataNodeInfoPlan plan = new QueryDataNodeInfoPlan(dataNodeID);
- DataSet dataSet = configManager.getDataNodeInfo(plan);
-
- if (dataSet == null) {
- return new HashMap<>();
- } else {
- Map<Integer, DataNodeMessage> result = new HashMap<>();
- for (DataNodeLocation info : ((DataNodesInfoDataSet) dataSet).getDataNodeList()) {
- result.put(
- info.getDataNodeID(),
- new DataNodeMessage(
- info.getDataNodeID(),
+ DataNodesInfoDataSet dataSet = (DataNodesInfoDataSet) configManager.getDataNodeInfo(plan);
+
+ TDataNodeMessageResp resp = new TDataNodeMessageResp();
+ resp.setStatus(dataSet.getStatus());
+ if (dataSet.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ Map<Integer, TDataNodeMessage> msgMap = new HashMap<>();
+ for (DataNodeLocation info : dataSet.getDataNodeList()) {
+ msgMap.put(
+ info.getDataNodeId(),
+ new TDataNodeMessage(
+ info.getDataNodeId(),
new EndPoint(info.getEndPoint().getIp(), info.getEndPoint().getPort())));
+ resp.setDataNodeMessageMap(msgMap);
}
- return result;
}
+
+ return resp;
}
@Override
- public TSStatus setStorageGroup(SetStorageGroupReq req) throws TException {
+ public TSStatus setStorageGroup(TSetStorageGroupReq req) throws TException {
SetStorageGroupPlan plan =
- new SetStorageGroupPlan(
- new org.apache.iotdb.confignode.partition.StorageGroupSchema(req.getStorageGroup()));
+ new SetStorageGroupPlan(new StorageGroupSchema(req.getStorageGroup()));
TSStatus resp = configManager.setStorageGroup(plan);
if (resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -135,32 +128,100 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
}
@Override
- public TSStatus deleteStorageGroup(DeleteStorageGroupReq req) throws TException {
+ public TSStatus deleteStorageGroup(TDeleteStorageGroupReq req) throws TException {
+ // TODO: delete StorageGroup
return null;
}
@Override
- public Map<String, StorageGroupMessage> getStorageGroupsMessage() throws TException {
- DataSet dataSet = configManager.getStorageGroupSchema();
-
- if (dataSet == null) {
- return new HashMap<>();
- } else {
- Map<String, StorageGroupMessage> result = new HashMap<>();
- for (StorageGroupSchema schema : ((StorageGroupSchemaDataSet) dataSet).getSchemaList()) {
- result.put(schema.getName(), new StorageGroupMessage(schema.getName()));
+ public TStorageGroupMessageResp getStorageGroupsMessage() throws TException {
+ StorageGroupSchemaDataSet dataSet =
+ (StorageGroupSchemaDataSet) configManager.getStorageGroupSchema();
+
+ TStorageGroupMessageResp resp = new TStorageGroupMessageResp();
+ resp.setStatus(dataSet.getStatus());
+ if (dataSet.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ Map<String, TStorageGroupMessage> storageGroupMessageMap = new HashMap<>();
+ for (StorageGroupSchema schema : dataSet.getSchemaList()) {
+ storageGroupMessageMap.put(schema.getName(), new TStorageGroupMessage(schema.getName()));
}
- return result;
+ resp.setStorageGroupMessageMap(storageGroupMessageMap);
}
+
+ return resp;
}
@Override
- public DeviceGroupHashInfo getDeviceGroupHashInfo() throws TException {
- return configManager.getDeviceGroupHashInfo();
+ public TSchemaPartitionResp getSchemaPartition(TSchemaPartitionReq req) throws TException {
+ // TODO: Get SchemaPartition by specific PatternTree
+
+ // SchemaPartitionPlan querySchemaPartitionPlan =
+ // new SchemaPartitionPlan(
+ // PhysicalPlanType.QuerySchemaPartition, req.getStorageGroup(),
+ // req.getDeviceGroupIDs());
+ // DataSet dataSet = configManager.getSchemaPartition(querySchemaPartitionPlan);
+ // return ((SchemaPartitionDataSet) dataSet).convertRpcSchemaPartitionInfo();
+ return null;
+ }
+
+ @Override
+ public TSchemaPartitionResp getOrCreateSchemaPartition(TSchemaPartitionReq req)
+ throws TException {
+ // TODO: Get or create SchemaPartition by specific PatternTree
+
+ // SchemaPartitionPlan applySchemaPartitionPlan =
+ // new SchemaPartitionPlan(
+ // PhysicalPlanType.ApplySchemaPartition,
+ // req.getStorageGroup(),
+ // req.getSeriesPartitionSlots());
+ // SchemaPartitionDataSet dataSet =
+ // (SchemaPartitionDataSet) configManager.applySchemaPartition(applySchemaPartitionPlan);
+ //
+ // TSchemaPartitionResp resp = new TSchemaPartitionResp();
+ // resp.setStatus(dataSet.getStatus());
+ // if (dataSet.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ // dataSet.convertToRpcSchemaPartitionResp(resp);
+ // }
+ // return resp;
+ return null;
+ }
+
+ @Override
+ public TDataPartitionResp getDataPartition(TDataPartitionReq req) throws TException {
+ GetOrCreateDataPartitionPlan getDataPartitionPlan =
+ new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetDataPartition);
+ getDataPartitionPlan.convertFromRpcTDataPartitionReq(req);
+ DataPartitionDataSet dataset =
+ (DataPartitionDataSet) configManager.getDataPartition(getDataPartitionPlan);
+
+ TDataPartitionResp resp = new TDataPartitionResp();
+ resp.setStatus(dataset.getStatus());
+ if (dataset.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataset.convertToRpcDataPartitionResp(resp);
+ }
+
+ return resp;
+ }
+
+ @Override
+ public TDataPartitionResp getOrCreateDataPartition(TDataPartitionReq req) throws TException {
+ GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan =
+ new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetOrCreateDataPartition);
+ getOrCreateDataPartitionPlan.convertFromRpcTDataPartitionReq(req);
+ DataPartitionDataSet dataset =
+ (DataPartitionDataSet) configManager.getOrCreateDataPartition(getOrCreateDataPartitionPlan);
+
+ TDataPartitionResp resp = new TDataPartitionResp();
+ resp.setStatus(dataset.getStatus());
+ if (dataset.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ dataset.convertToRpcDataPartitionResp(resp);
+ }
+
+ return resp;
}
@Override
- public TSStatus operatePermission(AuthorizerReq req) throws TException {
+ public TSStatus operatePermission(TAuthorizerReq req) throws TException {
if (req.getAuthorType() < 0 || req.getAuthorType() >= PhysicalPlanType.values().length) {
throw new IndexOutOfBoundsException("Invalid ordinal");
}
@@ -181,55 +242,6 @@ public class ConfigNodeRPCServerProcessor implements ConfigIService.Iface {
return configManager.operatePermission(plan);
}
- @Override
- public DataPartitionInfo applyDataPartition(GetDataPartitionReq req) throws TException {
- return null;
- }
-
- @Override
- public SchemaPartitionInfo applySchemaPartition(GetSchemaPartitionReq req) throws TException {
- SchemaPartitionPlan applySchemaPartitionPlan =
- new SchemaPartitionPlan(
- PhysicalPlanType.ApplySchemaPartition, req.getStorageGroup(), req.getDeviceGroupIDs());
- DataSet dataSet = configManager.applySchemaPartition(applySchemaPartitionPlan);
- ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo();
-
- return SchemaPartitionDataSet.convertRpcSchemaPartition(
- ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo());
- }
-
- @Override
- public SchemaPartitionInfo getSchemaPartition(GetSchemaPartitionReq req) throws TException {
- SchemaPartitionPlan querySchemaPartitionPlan =
- new SchemaPartitionPlan(
- PhysicalPlanType.QuerySchemaPartition, req.getStorageGroup(), req.getDeviceGroupIDs());
- DataSet dataSet = configManager.getSchemaPartition(querySchemaPartitionPlan);
- return SchemaPartitionDataSet.convertRpcSchemaPartition(
- ((SchemaPartitionDataSet) dataSet).getSchemaPartitionInfo());
- }
-
- @Override
- public DataPartitionInfo getDataPartition(GetDataPartitionReq req) throws TException {
-
- return null;
- }
-
- @Override
- public DataPartitionInfoResp fetchDataPartitionInfo(FetchDataPartitionReq req) throws TException {
- return null;
- }
-
- @Override
- public SchemaPartitionInfoResp fetchSchemaPartitionInfo(FetchSchemaPartitionReq req)
- throws TException {
- return null;
- }
-
- @Override
- public PartitionInfoResp fetchPartitionInfo(FetchPartitionReq req) throws TException {
- return null;
- }
-
public void handleClientExit() {}
// TODO: Interfaces for data operations
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
index 09c955246d..90efb510ee 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/RatisConsensusDemo.java
@@ -20,11 +20,12 @@ package org.apache.iotdb.confignode.consensus;
import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -85,10 +86,9 @@ public class RatisConsensusDemo {
// DataNodes can connect to any ConfigNode and send write requests
for (int i = 0; i < 10; i++) {
EndPoint endPoint = new EndPoint("0.0.0.0", 6667 + i);
- DataNodeRegisterReq req = new DataNodeRegisterReq(endPoint);
- DataNodeRegisterResp resp = clients[0].registerDataNode(req);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.registerResult.getCode());
+ TDataNodeRegisterReq req = new TDataNodeRegisterReq(endPoint);
+ TDataNodeRegisterResp resp = clients[0].registerDataNode(req);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
Assert.assertEquals(i, resp.getDataNodeID());
System.out.printf(
"\nRegister DataNode successful. DataNodeID: %d, %s\n", resp.getDataNodeID(), endPoint);
@@ -103,7 +103,7 @@ public class RatisConsensusDemo {
// DataNodes can connect to any ConfigNode and send read requests
for (int i = 0; i < 3; i++) {
- Map<Integer, DataNodeMessage> msgMap = clients[i].getDataNodesMessage(-1);
+ TDataNodeMessageResp msgMap = clients[i].getDataNodesMessage(-1);
System.out.printf(
"\nQuery DataNode message from ConfigNode 0.0.0.0:%d. Result: %s\n",
22277 + i * 2, msgMap);
@@ -112,7 +112,7 @@ public class RatisConsensusDemo {
private void setStorageGroups() throws TException, InterruptedException {
for (int i = 0; i < 10; i++) {
- SetStorageGroupReq req = new SetStorageGroupReq("root.sg" + i);
+ TSetStorageGroupReq req = new TSetStorageGroupReq("root.sg" + i);
clients[0].setStorageGroup(req);
System.out.printf("\nSet StorageGroup successful. StorageGroup: %s\n", "root.sg" + i);
TimeUnit.SECONDS.sleep(1);
@@ -124,10 +124,11 @@ public class RatisConsensusDemo {
TimeUnit.SECONDS.sleep(1);
for (int i = 0; i < 3; i++) {
- Map<String, StorageGroupMessage> msgMap = clients[i].getStorageGroupsMessage();
+ TStorageGroupMessageResp msgMap = clients[i].getStorageGroupsMessage();
System.out.printf(
"\nQuery StorageGroup message from ConfigNode 0.0.0.0:%d. Result: {\n", 22277 + i * 2);
- for (Map.Entry<String, StorageGroupMessage> entry : msgMap.entrySet()) {
+ for (Map.Entry<String, TStorageGroupMessage> entry :
+ msgMap.getStorageGroupMessageMap().entrySet()) {
System.out.printf(" Key(%s)=%s\n", entry.getKey(), entry.getValue().toString());
}
System.out.println("}");
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
index cacc2fdd27..b5b5fa2e45 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ConfigManagerManualTest.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -74,10 +74,9 @@ public class ConfigManagerManualTest {
private void registerDataNodes() throws TException {
for (int i = 0; i < 3; i++) {
- DataNodeRegisterReq req = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667 + i));
- DataNodeRegisterResp resp = clients[0].registerDataNode(req);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.registerResult.getCode());
+ TDataNodeRegisterReq req = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667 + i));
+ TDataNodeRegisterResp resp = clients[0].registerDataNode(req);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
Assert.assertEquals(i, resp.getDataNodeID());
}
}
@@ -87,11 +86,12 @@ public class ConfigManagerManualTest {
TimeUnit.SECONDS.sleep(1);
for (int i = 0; i < 3; i++) {
- Map<Integer, DataNodeMessage> msgMap = clients[i].getDataNodesMessage(-1);
+ Map<Integer, TDataNodeMessage> msgMap =
+ clients[i].getDataNodesMessage(-1).getDataNodeMessageMap();
Assert.assertEquals(3, msgMap.size());
for (int j = 0; j < 3; j++) {
Assert.assertNotNull(msgMap.get(j));
- Assert.assertEquals(j, msgMap.get(j).getDataNodeID());
+ Assert.assertEquals(j, msgMap.get(j).getDataNodeId());
Assert.assertEquals(localhost, msgMap.get(j).getEndPoint().getIp());
Assert.assertEquals(6667 + j, msgMap.get(j).getEndPoint().getPort());
}
@@ -113,17 +113,18 @@ public class ConfigManagerManualTest {
clients[i] = new ConfigIService.Client(new TBinaryProtocol(transport));
}
- DataNodeRegisterResp resp =
- clients[1].registerDataNode(new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6670)));
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.registerResult.getCode());
+ TDataNodeRegisterResp resp =
+ clients[1].registerDataNode(new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6670)));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
Assert.assertEquals(3, resp.getDataNodeID());
for (int i = 0; i < 2; i++) {
- Map<Integer, DataNodeMessage> msgMap = clients[i].getDataNodesMessage(-1);
+ Map<Integer, TDataNodeMessage> msgMap =
+ clients[i].getDataNodesMessage(-1).getDataNodeMessageMap();
Assert.assertEquals(4, msgMap.size());
for (int j = 0; j < 4; j++) {
Assert.assertNotNull(msgMap.get(j));
- Assert.assertEquals(j, msgMap.get(j).getDataNodeID());
+ Assert.assertEquals(j, msgMap.get(j).getDataNodeId());
Assert.assertEquals(localhost, msgMap.get(j).getEndPoint().getIp());
Assert.assertEquals(6667 + j, msgMap.get(j).getEndPoint().getPort());
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
index d9f9cbbe2f..1b935f79fb 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/hash/DeviceGroupHashExecutorManualTest.java
@@ -69,7 +69,7 @@ public class DeviceGroupHashExecutorManualTest {
List<String> devices = genBatchDevices();
totalTime -= System.currentTimeMillis();
for (String device : devices) {
- bucket[manager.getDeviceGroupID(device)] += 1;
+ bucket[manager.getSeriesPartitionSlot(device).getSlotId()] += 1;
}
totalTime += System.currentTimeMillis();
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java b/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
new file mode 100644
index 0000000000..0f3c55750d
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/physical/SerializeDeserializeUT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.physical;
+
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.confignode.partition.StorageGroupSchema;
+import org.apache.iotdb.confignode.physical.crud.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.crud.CreateRegionsPlan;
+import org.apache.iotdb.confignode.physical.crud.GetOrCreateDataPartitionPlan;
+import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
+import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
+import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SerializeDeserializeUT {
+
+ private final ByteBuffer buffer = ByteBuffer.allocate(10240);
+
+ @After
+ public void cleanBuffer() {
+ buffer.clear();
+ }
+
+ @Test
+ public void RegisterDataNodePlanTest() throws IOException {
+ RegisterDataNodePlan plan0 =
+ new RegisterDataNodePlan(new DataNodeLocation(1, new Endpoint("0.0.0.0", 6667)));
+ plan0.serialize(buffer);
+ RegisterDataNodePlan plan1 = (RegisterDataNodePlan) PhysicalPlan.Factory.create(buffer);
+ Assert.assertEquals(plan0, plan1);
+ }
+
+ @Test
+ public void QueryDataNodeInfoPlanTest() throws IOException {
+ QueryDataNodeInfoPlan plan0 = new QueryDataNodeInfoPlan(-1);
+ plan0.serialize(buffer);
+ QueryDataNodeInfoPlan plan1 = (QueryDataNodeInfoPlan) PhysicalPlan.Factory.create(buffer);
+ Assert.assertEquals(plan0, plan1);
+ }
+
+ @Test
+ public void SetStorageGroupPlanTest() throws IOException {
+ SetStorageGroupPlan plan0 = new SetStorageGroupPlan(new StorageGroupSchema("sg"));
+ plan0.serialize(buffer);
+ SetStorageGroupPlan plan1 = (SetStorageGroupPlan) PhysicalPlan.Factory.create(buffer);
+ Assert.assertEquals(plan0, plan1);
+ }
+
+ @Test
+ public void DeleteStorageGroupPlanTest() {
+ // TODO: Add serialize and deserialize test
+ }
+
+ @Test
+ public void CreateRegionsPlanTest() throws IOException {
+ CreateRegionsPlan plan0 = new CreateRegionsPlan();
+ plan0.setStorageGroup("sg");
+ RegionReplicaSet dataRegionSet = new RegionReplicaSet();
+ dataRegionSet.setId(new DataRegionId(0));
+ dataRegionSet.setDataNodeList(
+ Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
+ plan0.addRegion(dataRegionSet);
+ RegionReplicaSet schemaRegionSet = new RegionReplicaSet();
+ schemaRegionSet.setId(new SchemaRegionId(1));
+ schemaRegionSet.setDataNodeList(
+ Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
+ plan0.addRegion(schemaRegionSet);
+
+ plan0.serialize(buffer);
+ CreateRegionsPlan plan1 = (CreateRegionsPlan) PhysicalPlan.Factory.create(buffer);
+ Assert.assertEquals(plan0, plan1);
+ }
+
+ @Test
+ public void CreateSchemaPartitionPlanTest() {
+ // TODO: Add serialize and deserialize test
+ }
+
+ @Test
+ public void GetOrCreateSchemaPartitionPlanTest() {
+ // TODO: Add serialize and deserialize test
+ }
+
+ @Test
+ public void CreateDataPartitionPlanTest() throws IOException {
+ String storageGroup = "root.sg0";
+ SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot(10);
+ TimePartitionSlot timePartitionSlot = new TimePartitionSlot(100);
+ RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
+ regionReplicaSet.setId(new DataRegionId(0));
+ regionReplicaSet.setDataNodeList(
+ Collections.singletonList(new DataNodeLocation(0, new Endpoint("0.0.0.0", 6667))));
+
+ Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+ assignedDataPartition = new HashMap<>();
+ assignedDataPartition.put(storageGroup, new HashMap<>());
+ assignedDataPartition.get(storageGroup).put(seriesPartitionSlot, new HashMap<>());
+ assignedDataPartition
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .put(timePartitionSlot, new ArrayList<>());
+ assignedDataPartition
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .get(timePartitionSlot)
+ .add(regionReplicaSet);
+
+ CreateDataPartitionPlan plan0 = new CreateDataPartitionPlan();
+ plan0.setAssignedDataPartition(assignedDataPartition);
+ plan0.serialize(buffer);
+ CreateDataPartitionPlan plan1 = (CreateDataPartitionPlan) PhysicalPlan.Factory.create(buffer);
+ Assert.assertEquals(plan0, plan1);
+ }
+
+ @Test
+ public void GetOrCreateDataPartitionPlanTest() throws IOException {
+ String storageGroup = "root.sg0";
+ SeriesPartitionSlot seriesPartitionSlot = new SeriesPartitionSlot(10);
+ TimePartitionSlot timePartitionSlot = new TimePartitionSlot(100);
+
+ Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap =
+ new HashMap<>();
+ partitionSlotsMap.put(storageGroup, new HashMap<>());
+ partitionSlotsMap.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
+ partitionSlotsMap.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
+
+ GetOrCreateDataPartitionPlan plan0 =
+ new GetOrCreateDataPartitionPlan(PhysicalPlanType.GetDataPartition);
+ plan0.setPartitionSlotsMap(partitionSlotsMap);
+ plan0.serialize(buffer);
+ GetOrCreateDataPartitionPlan plan1 =
+ (GetOrCreateDataPartitionPlan) PhysicalPlan.Factory.create(buffer);
+ Assert.assertEquals(plan0, plan1);
+ }
+}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
index ba01aee175..d4f3608920 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerProcessorTest.java
@@ -19,19 +19,25 @@
package org.apache.iotdb.confignode.service.thrift.server;
import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
import org.apache.iotdb.confignode.persistence.PartitionInfoPersistence;
import org.apache.iotdb.confignode.persistence.RegionInfoPersistence;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeMessage;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
-import org.apache.iotdb.confignode.rpc.thrift.DeviceGroupHashInfo;
-import org.apache.iotdb.confignode.rpc.thrift.GetSchemaPartitionReq;
-import org.apache.iotdb.confignode.rpc.thrift.SchemaPartitionInfo;
-import org.apache.iotdb.confignode.rpc.thrift.SetStorageGroupReq;
-import org.apache.iotdb.confignode.rpc.thrift.StorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeMessageResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessage;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupMessageResp;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.ratis.util.FileUtils;
@@ -45,6 +51,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -66,65 +73,90 @@ public class ConfigNodeRPCServerProcessorTest {
FileUtils.deleteFully(new File(ConfigNodeDescriptor.getInstance().getConf().getConsensusDir()));
}
+ private void checkGlobalConfig(TGlobalConfig globalConfig) {
+ Assert.assertEquals(
+ ConfigNodeDescriptor.getInstance().getConf().getDataNodeConsensusProtocolClass(),
+ globalConfig.getDataNodeConsensusProtocolClass());
+ Assert.assertEquals(
+ ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum(),
+ globalConfig.getSeriesPartitionSlotNum());
+ Assert.assertEquals(
+ ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionExecutorClass(),
+ globalConfig.getSeriesPartitionExecutorClass());
+ }
+
@Test
- public void registerDataNodeTest() throws TException, IOException {
- DataNodeRegisterResp resp;
- DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
- DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
- DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
+ public void registerAndQueryDataNodeTest() throws TException {
+ TDataNodeRegisterResp resp;
+ TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
+ TDataNodeRegisterReq registerReq1 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
+ TDataNodeRegisterReq registerReq2 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
// test success register
resp = processor.registerDataNode(registerReq0);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getRegisterResult().getCode());
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
Assert.assertEquals(0, resp.getDataNodeID());
+ checkGlobalConfig(resp.getGlobalConfig());
+
resp = processor.registerDataNode(registerReq1);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getRegisterResult().getCode());
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
Assert.assertEquals(1, resp.getDataNodeID());
+ checkGlobalConfig(resp.getGlobalConfig());
+
resp = processor.registerDataNode(registerReq2);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getRegisterResult().getCode());
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
Assert.assertEquals(2, resp.getDataNodeID());
+ checkGlobalConfig(resp.getGlobalConfig());
+
+ // test success re-register
+ resp = processor.registerDataNode(registerReq1);
+ Assert.assertEquals(
+ TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode(), resp.getStatus().getCode());
+ Assert.assertEquals(1, resp.getDataNodeID());
+ checkGlobalConfig(resp.getGlobalConfig());
// test query DataNodeInfo
- Map<Integer, DataNodeMessage> messageMap = processor.getDataNodesMessage(-1);
- Assert.assertEquals(3, messageMap.size());
- List<Map.Entry<Integer, DataNodeMessage>> messageList = new ArrayList<>(messageMap.entrySet());
+ TDataNodeMessageResp msgResp = processor.getDataNodesMessage(-1);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), msgResp.getStatus().getCode());
+ Map<Integer, TDataNodeMessage> msgMap = msgResp.getDataNodeMessageMap();
+ Assert.assertEquals(3, msgMap.size());
+ List<Map.Entry<Integer, TDataNodeMessage>> messageList = new ArrayList<>(msgMap.entrySet());
messageList.sort(Comparator.comparingInt(Map.Entry::getKey));
for (int i = 0; i < 3; i++) {
- Assert.assertEquals(i, messageList.get(i).getValue().getDataNodeID());
+ Assert.assertEquals(i, messageList.get(i).getValue().getDataNodeId());
Assert.assertEquals("0.0.0.0", messageList.get(i).getValue().getEndPoint().getIp());
Assert.assertEquals(6667 + i, messageList.get(i).getValue().getEndPoint().getPort());
}
- messageMap = processor.getDataNodesMessage(1);
- Assert.assertEquals(1, messageMap.size());
- Assert.assertNotNull(messageMap.get(1));
- Assert.assertEquals("0.0.0.0", messageMap.get(1).getEndPoint().getIp());
- Assert.assertEquals(6668, messageMap.get(1).getEndPoint().getPort());
+ msgResp = processor.getDataNodesMessage(1);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), msgResp.getStatus().getCode());
+ msgMap = msgResp.getDataNodeMessageMap();
+ Assert.assertEquals(1, msgMap.size());
+ Assert.assertNotNull(msgMap.get(1));
+ Assert.assertEquals("0.0.0.0", msgMap.get(1).getEndPoint().getIp());
+ Assert.assertEquals(6668, msgMap.get(1).getEndPoint().getPort());
}
@Test
- public void setStorageGroupTest() throws TException, IOException {
+ public void setAndQueryStorageGroupTest() throws TException {
TSStatus status;
final String sg = "root.sg0";
// failed because there are not enough DataNodes
- SetStorageGroupReq setReq = new SetStorageGroupReq(sg);
+ TSetStorageGroupReq setReq = new TSetStorageGroupReq(sg);
status = processor.setStorageGroup(setReq);
- Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), status.getCode());
+ Assert.assertEquals(TSStatusCode.NOT_ENOUGH_DATA_NODE.getStatusCode(), status.getCode());
Assert.assertEquals("DataNode is not enough, please register more.", status.getMessage());
// register DataNodes
- DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
- DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
- DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
- status = processor.registerDataNode(registerReq0).getRegisterResult();
+ TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
+ TDataNodeRegisterReq registerReq1 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
+ TDataNodeRegisterReq registerReq2 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
+ status = processor.registerDataNode(registerReq0).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.registerDataNode(registerReq1).getRegisterResult();
+ status = processor.registerDataNode(registerReq1).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.registerDataNode(registerReq2).getRegisterResult();
+ status = processor.registerDataNode(registerReq2).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// set StorageGroup
@@ -132,45 +164,39 @@ public class ConfigNodeRPCServerProcessorTest {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// query StorageGroupSchema
- Map<String, StorageGroupMessage> messageMap = processor.getStorageGroupsMessage();
- Assert.assertEquals(1, messageMap.size());
- Assert.assertNotNull(messageMap.get(sg));
- Assert.assertEquals(sg, messageMap.get(sg).getStorageGroup());
- }
+ TStorageGroupMessageResp resp = processor.getStorageGroupsMessage();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
+ Map<String, TStorageGroupMessage> msgMap = resp.getStorageGroupMessageMap();
+ Assert.assertEquals(1, msgMap.size());
+ Assert.assertNotNull(msgMap.get(sg));
+ Assert.assertEquals(sg, msgMap.get(sg).getStorageGroup());
- @Test
- public void getDeviceGroupHashInfoTest() throws TException, IOException {
- // get Device Group hash
- DeviceGroupHashInfo deviceGroupHashInfo = new DeviceGroupHashInfo();
- deviceGroupHashInfo = processor.getDeviceGroupHashInfo();
- Assert.assertEquals(
- deviceGroupHashInfo.getDeviceGroupCount(),
- ConfigNodeDescriptor.getInstance().getConf().getDeviceGroupCount());
+ // test fail by re-register
+ status = processor.setStorageGroup(setReq);
Assert.assertEquals(
- deviceGroupHashInfo.getHashClass(),
- ConfigNodeDescriptor.getInstance().getConf().getDeviceGroupHashExecutorClass());
+ TSStatusCode.STORAGE_GROUP_ALREADY_EXISTS.getStatusCode(), status.getCode());
}
- @Test
+ // TODO: Reuse this test after PatterTree is moved to node-commons
public void applySchemaPartitionTest() throws TException, IOException {
TSStatus status;
final String sg = "root.sg0";
// failed because there are not enough DataNodes
- SetStorageGroupReq setReq = new SetStorageGroupReq(sg);
+ TSetStorageGroupReq setReq = new TSetStorageGroupReq(sg);
status = processor.setStorageGroup(setReq);
Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), status.getCode());
Assert.assertEquals("DataNode is not enough, please register more.", status.getMessage());
// register DataNodes
- DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
- DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
- DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
- status = processor.registerDataNode(registerReq0).getRegisterResult();
+ TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
+ TDataNodeRegisterReq registerReq1 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
+ TDataNodeRegisterReq registerReq2 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
+ status = processor.registerDataNode(registerReq0).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.registerDataNode(registerReq1).getRegisterResult();
+ status = processor.registerDataNode(registerReq1).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.registerDataNode(registerReq2).getRegisterResult();
+ status = processor.registerDataNode(registerReq2).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// set StorageGroup
@@ -178,40 +204,41 @@ public class ConfigNodeRPCServerProcessorTest {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// applySchemaPartition
- GetSchemaPartitionReq getSchemaPartitionReq = new GetSchemaPartitionReq();
+ TSchemaPartitionReq getSchemaPartitionReq = new TSchemaPartitionReq();
List<Integer> deviceGroupIds = new ArrayList<>();
Integer deviceGroupId = 1000;
deviceGroupIds.add(deviceGroupId);
- getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
- SchemaPartitionInfo schemaPartitionInfo = processor.applySchemaPartition(getSchemaPartitionReq);
- Assert.assertNotNull(schemaPartitionInfo);
- Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
- schemaPartitionInfo
- .getSchemaRegionDataNodesMap()
- .get(sg)
- .forEach((key, value) -> Assert.assertEquals(deviceGroupId, key));
+ // getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+ // SchemaPartitionInfo schemaPartitionInfo =
+ // processor.applySchemaPartition(getSchemaPartitionReq);
+ // Assert.assertNotNull(schemaPartitionInfo);
+ // Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+ // schemaPartitionInfo
+ // .getSchemaRegionDataNodesMap()
+ // .get(sg)
+ // .forEach((key, value) -> Assert.assertEquals(deviceGroupId, key));
}
- @Test
+ // TODO: Reuse this test after PatterTree is moved to node-commons
public void getSchemaPartitionTest() throws TException, IOException {
TSStatus status;
final String sg = "root.sg0";
// failed because there are not enough DataNodes
- SetStorageGroupReq setReq = new SetStorageGroupReq(sg);
+ TSetStorageGroupReq setReq = new TSetStorageGroupReq(sg);
status = processor.setStorageGroup(setReq);
Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), status.getCode());
Assert.assertEquals("DataNode is not enough, please register more.", status.getMessage());
// register DataNodes
- DataNodeRegisterReq registerReq0 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
- DataNodeRegisterReq registerReq1 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
- DataNodeRegisterReq registerReq2 = new DataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
- status = processor.registerDataNode(registerReq0).getRegisterResult();
+ TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
+ TDataNodeRegisterReq registerReq1 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
+ TDataNodeRegisterReq registerReq2 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
+ status = processor.registerDataNode(registerReq0).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.registerDataNode(registerReq1).getRegisterResult();
+ status = processor.registerDataNode(registerReq1).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
- status = processor.registerDataNode(registerReq2).getRegisterResult();
+ status = processor.registerDataNode(registerReq2).getStatus();
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// set StorageGroup
@@ -219,40 +246,176 @@ public class ConfigNodeRPCServerProcessorTest {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
// getSchemaPartition
- GetSchemaPartitionReq getSchemaPartitionReq = new GetSchemaPartitionReq();
+ TSchemaPartitionReq getSchemaPartitionReq = new TSchemaPartitionReq();
List<Integer> deviceGroupIds = new ArrayList<>();
Integer deviceGroupId = 1000;
deviceGroupIds.add(deviceGroupId);
- getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
- SchemaPartitionInfo schemaPartitionInfo = processor.getSchemaPartition(getSchemaPartitionReq);
- Assert.assertNotNull(schemaPartitionInfo);
- Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+ // getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+ // SchemaPartitionInfo schemaPartitionInfo =
+ // processor.getSchemaPartition(getSchemaPartitionReq);
+ // Assert.assertNotNull(schemaPartitionInfo);
+ // Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+ //
+ // // because does not apply schema partition, so schema partition is null
+ //
+ // Assert.assertNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+ //
+ // // applySchemaPartition
+ // deviceGroupIds.add(deviceGroupId);
+ // getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+ // schemaPartitionInfo = processor.applySchemaPartition(getSchemaPartitionReq);
+ // Assert.assertNotNull(schemaPartitionInfo);
+ // Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+ // schemaPartitionInfo
+ // .getSchemaRegionDataNodesMap()
+ // .get(sg)
+ // .forEach((key, value) -> Assert.assertEquals(deviceGroupId, key));
+ //
+ // // getSchemaPartition twice
+ // getSchemaPartitionReq = new GetSchemaPartitionReq();
+ // deviceGroupIds = new ArrayList<>();
+ // deviceGroupIds.add(deviceGroupId);
+ // getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
+ // schemaPartitionInfo = processor.getSchemaPartition(getSchemaPartitionReq);
+ // Assert.assertNotNull(schemaPartitionInfo);
+ // Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
+ //
+ // // because apply schema partition, so schema partition is not null
+ // Assert.assertNotNull(
+ // schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+ }
- // because does not apply schema partition, so schema partition is null
- Assert.assertNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+ private Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
+ constructPartitionSlotsMap(int sgNum, int seriesPartitionSlotNum, long timePartitionSlotNum) {
+ final String sg = "root.sg";
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> result = new HashMap<>();
- // applySchemaPartition
- deviceGroupIds.add(deviceGroupId);
- getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
- schemaPartitionInfo = processor.applySchemaPartition(getSchemaPartitionReq);
- Assert.assertNotNull(schemaPartitionInfo);
- Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
- schemaPartitionInfo
- .getSchemaRegionDataNodesMap()
- .get(sg)
- .forEach((key, value) -> Assert.assertEquals(deviceGroupId, key));
-
- // getSchemaPartition twice
- getSchemaPartitionReq = new GetSchemaPartitionReq();
- deviceGroupIds = new ArrayList<>();
- deviceGroupIds.add(deviceGroupId);
- getSchemaPartitionReq.setStorageGroup(sg).setDeviceGroupIDs(deviceGroupIds);
- schemaPartitionInfo = processor.getSchemaPartition(getSchemaPartitionReq);
- Assert.assertNotNull(schemaPartitionInfo);
- Assert.assertNotNull(schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg));
-
- // because apply schema partition, so schema partition is not null
- Assert.assertNotNull(
- schemaPartitionInfo.getSchemaRegionDataNodesMap().get(sg).get(deviceGroupId));
+ for (int i = 0; i < sgNum; i++) {
+ String storageGroup = sg + i;
+ result.put(storageGroup, new HashMap<>());
+ for (int j = 0; j < seriesPartitionSlotNum; j++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j);
+ result.get(storageGroup).put(seriesPartitionSlot, new ArrayList<>());
+ for (long k = 0; k < timePartitionSlotNum; k++) {
+ TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(k);
+ result.get(storageGroup).get(seriesPartitionSlot).add(timePartitionSlot);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ private void checkDataPartitionMap(
+ int sgNum,
+ int seriesPartitionSlotNum,
+ long timePartitionSlotNum,
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+ dataPartitionMap) {
+ final String sg = "root.sg";
+ Assert.assertEquals(sgNum, dataPartitionMap.size());
+ for (int i = 0; i < sgNum; i++) {
+ String storageGroup = sg + i;
+ Assert.assertTrue(dataPartitionMap.containsKey(storageGroup));
+ Assert.assertEquals(seriesPartitionSlotNum, dataPartitionMap.get(storageGroup).size());
+ for (int j = 0; j < seriesPartitionSlotNum; j++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(j);
+ Assert.assertTrue(dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot));
+ Assert.assertEquals(
+ timePartitionSlotNum,
+ dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).size());
+ for (long k = 0; k < timePartitionSlotNum; k++) {
+ TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot(k);
+ Assert.assertTrue(
+ dataPartitionMap
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .containsKey(timePartitionSlot));
+ // One RegionReplicaSet
+ Assert.assertEquals(
+ 1,
+ dataPartitionMap
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .get(timePartitionSlot)
+ .size());
+ // Including three RegionReplica
+ Assert.assertEquals(
+ 3,
+ dataPartitionMap
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .get(timePartitionSlot)
+ .get(0)
+ .getEndpointSize());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void getAndCreateDataPartitionTest() throws TException {
+ TSStatus status;
+
+ // register DataNodes
+ TDataNodeRegisterReq registerReq0 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6667));
+ TDataNodeRegisterReq registerReq1 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6668));
+ TDataNodeRegisterReq registerReq2 = new TDataNodeRegisterReq(new EndPoint("0.0.0.0", 6669));
+ status = processor.registerDataNode(registerReq0).getStatus();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ status = processor.registerDataNode(registerReq1).getStatus();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ status = processor.registerDataNode(registerReq2).getStatus();
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+ final String sg = "root.sg";
+ final int sgNum = 2;
+ final int seriesPartitionSlotNum = 4;
+ final long timePartitionSlotNum = 6;
+
+ // Prepare partitionSlotsMap
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap0 =
+ constructPartitionSlotsMap(sgNum, seriesPartitionSlotNum, timePartitionSlotNum);
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap1 =
+ constructPartitionSlotsMap(sgNum * 2, seriesPartitionSlotNum * 2, timePartitionSlotNum * 2);
+
+ // set StorageGroups
+ for (int i = 0; i < sgNum; i++) {
+ TSetStorageGroupReq setReq = new TSetStorageGroupReq(sg + i);
+ status = processor.setStorageGroup(setReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ }
+
+ // Test getDataPartition, the result should be empty
+ TDataPartitionReq dataPartitionReq = new TDataPartitionReq();
+ dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap0);
+ TDataPartitionResp dataPartitionResp = processor.getDataPartition(dataPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
+ Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
+ Assert.assertEquals(0, dataPartitionResp.getDataPartitionMapSize());
+
+ // Test getOrCreateDataPartition, ConfigNode should create DataPartition for PartitionSlots
+ dataPartitionResp = processor.getOrCreateDataPartition(dataPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
+ Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
+ checkDataPartitionMap(
+ sgNum,
+ seriesPartitionSlotNum,
+ timePartitionSlotNum,
+ dataPartitionResp.getDataPartitionMap());
+
+ // Test getDataPartition, the result should only contain DataPartition created before
+ dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap1);
+ dataPartitionResp = processor.getDataPartition(dataPartitionReq);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), dataPartitionResp.getStatus().getCode());
+ Assert.assertNotNull(dataPartitionResp.getDataPartitionMap());
+ checkDataPartitionMap(
+ sgNum,
+ seriesPartitionSlotNum,
+ timePartitionSlotNum,
+ dataPartitionResp.getDataPartitionMap());
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataNodeLocation.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
similarity index 58%
rename from node-commons/src/main/java/org/apache/iotdb/commons/partition/DataNodeLocation.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
index 14e638d2a6..e81af759b3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataNodeLocation.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DataNodeLocation.java
@@ -16,67 +16,42 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.partition;
-
-import org.apache.iotdb.commons.cluster.Endpoint;
+package org.apache.iotdb.commons.cluster;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Objects;
public class DataNodeLocation {
- private int dataNodeID;
+ private int dataNodeId;
private Endpoint endPoint;
- private List<Integer> schemaRegionGroupIDs;
- private List<Integer> dataRegionGroupIDs;
-
public DataNodeLocation() {}
- public DataNodeLocation(int dataNodeID, Endpoint endPoint) {
- this.dataNodeID = dataNodeID;
+ public DataNodeLocation(int dataNodeId, Endpoint endPoint) {
+ this.dataNodeId = dataNodeId;
this.endPoint = endPoint;
- dataRegionGroupIDs = new ArrayList<>();
- schemaRegionGroupIDs = new ArrayList<>();
}
- public int getDataNodeID() {
- return dataNodeID;
+ public int getDataNodeId() {
+ return dataNodeId;
}
- public void setDataNodeID(int dataNodeID) {
- this.dataNodeID = dataNodeID;
+ public void setDataNodeId(int dataNodeId) {
+ this.dataNodeId = dataNodeId;
}
public Endpoint getEndPoint() {
return endPoint;
}
- public void addSchemaRegionGroup(int id) {
- schemaRegionGroupIDs.add(id);
- }
-
- public List<Integer> getSchemaRegionGroupIDs() {
- return schemaRegionGroupIDs;
- }
-
- public void addDataRegionGroup(int id) {
- dataRegionGroupIDs.add(id);
- }
-
- public List<Integer> getDataRegionGroupIDs() {
- return dataRegionGroupIDs;
- }
-
public void serializeImpl(ByteBuffer buffer) {
- buffer.putInt(dataNodeID);
+ buffer.putInt(dataNodeId);
endPoint.serializeImpl(buffer);
}
public void deserializeImpl(ByteBuffer buffer) {
- dataNodeID = buffer.getInt();
+ dataNodeId = buffer.getInt();
endPoint = new Endpoint();
endPoint.deserializeImpl(buffer);
}
@@ -90,15 +65,15 @@ public class DataNodeLocation {
return false;
}
DataNodeLocation that = (DataNodeLocation) o;
- return dataNodeID == that.dataNodeID && Objects.equals(endPoint, that.endPoint);
+ return dataNodeId == that.dataNodeId && Objects.equals(endPoint, that.endPoint);
}
@Override
public int hashCode() {
- return Objects.hash(dataNodeID, endPoint);
+ return Objects.hash(dataNodeId, endPoint);
}
public String toString() {
- return String.format("DataNode[%d, %s]", dataNodeID, endPoint);
+ return String.format("DataNode[%d, %s]", dataNodeId, endPoint);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index acd8a5e3c8..e16f878c59 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -18,7 +18,12 @@
*/
package org.apache.iotdb.commons.partition;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
public class DataPartition {
@@ -27,6 +32,16 @@ public class DataPartition {
private Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
dataPartitionMap;
+ public DataPartition() {
+ // Empty constructor
+ }
+
+ public DataPartition(
+ Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+ dataPartitionMap) {
+ this.dataPartitionMap = dataPartitionMap;
+ }
+
public Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
getDataPartitionMap() {
return dataPartitionMap;
@@ -78,4 +93,115 @@ public class DataPartition {
// TODO: (xingtanzjr) how to handle this exception in IoTDB
return null;
}
+
+ /* Interfaces for ConfigNode */
+
+ /**
+ * Get DataPartition by partitionSlotsMap
+ *
+ * @param partitionSlotsMap Map<StorageGroupName, Map<SeriesPartitionSlot,
+ * List<TimePartitionSlot>>>
+ * @return Map<StorageGroupName, Map<SeriesPartitionSlot, Map<TimePartitionSlot,
+ * List<RegionReplicaSet>>>>
+ */
+ public DataPartition getDataPartition(
+ Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap) {
+ Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>> result =
+ new HashMap<>();
+
+ for (String storageGroupName : partitionSlotsMap.keySet()) {
+ // Compare StorageGroupName
+ if (dataPartitionMap.containsKey(storageGroupName)) {
+ Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>
+ seriesTimePartitionSlotMap = dataPartitionMap.get(storageGroupName);
+ for (SeriesPartitionSlot seriesPartitionSlot :
+ partitionSlotsMap.get(storageGroupName).keySet()) {
+ // Compare SeriesPartitionSlot
+ if (seriesTimePartitionSlotMap.containsKey(seriesPartitionSlot)) {
+ Map<TimePartitionSlot, List<RegionReplicaSet>> timePartitionSlotMap =
+ seriesTimePartitionSlotMap.get(seriesPartitionSlot);
+ for (TimePartitionSlot timePartitionSlot :
+ partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot)) {
+ // Compare TimePartitionSlot
+ if (timePartitionSlotMap.containsKey(timePartitionSlot)) {
+ result
+ .computeIfAbsent(storageGroupName, key -> new HashMap<>())
+ .computeIfAbsent(seriesPartitionSlot, key -> new HashMap<>())
+ .put(
+ timePartitionSlot,
+ new ArrayList<>(timePartitionSlotMap.get(timePartitionSlot)));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return new DataPartition(result);
+ }
+
+ /**
+ * Filter out unassigned PartitionSlots
+ *
+ * @param partitionSlotsMap Map<StorageGroupName, Map<SeriesPartitionSlot,
+ * List<TimePartitionSlot>>>
+ * @return Map<StorageGroupName, Map<SeriesPartitionSlot, List<TimePartitionSlot>>>, unassigned
+ * PartitionSlots
+ */
+ public Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>>
+ filterNoAssignedDataPartitionSlots(
+ Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> partitionSlotsMap) {
+ Map<String, Map<SeriesPartitionSlot, List<TimePartitionSlot>>> result = new HashMap<>();
+
+ for (String storageGroupName : partitionSlotsMap.keySet()) {
+ // Compare StorageGroupName
+ if (dataPartitionMap.containsKey(storageGroupName)) {
+ Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>
+ seriesTimePartitionSlotMap = dataPartitionMap.get(storageGroupName);
+ for (SeriesPartitionSlot seriesPartitionSlot :
+ partitionSlotsMap.get(storageGroupName).keySet()) {
+ // Compare SeriesPartitionSlot
+ if (seriesTimePartitionSlotMap.containsKey(seriesPartitionSlot)) {
+ Map<TimePartitionSlot, List<RegionReplicaSet>> timePartitionSlotMap =
+ seriesTimePartitionSlotMap.get(seriesPartitionSlot);
+ for (TimePartitionSlot timePartitionSlot :
+ partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot)) {
+ // Compare TimePartitionSlot
+ if (!timePartitionSlotMap.containsKey(timePartitionSlot)) {
+ result
+ .computeIfAbsent(storageGroupName, key -> new HashMap<>())
+ .computeIfAbsent(seriesPartitionSlot, key -> new ArrayList<>())
+ .add(timePartitionSlot);
+ }
+ }
+ } else {
+ // Clone all if SeriesPartitionSlot not assigned
+ result
+ .computeIfAbsent(storageGroupName, key -> new HashMap<>())
+ .put(
+ seriesPartitionSlot,
+ new ArrayList<>(
+ partitionSlotsMap.get(storageGroupName).get(seriesPartitionSlot)));
+ }
+ }
+ } else {
+ // Clone all if StorageGroupName not assigned
+ result.put(storageGroupName, new HashMap<>(partitionSlotsMap.get(storageGroupName)));
+ }
+ }
+
+ return result;
+ }
+
+ /** Create a DataPartition by ConfigNode */
+ public void createDataPartition(
+ String storageGroup,
+ SeriesPartitionSlot seriesPartitionSlot,
+ TimePartitionSlot timePartitionSlot,
+ RegionReplicaSet regionReplicaSet) {
+ dataPartitionMap
+ .computeIfAbsent(storageGroup, key -> new HashMap<>())
+ .computeIfAbsent(seriesPartitionSlot, key -> new HashMap<>())
+ .put(timePartitionSlot, Collections.singletonList(regionReplicaSet));
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index fec0cab0c7..115c4406c4 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.commons.partition;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import java.io.IOException;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index b81d569a1a..9d428ccedf 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -26,34 +26,34 @@ import java.util.stream.Collectors;
public class SchemaPartition {
// Map<StorageGroup, Map<DeviceGroupID, SchemaRegionPlaceInfo>>
- private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartition;
+ private Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap;
public SchemaPartition() {
- schemaPartition = new HashMap<>();
+ schemaPartitionMap = new HashMap<>();
}
- public Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> getSchemaPartition() {
- return schemaPartition;
+ public Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> getSchemaPartitionMap() {
+ return schemaPartitionMap;
}
- public void setSchemaPartition(
- Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartition) {
- this.schemaPartition = schemaPartition;
+ public void setSchemaPartitionMap(
+ Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> schemaPartitionMap) {
+ this.schemaPartitionMap = schemaPartitionMap;
}
public Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> getSchemaPartition(
- String storageGroup, List<Integer> deviceGroupIDs) {
+ String storageGroup, List<Integer> seriesPartitionSlots) {
Map<String, Map<SeriesPartitionSlot, RegionReplicaSet>> storageGroupMap = new HashMap<>();
Map<SeriesPartitionSlot, RegionReplicaSet> deviceGroupMap = new HashMap<>();
- deviceGroupIDs.forEach(
+ seriesPartitionSlots.forEach(
deviceGroupID -> {
- if (schemaPartition.get(storageGroup) != null
- && schemaPartition
+ if (schemaPartitionMap.get(storageGroup) != null
+ && schemaPartitionMap
.get(storageGroup)
.containsKey(new SeriesPartitionSlot(deviceGroupID))) {
deviceGroupMap.put(
new SeriesPartitionSlot(deviceGroupID),
- schemaPartition.get(storageGroup).get(new SeriesPartitionSlot(deviceGroupID)));
+ schemaPartitionMap.get(storageGroup).get(new SeriesPartitionSlot(deviceGroupID)));
}
});
storageGroupMap.put(storageGroup, deviceGroupMap);
@@ -61,31 +61,26 @@ public class SchemaPartition {
}
/**
- * Filter out unassigned device groups
+ * Filter out unassigned SeriesPartitionSlots
*
* @param storageGroup storage group name
- * @param deviceGroupIDs device group id list
- * @return deviceGroupIDs does not assigned
+ * @param seriesPartitionSlots SeriesPartitionSlotIds
+ * @return not assigned seriesPartitionSlots
*/
- public List<Integer> filterNoAssignDeviceGroupId(
- String storageGroup, List<Integer> deviceGroupIDs) {
- if (!schemaPartition.containsKey(storageGroup)) {
- return deviceGroupIDs;
+ public List<Integer> filterNoAssignedSeriesPartitionSlot(
+ String storageGroup, List<Integer> seriesPartitionSlots) {
+ if (!schemaPartitionMap.containsKey(storageGroup)) {
+ return seriesPartitionSlots;
}
- return deviceGroupIDs.stream()
+ return seriesPartitionSlots.stream()
.filter(
- id -> {
- if (schemaPartition.get(storageGroup).containsKey(deviceGroupIDs)) {
- return false;
- }
- return true;
- })
+ id -> !schemaPartitionMap.get(storageGroup).containsKey(new SeriesPartitionSlot(id)))
.collect(Collectors.toList());
}
public void setSchemaRegionReplicaSet(
String storageGroup, int deviceGroupId, RegionReplicaSet regionReplicaSet) {
- schemaPartition
+ schemaPartitionMap
.computeIfAbsent(storageGroup, value -> new HashMap<>())
.put(new SeriesPartitionSlot(deviceGroupId), regionReplicaSet);
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionSlot.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionSlot.java
index 52ef9b5fe6..553aeff163 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionSlot.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionSlot.java
@@ -18,27 +18,42 @@
*/
package org.apache.iotdb.commons.partition;
+import java.nio.ByteBuffer;
+
public class SeriesPartitionSlot {
- private int deviceGroupId;
+ private int slotId;
+
+ public SeriesPartitionSlot() {
+ // Empty constructor
+ }
- public SeriesPartitionSlot(int deviceGroupId) {
- this.deviceGroupId = deviceGroupId;
+ public SeriesPartitionSlot(int slotId) {
+ this.slotId = slotId;
}
- public int getDeviceGroupId() {
- return deviceGroupId;
+ public int getSlotId() {
+ return slotId;
}
- public void setDeviceGroupId(int deviceGroupId) {
- this.deviceGroupId = deviceGroupId;
+ public void setSlotId(int slotId) {
+ this.slotId = slotId;
}
+ @Override
public int hashCode() {
- return new Integer(deviceGroupId).hashCode();
+ return new Integer(slotId).hashCode();
}
+ @Override
public boolean equals(Object obj) {
- return obj instanceof SeriesPartitionSlot
- && this.deviceGroupId == ((SeriesPartitionSlot) obj).deviceGroupId;
+ return obj instanceof SeriesPartitionSlot && this.slotId == ((SeriesPartitionSlot) obj).slotId;
+ }
+
+ public void serializeImpl(ByteBuffer buffer) {
+ buffer.putInt(slotId);
+ }
+
+ public void deserializeImpl(ByteBuffer buffer) {
+ slotId = buffer.getInt();
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionSlot.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionSlot.java
index 048406045b..69a3416fcf 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionSlot.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionSlot.java
@@ -18,9 +18,21 @@
*/
package org.apache.iotdb.commons.partition;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
public class TimePartitionSlot {
+
private long startTime;
+ public TimePartitionSlot() {
+ // empty constructor
+ }
+
+ public TimePartitionSlot(long startTime) {
+ this.startTime = startTime;
+ }
+
public long getStartTime() {
return startTime;
}
@@ -28,4 +40,25 @@ public class TimePartitionSlot {
public void setStartTime(long startTime) {
this.startTime = startTime;
}
+
+ public void serializeImpl(ByteBuffer buffer) {
+ buffer.putLong(startTime);
+ }
+
+ public void deserializeImpl(ByteBuffer buffer) {
+ startTime = buffer.getLong();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TimePartitionSlot that = (TimePartitionSlot) o;
+ return startTime == that.startTime;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(startTime);
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/DeviceGroupHashExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
similarity index 62%
rename from node-commons/src/main/java/org/apache/iotdb/commons/hash/DeviceGroupHashExecutor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
index a597c938a8..285f2ff95c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/DeviceGroupHashExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
@@ -16,16 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.commons.partition.executor;
-/** All DeviceGroup hash algorithm executors must be subclasses of DeviceGroupHashExecutor */
-public abstract class DeviceGroupHashExecutor {
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
- protected final int deviceGroupCount;
+/** All SeriesPartitionExecutors must be subclasses of SeriesPartitionExecutor */
+public abstract class SeriesPartitionExecutor {
- public DeviceGroupHashExecutor(int deviceGroupCount) {
- this.deviceGroupCount = deviceGroupCount;
+ protected final int seriesPartitionSlotNum;
+
+ public SeriesPartitionExecutor(int seriesPartitionSlotNum) {
+ this.seriesPartitionSlotNum = seriesPartitionSlotNum;
}
- public abstract int getDeviceGroupID(String device);
+ public abstract SeriesPartitionSlot getSeriesPartitionSlot(String device);
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/APHashExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java
similarity index 75%
rename from node-commons/src/main/java/org/apache/iotdb/commons/hash/APHashExecutor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java
index 0fc18ad662..8da263e4ae 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/APHashExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java
@@ -16,16 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.commons.partition.executor.hash;
-public class APHashExecutor extends DeviceGroupHashExecutor {
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+
+public class APHashExecutor extends SeriesPartitionExecutor {
public APHashExecutor(int deviceGroupCount) {
super(deviceGroupCount);
}
@Override
- public int getDeviceGroupID(String device) {
+ public SeriesPartitionSlot getSeriesPartitionSlot(String device) {
int hash = 0;
for (int i = 0; i < device.length(); i++) {
@@ -37,6 +40,6 @@ public class APHashExecutor extends DeviceGroupHashExecutor {
}
hash &= Integer.MAX_VALUE;
- return hash % deviceGroupCount;
+ return new SeriesPartitionSlot(hash % seriesPartitionSlotNum);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/BKDRHashExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java
similarity index 73%
rename from node-commons/src/main/java/org/apache/iotdb/commons/hash/BKDRHashExecutor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java
index b5e08a0718..a238afb416 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/BKDRHashExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.commons.partition.executor.hash;
-public class BKDRHashExecutor extends DeviceGroupHashExecutor {
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+
+public class BKDRHashExecutor extends SeriesPartitionExecutor {
private static final int seed = 131;
@@ -27,7 +30,7 @@ public class BKDRHashExecutor extends DeviceGroupHashExecutor {
}
@Override
- public int getDeviceGroupID(String device) {
+ public SeriesPartitionSlot getSeriesPartitionSlot(String device) {
int hash = 0;
for (int i = 0; i < device.length(); i++) {
@@ -35,6 +38,6 @@ public class BKDRHashExecutor extends DeviceGroupHashExecutor {
}
hash &= Integer.MAX_VALUE;
- return hash % deviceGroupCount;
+ return new SeriesPartitionSlot(hash % seriesPartitionSlotNum);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java
similarity index 74%
rename from node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java
index 6521d2890b..13f9468154 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.commons.partition.executor.hash;
-public class JSHashExecutor extends DeviceGroupHashExecutor {
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+
+public class JSHashExecutor extends SeriesPartitionExecutor {
private static final int base = 1315423911;
@@ -27,7 +30,7 @@ public class JSHashExecutor extends DeviceGroupHashExecutor {
}
@Override
- public int getDeviceGroupID(String device) {
+ public SeriesPartitionSlot getSeriesPartitionSlot(String device) {
int hash = base;
for (int i = 0; i < device.length(); i++) {
@@ -35,6 +38,6 @@ public class JSHashExecutor extends DeviceGroupHashExecutor {
}
hash &= Integer.MAX_VALUE;
- return hash % deviceGroupCount;
+ return new SeriesPartitionSlot(hash % seriesPartitionSlotNum);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/hash/SDBMHashExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java
similarity index 73%
rename from node-commons/src/main/java/org/apache/iotdb/commons/hash/SDBMHashExecutor.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java
index 8780ac4838..0246d4f8de 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/hash/SDBMHashExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java
@@ -16,16 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.hash;
+package org.apache.iotdb.commons.partition.executor.hash;
-public class SDBMHashExecutor extends DeviceGroupHashExecutor {
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+
+public class SDBMHashExecutor extends SeriesPartitionExecutor {
public SDBMHashExecutor(int deviceGroupCount) {
super(deviceGroupCount);
}
@Override
- public int getDeviceGroupID(String device) {
+ public SeriesPartitionSlot getSeriesPartitionSlot(String device) {
int hash = 0;
for (int i = 0; i < device.length(); i++) {
@@ -33,6 +36,6 @@ public class SDBMHashExecutor extends DeviceGroupHashExecutor {
}
hash &= Integer.MAX_VALUE;
- return hash % deviceGroupCount;
+ return new SeriesPartitionSlot(hash % seriesPartitionSlotNum);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
index 5ba8090451..94167a23ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/FakePartitionFetcherImpl.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.sql.analyze;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.partition.*;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 30e88e484b..57cfdd1354 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.utils.CommonUtils;
import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterReq;
-import org.apache.iotdb.confignode.rpc.thrift.DataNodeRegisterResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConfigCheck;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -139,10 +139,10 @@ public class DataNode implements DataNodeMBean {
logger.info("start joining the cluster with the help of {}", configNode);
try {
ConfigIService.Client client = createClient(configNode);
- DataNodeRegisterResp dataNodeRegisterResp =
+ TDataNodeRegisterResp dataNodeRegisterResp =
client.registerDataNode(
- new DataNodeRegisterReq(new EndPoint(thisNode.getIp(), thisNode.getPort())));
- if (dataNodeRegisterResp.getRegisterResult().getCode()
+ new TDataNodeRegisterReq(new EndPoint(thisNode.getIp(), thisNode.getPort())));
+ if (dataNodeRegisterResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataNodeID = dataNodeRegisterResp.getDataNodeID();
logger.info("Joined a cluster successfully");
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 7ad38eb34f..03db5bb4ff 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.mpp.sql.plan;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.partition.DataNodeLocation;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index c0f5fa1635..d3948a7d06 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -109,7 +109,12 @@ public enum TSStatusCode {
PARSE_LOG_ERROR(708),
// configuration
- CONFIG_ERROR(800);
+ CONFIG_ERROR(800),
+
+ // ConfigNode response
+ DATANODE_ALREADY_REGISTERED(901),
+ STORAGE_GROUP_ALREADY_EXISTS(902),
+ NOT_ENOUGH_DATA_NODE(903);
private int statusCode;
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index c41ffc1c22..7d3c3b7038 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -21,82 +21,79 @@ include "common.thrift"
namespace java org.apache.iotdb.confignode.rpc.thrift
namespace py iotdb.thrift.confignode
-struct DataNodeRegisterReq {
+// DataNode
+struct TDataNodeRegisterReq {
1: required common.EndPoint endPoint
}
-struct DataNodeRegisterResp {
- 1: required common.TSStatus registerResult
- 2: optional i32 dataNodeID
+struct TGlobalConfig {
+ 1: optional string dataNodeConsensusProtocolClass
+ 2: optional i32 seriesPartitionSlotNum
+ 3: optional string seriesPartitionExecutorClass
}
-struct DataNodeMessage {
- 1: required i32 dataNodeID
- 2: required common.EndPoint endPoint
+struct TDataNodeRegisterResp {
+ 1: required common.TSStatus status
+ 2: optional i32 dataNodeID
+ 3: optional TGlobalConfig globalConfig
}
-struct SetStorageGroupReq {
- 1: required string storageGroup
+struct TDataNodeMessageResp {
+ 1: required common.TSStatus status
+ // map<DataNodeId, DataNodeMessage>
+ 2: optional map<i32, TDataNodeMessage> dataNodeMessageMap
}
-struct DeleteStorageGroupReq {
- 1: required string storageGroup
+struct TDataNodeMessage {
+ 1: required i32 dataNodeId
+ 2: required common.EndPoint endPoint
}
-struct StorageGroupMessage {
+// StorageGroup
+struct TSetStorageGroupReq {
1: required string storageGroup
+ 2: optional i64 ttl
}
-struct GetDeviceGroupIDReq {
- 1: required string device
-}
-
-struct GetSchemaPartitionReq {
+struct TDeleteStorageGroupReq {
1: required string storageGroup
- 2: required list<i32> deviceGroupIDs
-}
-
-struct SchemaPartitionInfo {
- 1: required map<string, map<i32, common.RegionReplicaSet>> schemaRegionDataNodesMap
}
-struct DataPartitionInfo {
- 1: required map<string, map<i64, map<i32, list<common.RegionReplicaSet>>>> deviceGroupStartTimeDataRegionGroupMap
+struct TStorageGroupMessageResp {
+ 1: required common.TSStatus status
+ // map<string, StorageGroupMessage>
+ 2: optional map<string, TStorageGroupMessage> storageGroupMessageMap
}
-struct GetDataPartitionReq {
+struct TStorageGroupMessage {
1: required string storageGroup
- 2: required map<i32, list<i64>> deviceGroupStartTimeMap
}
-struct DeviceGroupHashInfo {
- 1: required i32 deviceGroupCount
- 2: required string hashClass
+// Schema
+struct TSchemaPartitionReq {
+ 1: required binary pathPatternTree
}
-struct FetchDataPartitionReq {
- 1: required map<i32, list<i64>> deviceGroupIDToStartTimeMap
+struct TSchemaPartitionResp {
+ 1: required common.TSStatus status
+ // map<StorageGroupName, map<TSeriesPartitionSlot, TRegionReplicaSet>>
+ 2: optional map<string, map<common.TSeriesPartitionSlot, common.TRegionReplicaSet>> schemaRegionMap
}
-struct FetchSchemaPartitionReq {
- 1: required list<string> devicePaths
+// Data
+struct TDataPartitionReq {
+ // map<StorageGroupName, map<TSeriesPartitionSlot, list<TTimePartitionSlot>>>
+ 1: required map<string, map<common.TSeriesPartitionSlot, list<common.TTimePartitionSlot>>> partitionSlotsMap
}
-struct FetchPartitionReq {
- 1: required map<i32, list<i64>> deviceGroupIDToStartTimeMap
+struct TDataPartitionResp {
+ 1: required common.TSStatus status
+ // map<StorageGroupName, map<TSeriesPartitionSlot, map<TTimePartitionSlot, list<TRegionReplicaSet>>>>
+ 2: optional map<string, map<common.TSeriesPartitionSlot, map<common.TTimePartitionSlot, list<common.TRegionReplicaSet>>>> dataPartitionMap
}
-struct RegionInfo {
- 1: required i32 regionId
- 2: required list<common.EndPoint> endPointList
-}
-
-struct DataPartitionInfoResp {
- // Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId, List<DataRegionReplicaInfo>>>>
- 1: required map<string, map<i32, map<i64, list<RegionInfo>>>> dataPartitionMap
-}
-
-struct AuthorizerReq{
+// Authorize
+struct TAuthorizerReq {
1: required i32 authorType
2: required string userName
3: required string roleName
@@ -106,51 +103,35 @@ struct AuthorizerReq{
7: required string nodeName
}
-struct SchemaPartitionInfoResp {
- // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionPlaceInfo>>
- 1: required map<string, map<i32, RegionInfo>> schemaPartitionInfo
-}
-
-struct PartitionInfoResp {
- // Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId, List<DataRegionReplicaInfo>>>>
- 1: required map<string, map<i32, map<i64, list<RegionInfo>>>> dataPartitionMap
- // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionPlaceInfo>>
- 2: required map<string, map<i32, RegionInfo>> schemaPartitionInfo
-}
-
service ConfigIService {
- // Return TSStatusCode.SUCCESS_STATUS and the register DataNode id when successful registered.
- // Otherwise, return TSStatusCode.INTERNAL_SERVER_ERROR
- DataNodeRegisterResp registerDataNode(DataNodeRegisterReq req)
- map<i32, DataNodeMessage> getDataNodesMessage(i32 dataNodeID)
+ /* DataNode */
+
+ TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req)
- common.TSStatus setStorageGroup(SetStorageGroupReq req)
+ TDataNodeMessageResp getDataNodesMessage(i32 dataNodeID)
- common.TSStatus deleteStorageGroup(DeleteStorageGroupReq req)
+ /* StorageGroup */
- map<string, StorageGroupMessage> getStorageGroupsMessage()
+ common.TSStatus setStorageGroup(TSetStorageGroupReq req)
- // Gets SchemaRegions for DeviceGroups in a StorageGroup
- SchemaPartitionInfo getSchemaPartition(GetSchemaPartitionReq req)
+ common.TSStatus deleteStorageGroup(TDeleteStorageGroupReq req)
- // Gets DataRegions for DeviceGroups in a StorageGroup at different starttime
- DataPartitionInfo getDataPartition(GetDataPartitionReq req)
+ TStorageGroupMessageResp getStorageGroupsMessage()
- DeviceGroupHashInfo getDeviceGroupHashInfo()
+ /* Schema */
- // apply data partition when write data
- DataPartitionInfo applyDataPartition(GetDataPartitionReq req)
+ TSchemaPartitionResp getSchemaPartition(TSchemaPartitionReq req)
- // apply schema partition when create schema
- SchemaPartitionInfo applySchemaPartition(GetSchemaPartitionReq req)
+ TSchemaPartitionResp getOrCreateSchemaPartition(TSchemaPartitionReq req)
- DataPartitionInfoResp fetchDataPartitionInfo(FetchDataPartitionReq req)
+ /* Data */
- SchemaPartitionInfoResp fetchSchemaPartitionInfo(FetchSchemaPartitionReq req)
+ TDataPartitionResp getDataPartition(TDataPartitionReq req)
- PartitionInfoResp fetchPartitionInfo(FetchPartitionReq req)
+ TDataPartitionResp getOrCreateDataPartition(TDataPartitionReq req)
- common.TSStatus operatePermission(AuthorizerReq req)
+ /* Authorize */
+ common.TSStatus operatePermission(TAuthorizerReq req)
}
\ No newline at end of file
diff --git a/thrift/src/main/thrift/common.thrift b/thrift/src/main/thrift/common.thrift
index 724822e93b..8a8e61999f 100644
--- a/thrift/src/main/thrift/common.thrift
+++ b/thrift/src/main/thrift/common.thrift
@@ -33,7 +33,16 @@ namespace py iotdb.thrift.common
4: optional EndPoint redirectNode
}
- struct RegionReplicaSet {
+ struct TRegionReplicaSet {
1: required i32 regionId
- 2: required list<EndPoint> endpoint
+ 2: required string groupType
+ 3: required list<EndPoint> endpoint
+ }
+
+ struct TSeriesPartitionSlot {
+ 1: required i32 slotId
+ }
+
+ struct TTimePartitionSlot {
+ 1: required i64 startTime
}
\ No newline at end of file
diff --git a/thrift/src/main/thrift/management.thrift b/thrift/src/main/thrift/management.thrift
index 31832a168a..30aa28a072 100644
--- a/thrift/src/main/thrift/management.thrift
+++ b/thrift/src/main/thrift/management.thrift
@@ -24,12 +24,12 @@ typedef i32 int
typedef i64 long
struct CreateSchemaRegionReq {
- 1: required common.RegionReplicaSet regionReplicaSet
+ 1: required common.TRegionReplicaSet regionReplicaSet
2: required string storageGroup
}
struct CreateDataRegionReq {
- 1: required common.RegionReplicaSet regionReplicaSet
+ 1: required common.TRegionReplicaSet regionReplicaSet
2: required string storageGroup
3: optional long ttl
}