You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/11/23 11:00:30 UTC
[iotdb] branch master updated: [IOTDB-4881] Add feature StorageGroup fast activation (#7944)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1c01608849 [IOTDB-4881] Add feature StorageGroup fast activation (#7944)
1c01608849 is described below
commit 1c0160884916792588b50aa46581a5c3042232a2
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Wed Nov 23 19:00:23 2022 +0800
[IOTDB-4881] Add feature StorageGroup fast activation (#7944)
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 24 ++++++-
.../confignode/conf/ConfigNodeDescriptor.java | 5 ++
.../confignode/conf/ConfigNodeStartupCheck.java | 5 ++
.../consensus/request/ConfigPhysicalPlan.java | 6 +-
.../consensus/request/ConfigPhysicalPlanType.java | 2 +-
.../write/region/CreateRegionGroupsPlan.java | 19 +++++
...tPlan.java => AdjustMaxRegionGroupNumPlan.java} | 46 ++++++-------
.../confignode/manager/ClusterSchemaManager.java | 80 +++++++++++++---------
.../iotdb/confignode/manager/ConfigManager.java | 6 +-
.../iotdb/confignode/manager/node/NodeManager.java | 2 +-
.../manager/partition/PartitionManager.java | 60 +++++++++-------
.../persistence/executor/ConfigPlanExecutor.java | 6 +-
.../persistence/partition/PartitionInfo.java | 7 +-
.../persistence/schema/ClusterSchemaInfo.java | 20 +++---
.../procedure/env/ConfigNodeProcedureEnv.java | 9 ++-
.../statemachine/CreateRegionGroupsProcedure.java | 13 +++-
.../iotdb/confignode/service/ConfigNode.java | 7 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 4 +-
.../request/ConfigPhysicalPlanSerDeTest.java | 8 +--
.../java/org/apache/iotdb/it/env/MppConfig.java | 7 ++
.../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +++
.../it/partition/IoTDBPartitionGetterIT.java | 55 +++++++++------
.../partition/IoTDBPartitionInheritPolicyTest.java | 3 +-
.../db/it/alignbydevice/IoTDBAlignByDeviceIT.java | 75 +-------------------
.../resources/conf/iotdb-common.properties | 7 ++
.../iotdb/db/mpp/execution/driver/DataDriver.java | 4 +-
.../operator/process/AbstractIntoOperator.java | 21 ++++--
.../operator/process/DeviceViewIntoOperator.java | 2 +-
.../operator/process/DeviceViewOperator.java | 16 ++---
.../execution/operator/process/IntoOperator.java | 2 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 19 +++--
.../operator/DeviceMergeOperatorTest.java | 9 +++
.../execution/operator/DeviceViewOperatorTest.java | 3 +
.../java/org/apache/iotdb/session/Session.java | 9 ++-
.../src/main/thrift/confignode.thrift | 5 +-
35 files changed, 333 insertions(+), 241 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index d76920f635..cced18ed1a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -62,9 +62,15 @@ public class ConfigNodeConfig {
/** DataNode data region consensus protocol */
private String dataRegionConsensusProtocolClass = ConsensusFactory.SIMPLE_CONSENSUS;
- /** The maximum number of SchemaRegion expected to be managed by each DataNode. */
+ /** The maximum number of DataRegion expected to be managed by each DataNode. */
private double dataRegionPerProcessor = 0.5;
+ /** The least number of SchemaRegionGroup for each StorageGroup. */
+ private int leastSchemaRegionGroupNum = 1;
+
+ /** The least number of DataRegionGroup for each StorageGroup. */
+ private int leastDataRegionGroupNum = 5;
+
/** region allocate strategy. */
private RegionBalancer.RegionAllocateStrategy regionAllocateStrategy =
RegionBalancer.RegionAllocateStrategy.GREEDY;
@@ -417,6 +423,22 @@ public class ConfigNodeConfig {
this.dataRegionPerProcessor = dataRegionPerProcessor;
}
+ public int getLeastSchemaRegionGroupNum() {
+ return leastSchemaRegionGroupNum;
+ }
+
+ public void setLeastSchemaRegionGroupNum(int leastSchemaRegionGroupNum) {
+ this.leastSchemaRegionGroupNum = leastSchemaRegionGroupNum;
+ }
+
+ public int getLeastDataRegionGroupNum() {
+ return leastDataRegionGroupNum;
+ }
+
+ public void setLeastDataRegionGroupNum(int leastDataRegionGroupNum) {
+ this.leastDataRegionGroupNum = leastDataRegionGroupNum;
+ }
+
public RegionBalancer.RegionAllocateStrategy getRegionAllocateStrategy() {
return regionAllocateStrategy;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index ee88d737c9..bbb7e053f8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -213,6 +213,11 @@ public class ConfigNodeDescriptor {
"data_region_per_processor", String.valueOf(conf.getDataRegionPerProcessor()))
.trim()));
+ conf.setLeastDataRegionGroupNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "least_data_region_group_num", String.valueOf(conf.getLeastDataRegionGroupNum()))));
+
try {
conf.setRegionAllocateStrategy(
RegionBalancer.RegionAllocateStrategy.valueOf(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
index 50dff4cebc..d00f93c301 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java
@@ -112,6 +112,11 @@ public class ConfigNodeStartupCheck {
throw new ConfigurationException(
"The ip address of any target_config_node_list couldn't be 0.0.0.0");
}
+
+ // The least DataRegionGroup number should be positive
+ if (CONF.getLeastDataRegionGroupNum() <= 0) {
+ throw new ConfigurationException("The least_data_region_group_num should be positive");
+ }
}
private void createDirsIfNecessary() throws IOException {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index d69b620848..e1b8e58c6d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -64,7 +64,7 @@ import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProce
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
-import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupCountPlan;
+import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupNumPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
@@ -177,8 +177,8 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest {
case SetTimePartitionInterval:
plan = new SetTimePartitionIntervalPlan();
break;
- case AdjustMaxRegionGroupCount:
- plan = new AdjustMaxRegionGroupCountPlan();
+ case AdjustMaxRegionGroupNum:
+ plan = new AdjustMaxRegionGroupNumPlan();
break;
case CountStorageGroup:
plan = new CountStorageGroupPlan();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index d549ca450c..4cea224021 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -39,7 +39,7 @@ public enum ConfigPhysicalPlanType {
SetSchemaReplicationFactor((short) 202),
SetDataReplicationFactor((short) 203),
SetTimePartitionInterval((short) 204),
- AdjustMaxRegionGroupCount((short) 205),
+ AdjustMaxRegionGroupNum((short) 205),
DeleteStorageGroup((short) 206),
PreDeleteStorageGroup((short) 207),
GetStorageGroup((short) 208),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/region/CreateRegionGroupsPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/region/CreateRegionGroupsPlan.java
index e3e588ef8a..ec451907a9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/region/CreateRegionGroupsPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/region/CreateRegionGroupsPlan.java
@@ -18,12 +18,15 @@
*/
package org.apache.iotdb.confignode.consensus.request.write.region;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.slf4j.Logger;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -33,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.stream.Collectors;
/** Create regions for specific StorageGroups */
public class CreateRegionGroupsPlan extends ConfigPhysicalPlan {
@@ -60,6 +64,21 @@ public class CreateRegionGroupsPlan extends ConfigPhysicalPlan {
.add(regionReplicaSet);
}
+ public void planLog(Logger LOGGER) {
+ for (Map.Entry<String, List<TRegionReplicaSet>> regionGroupEntry : regionGroupMap.entrySet()) {
+ String storageGroup = regionGroupEntry.getKey();
+ for (TRegionReplicaSet regionReplicaSet : regionGroupEntry.getValue()) {
+ LOGGER.info(
+ "[CreateRegionGroups] RegionGroup: {}, belonged StorageGroup: {}, on DataNodes: {}",
+ regionReplicaSet.getRegionId(),
+ storageGroup,
+ regionReplicaSet.getDataNodeLocations().stream()
+ .map(TDataNodeLocation::getDataNodeId)
+ .collect(Collectors.toList()));
+ }
+ }
+ }
+
public void serializeForProcedure(DataOutputStream stream) throws IOException {
this.serializeImpl(stream);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/AdjustMaxRegionGroupCountPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/AdjustMaxRegionGroupNumPlan.java
similarity index 61%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/AdjustMaxRegionGroupCountPlan.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/AdjustMaxRegionGroupNumPlan.java
index 63aa1d75d4..ad8911b39e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/AdjustMaxRegionGroupCountPlan.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/AdjustMaxRegionGroupNumPlan.java
@@ -30,34 +30,34 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-public class AdjustMaxRegionGroupCountPlan extends ConfigPhysicalPlan {
+public class AdjustMaxRegionGroupNumPlan extends ConfigPhysicalPlan {
- // Map<StorageGroupName, Pair<maxSchemaRegionGroupCount, maxDataRegionGroupCount>>
- public final Map<String, Pair<Integer, Integer>> maxRegionGroupCountMap;
+ // Map<StorageGroupName, Pair<maxSchemaRegionGroupNum, maxDataRegionGroupNum>>
+ public final Map<String, Pair<Integer, Integer>> maxRegionGroupNumMap;
- public AdjustMaxRegionGroupCountPlan() {
- super(ConfigPhysicalPlanType.AdjustMaxRegionGroupCount);
- this.maxRegionGroupCountMap = new HashMap<>();
+ public AdjustMaxRegionGroupNumPlan() {
+ super(ConfigPhysicalPlanType.AdjustMaxRegionGroupNum);
+ this.maxRegionGroupNumMap = new HashMap<>();
}
- public void putEntry(String storageGroup, Pair<Integer, Integer> maxRegionGroupCount) {
- maxRegionGroupCountMap.put(storageGroup, maxRegionGroupCount);
+ public void putEntry(String storageGroup, Pair<Integer, Integer> maxRegionGroupNum) {
+ maxRegionGroupNumMap.put(storageGroup, maxRegionGroupNum);
}
- public Map<String, Pair<Integer, Integer>> getMaxRegionGroupCountMap() {
- return maxRegionGroupCountMap;
+ public Map<String, Pair<Integer, Integer>> getMaxRegionGroupNumMap() {
+ return maxRegionGroupNumMap;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(getType().getPlanType(), stream);
- ReadWriteIOUtils.write(maxRegionGroupCountMap.size(), stream);
- for (Map.Entry<String, Pair<Integer, Integer>> maxRegionGroupCountEntry :
- maxRegionGroupCountMap.entrySet()) {
- ReadWriteIOUtils.write(maxRegionGroupCountEntry.getKey(), stream);
- ReadWriteIOUtils.write(maxRegionGroupCountEntry.getValue().getLeft(), stream);
- ReadWriteIOUtils.write(maxRegionGroupCountEntry.getValue().getRight(), stream);
+ ReadWriteIOUtils.write(maxRegionGroupNumMap.size(), stream);
+ for (Map.Entry<String, Pair<Integer, Integer>> maxRegionGroupNumEntry :
+ maxRegionGroupNumMap.entrySet()) {
+ ReadWriteIOUtils.write(maxRegionGroupNumEntry.getKey(), stream);
+ ReadWriteIOUtils.write(maxRegionGroupNumEntry.getValue().getLeft(), stream);
+ ReadWriteIOUtils.write(maxRegionGroupNumEntry.getValue().getRight(), stream);
}
}
@@ -67,10 +67,10 @@ public class AdjustMaxRegionGroupCountPlan extends ConfigPhysicalPlan {
for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = ReadWriteIOUtils.readString(buffer);
- int maxSchemaRegionGroupCount = buffer.getInt();
- int maxDataRegionGroupCount = buffer.getInt();
- maxRegionGroupCountMap.put(
- storageGroup, new Pair<>(maxSchemaRegionGroupCount, maxDataRegionGroupCount));
+ int maxSchemaRegionGroupNum = buffer.getInt();
+ int maxDataRegionGroupNum = buffer.getInt();
+ maxRegionGroupNumMap.put(
+ storageGroup, new Pair<>(maxSchemaRegionGroupNum, maxDataRegionGroupNum));
}
}
@@ -78,12 +78,12 @@ public class AdjustMaxRegionGroupCountPlan extends ConfigPhysicalPlan {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- AdjustMaxRegionGroupCountPlan that = (AdjustMaxRegionGroupCountPlan) o;
- return maxRegionGroupCountMap.equals(that.maxRegionGroupCountMap);
+ AdjustMaxRegionGroupNumPlan that = (AdjustMaxRegionGroupNumPlan) o;
+ return maxRegionGroupNumMap.equals(that.maxRegionGroupNumMap);
}
@Override
public int hashCode() {
- return Objects.hash(maxRegionGroupCountMap);
+ return Objects.hash(maxRegionGroupNumMap);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index 7817f0afba..07b7554f1c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.storagegroup.CountStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.storagegroup.GetStorageGroupPlan;
@@ -40,7 +41,7 @@ import org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplat
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetTemplateSetInfoPlan;
-import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupCountPlan;
+import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupNumPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan;
@@ -95,10 +96,11 @@ public class ClusterSchemaManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSchemaManager.class);
- private static final double schemaRegionPerDataNode =
- ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionPerDataNode();
- private static final double dataRegionPerProcessor =
- ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerProcessor();
+ private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+ private static final int LEAST_SCHEMA_REGION_GROUP_NUM = CONF.getLeastSchemaRegionGroupNum();
+ private static final double SCHEMA_REGION_PER_DATA_NODE = CONF.getSchemaRegionPerDataNode();
+ private static final int LEAST_DATA_REGION_GROUP_NUM = CONF.getLeastDataRegionGroupNum();
+ private static final double DATA_REGION_PER_PROCESSOR = CONF.getDataRegionPerProcessor();
private final IManager configManager;
private final ClusterSchemaInfo clusterSchemaInfo;
@@ -146,7 +148,7 @@ public class ClusterSchemaManager {
result = getConsensusManager().write(setStorageGroupPlan).getStatus();
// Adjust the maximum RegionGroup number of each StorageGroup
- adjustMaxRegionGroupCount();
+ adjustMaxRegionGroupNum();
return result;
}
@@ -155,7 +157,7 @@ public class ClusterSchemaManager {
TSStatus result = getConsensusManager().write(deleteStorageGroupPlan).getStatus();
// Adjust the maximum RegionGroup number of each StorageGroup after deleting the storage group
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- adjustMaxRegionGroupCount();
+ adjustMaxRegionGroupNum();
}
return result;
}
@@ -200,9 +202,9 @@ public class ClusterSchemaManager {
try {
storageGroupInfo.setSchemaRegionNum(
- getPartitionManager().getRegionCount(name, TConsensusGroupType.SchemaRegion));
+ getPartitionManager().getRegionGroupCount(name, TConsensusGroupType.SchemaRegion));
storageGroupInfo.setDataRegionNum(
- getPartitionManager().getRegionCount(name, TConsensusGroupType.DataRegion));
+ getPartitionManager().getRegionGroupCount(name, TConsensusGroupType.DataRegion));
} catch (StorageGroupNotExistsException e) {
// Return immediately if some StorageGroups don't exist
return new TShowStorageGroupResp()
@@ -306,10 +308,10 @@ public class ClusterSchemaManager {
}
/**
- * Only leader use this interface. Adjust the maxSchemaRegionGroupCount and
- * maxDataRegionGroupCount of each StorageGroup bases on existing cluster resources
+ * Only the leader uses this interface. Adjusts the maxSchemaRegionGroupNum and
+ * maxDataRegionGroupNum of each StorageGroup based on the existing cluster resources.
*/
- public synchronized void adjustMaxRegionGroupCount() {
+ public synchronized void adjustMaxRegionGroupNum() {
// Get all StorageGroupSchemas
Map<String, TStorageGroupSchema> storageGroupSchemaMap =
getMatchedStorageGroupSchemasByName(getStorageGroupNames());
@@ -317,53 +319,63 @@ public class ClusterSchemaManager {
int totalCpuCoreNum = getNodeManager().getTotalCpuCoreCount();
int storageGroupNum = storageGroupSchemaMap.size();
- AdjustMaxRegionGroupCountPlan adjustMaxRegionGroupCountPlan =
- new AdjustMaxRegionGroupCountPlan();
+ AdjustMaxRegionGroupNumPlan adjustMaxRegionGroupNumPlan = new AdjustMaxRegionGroupNumPlan();
for (TStorageGroupSchema storageGroupSchema : storageGroupSchemaMap.values()) {
try {
- // Adjust maxSchemaRegionGroupCount.
+ // Adjust maxSchemaRegionGroupNum for each StorageGroup.
// All StorageGroups share the DataNodes equally.
// Allocated SchemaRegionGroups are not shrunk.
int allocatedSchemaRegionGroupCount =
getPartitionManager()
- .getRegionCount(storageGroupSchema.getName(), TConsensusGroupType.SchemaRegion);
- int maxSchemaRegionGroupCount =
+ .getRegionGroupCount(
+ storageGroupSchema.getName(), TConsensusGroupType.SchemaRegion);
+ int maxSchemaRegionGroupNum =
Math.max(
- 1,
+ // The least number of SchemaRegionGroup of each StorageGroup is specified
+ // by parameter least_schema_region_group_num, which is currently unconfigurable.
+ LEAST_SCHEMA_REGION_GROUP_NUM,
Math.max(
+ // The maxSchemaRegionGroupNum of the current StorageGroup is expected to be
+ // (SCHEMA_REGION_PER_DATA_NODE * registerDataNodeNum) /
+ // (createdStorageGroupNum * schemaReplicationFactor)
(int)
- (schemaRegionPerDataNode
+ (SCHEMA_REGION_PER_DATA_NODE
* dataNodeNum
/ (double)
(storageGroupNum
* storageGroupSchema.getSchemaReplicationFactor())),
allocatedSchemaRegionGroupCount));
- // Adjust maxDataRegionGroupCount.
- // All StorageGroups divide one-third of the total cpu cores equally.
+ // Adjust maxDataRegionGroupNum for each StorageGroup.
+ // All StorageGroups divide the total cpu cores equally.
// Allocated DataRegionGroups are not shrunk.
int allocatedDataRegionGroupCount =
getPartitionManager()
- .getRegionCount(storageGroupSchema.getName(), TConsensusGroupType.DataRegion);
- int maxDataRegionGroupCount =
+ .getRegionGroupCount(storageGroupSchema.getName(), TConsensusGroupType.DataRegion);
+ int maxDataRegionGroupNum =
Math.max(
- 2,
+ // The least number of DataRegionGroup of each StorageGroup is specified
+ // by parameter least_data_region_group_num.
+ LEAST_DATA_REGION_GROUP_NUM,
Math.max(
+ // The maxDataRegionGroupNum of the current StorageGroup is expected to be
+ // (DATA_REGION_PER_PROCESSOR * totalCpuCoreNum) /
+ // (createdStorageGroupNum * dataReplicationFactor)
(int)
- (dataRegionPerProcessor
+ (DATA_REGION_PER_PROCESSOR
* totalCpuCoreNum
/ (double)
(storageGroupNum * storageGroupSchema.getDataReplicationFactor())),
allocatedDataRegionGroupCount));
- adjustMaxRegionGroupCountPlan.putEntry(
+ adjustMaxRegionGroupNumPlan.putEntry(
storageGroupSchema.getName(),
- new Pair<>(maxSchemaRegionGroupCount, maxDataRegionGroupCount));
+ new Pair<>(maxSchemaRegionGroupNum, maxDataRegionGroupNum));
} catch (StorageGroupNotExistsException e) {
- LOGGER.warn("Adjust maxRegionGroupCount failed because StorageGroup doesn't exist", e);
+ LOGGER.warn("Adjust maxRegionGroupNum failed because StorageGroup doesn't exist", e);
}
}
- getConsensusManager().write(adjustMaxRegionGroupCountPlan);
+ getConsensusManager().write(adjustMaxRegionGroupNumPlan);
}
// ======================================================
@@ -413,21 +425,21 @@ public class ClusterSchemaManager {
/**
* Only leader use this interface.
*
- * @return List<StorageGroupName>, all storageGroups' name
+ * @return List<StorageGroupName>, all StorageGroups' names
*/
public List<String> getStorageGroupNames() {
return clusterSchemaInfo.getStorageGroupNames();
}
/**
- * Only leader use this interface. Get the maxRegionGroupCount of specific StorageGroup.
+ * Only leader use this interface. Get the maxRegionGroupNum of specific StorageGroup.
*
* @param storageGroup StorageGroupName
* @param consensusGroupType SchemaRegion or DataRegion
- * @return maxSchemaRegionGroupCount or maxDataRegionGroupCount
+ * @return maxSchemaRegionGroupNum or maxDataRegionGroupNum
*/
- public int getMaxRegionGroupCount(String storageGroup, TConsensusGroupType consensusGroupType) {
- return clusterSchemaInfo.getMaxRegionGroupCount(storageGroup, consensusGroupType);
+ public int getMaxRegionGroupNum(String storageGroup, TConsensusGroupType consensusGroupType) {
+ return clusterSchemaInfo.getMaxRegionGroupNum(storageGroup, consensusGroupType);
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index c2f3b61c1b..0fda46342e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -633,7 +633,8 @@ public class ConfigManager implements IManager {
resp = queryResult.convertToTDataPartitionTableResp();
- LOGGER.debug(
+ // TODO: set this log back to debug level
+ LOGGER.info(
"GetOrCreateDataPartition success. receive PartitionSlotsMap: {}, return: {}",
getOrCreateDataPartitionReq.getPartitionSlotsMap(),
resp);
@@ -779,6 +780,9 @@ public class ConfigManager implements IManager {
!= CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()) {
return errorStatus.setMessage(errorPrefix + "disk_space_warning_threshold" + errorSuffix);
}
+ if (req.getLeastDataRegionGroupNum() != conf.getLeastDataRegionGroupNum()) {
+ return errorStatus.setMessage(errorPrefix + "least_data_region_group_num" + errorSuffix);
+ }
return null;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index d98df75f48..1f005bb732 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -246,7 +246,7 @@ public class NodeManager {
getConsensusManager().write(registerDataNodePlan);
// Adjust the maximum RegionGroup number of each StorageGroup
- getClusterSchemaManager().adjustMaxRegionGroupCount();
+ getClusterSchemaManager().adjustMaxRegionGroupNum();
status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setMessage("registerDataNode success.");
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 77651a9bad..d1ad916399 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -102,6 +102,8 @@ public class PartitionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionManager.class);
+ private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+
private final IManager configManager;
private final PartitionInfo partitionInfo;
@@ -129,10 +131,9 @@ public class PartitionManager {
/** Construct SeriesPartitionExecutor by iotdb-confignode.properties */
private void setSeriesPartitionExecutor() {
- ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
this.executor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
- conf.getSeriesPartitionExecutorClass(), conf.getSeriesPartitionSlotNum());
+ CONF.getSeriesPartitionExecutorClass(), CONF.getSeriesPartitionSlotNum());
}
// ======================================================
@@ -337,51 +338,57 @@ public class PartitionManager {
final String storageGroup = entry.getKey();
final int unassignedPartitionSlotsCount = entry.getValue();
- float allocatedRegionCount = partitionInfo.getRegionCount(storageGroup, consensusGroupType);
+ float allocatedRegionGroupCount =
+ partitionInfo.getRegionGroupCount(storageGroup, consensusGroupType);
// The slotCount equals to the sum of assigned slot count and unassigned slot count
float slotCount =
(float) partitionInfo.getAssignedSeriesPartitionSlotsCount(storageGroup)
+ unassignedPartitionSlotsCount;
- float maxRegionCount =
- getClusterSchemaManager().getMaxRegionGroupCount(storageGroup, consensusGroupType);
- float maxSlotCount =
- ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum();
+ float maxRegionGroupCount =
+ getClusterSchemaManager().getMaxRegionGroupNum(storageGroup, consensusGroupType);
+ float maxSlotCount = CONF.getSeriesPartitionSlotNum();
/* Region extension is required in the following cases */
- // 1. There are no Region has been created for the current StorageGroup
- if (allocatedRegionCount == 0) {
- // The delta is equal to the smallest integer solution that satisfies the inequality:
- // slotCount / delta < maxSlotCount / maxRegionCount
+ // 1. The current number of RegionGroups of the StorageGroup is less than the least number
+ int leastRegionGroupNum =
+ TConsensusGroupType.SchemaRegion.equals(consensusGroupType)
+ ? 1
+ : CONF.getLeastDataRegionGroupNum();
+ if (allocatedRegionGroupCount < leastRegionGroupNum) {
+ // Create just enough RegionGroups to reach the leastRegionGroupNum,
+ // but no more than unassignedPartitionSlotsCount
int delta =
- Math.min(
- (int) maxRegionCount,
- Math.max(1, (int) Math.ceil(slotCount * maxRegionCount / maxSlotCount)));
+ (int)
+ Math.min(
+ unassignedPartitionSlotsCount,
+ leastRegionGroupNum - allocatedRegionGroupCount);
allotmentMap.put(storageGroup, delta);
continue;
}
// 2. The average number of partitions held by each Region will be greater than the
// expected average number after the partition allocation is completed
- if (allocatedRegionCount < maxRegionCount
- && slotCount / allocatedRegionCount > maxSlotCount / maxRegionCount) {
+ if (allocatedRegionGroupCount < maxRegionGroupCount
+ && slotCount / allocatedRegionGroupCount > maxSlotCount / maxRegionGroupCount) {
// The delta is equal to the smallest integer solution that satisfies the inequality:
- // slotCount / (allocatedRegionCount + delta) < maxSlotCount / maxRegionCount
+ // slotCount / (allocatedRegionGroupCount + delta) < maxSlotCount / maxRegionGroupCount
int delta =
Math.min(
- (int) (maxRegionCount - allocatedRegionCount),
+ (int) (maxRegionGroupCount - allocatedRegionGroupCount),
Math.max(
1,
(int)
Math.ceil(
- slotCount * maxRegionCount / maxSlotCount - allocatedRegionCount)));
+ slotCount * maxRegionGroupCount / maxSlotCount
+ - allocatedRegionGroupCount)));
allotmentMap.put(storageGroup, delta);
continue;
}
// 3. All RegionGroups in the specified StorageGroup are disabled currently
- if (allocatedRegionCount
+ if (allocatedRegionGroupCount
== filterRegionGroupThroughStatus(storageGroup, RegionGroupStatus.Disabled).size()
- && allocatedRegionCount < maxRegionCount) {
+ && allocatedRegionGroupCount < maxRegionGroupCount) {
allotmentMap.put(storageGroup, 1);
}
}
@@ -389,6 +396,8 @@ public class PartitionManager {
if (!allotmentMap.isEmpty()) {
CreateRegionGroupsPlan createRegionGroupsPlan =
getLoadManager().allocateRegionGroups(allotmentMap, consensusGroupType);
+ LOGGER.info("[CreateRegionGroups] Starting to create the following RegionGroups:");
+ createRegionGroupsPlan.planLog(LOGGER);
result =
getProcedureManager().createRegionGroups(consensusGroupType, createRegionGroupsPlan);
} else {
@@ -482,17 +491,18 @@ public class PartitionManager {
}
/**
- * Only leader use this interface. Get the number of Regions currently owned by the specific
- * StorageGroup
+ * Only the leader uses this interface.
+ *
+ * <p>Get the number of RegionGroups currently owned by the specific StorageGroup
*
* @param storageGroup StorageGroupName
* @param type SchemaRegion or DataRegion
* @return Number of Regions currently owned by the specific StorageGroup
* @throws StorageGroupNotExistsException When the specific StorageGroup doesn't exist
*/
- public int getRegionCount(String storageGroup, TConsensusGroupType type)
+ public int getRegionGroupCount(String storageGroup, TConsensusGroupType type)
throws StorageGroupNotExistsException {
- return partitionInfo.getRegionCount(storageGroup, type);
+ return partitionInfo.getRegionGroupCount(storageGroup, type);
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index b08e4059ef..ea702afd1d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -62,7 +62,7 @@ import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProce
import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
-import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupCountPlan;
+import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupNumPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
@@ -268,9 +268,9 @@ public class ConfigPlanExecutor {
return status;
}
return partitionInfo.setStorageGroup((SetStorageGroupPlan) physicalPlan);
- case AdjustMaxRegionGroupCount:
+ case AdjustMaxRegionGroupNum:
return clusterSchemaInfo.adjustMaxRegionGroupCount(
- (AdjustMaxRegionGroupCountPlan) physicalPlan);
+ (AdjustMaxRegionGroupNumPlan) physicalPlan);
case DeleteStorageGroup:
partitionInfo.deleteStorageGroup((DeleteStorageGroupPlan) physicalPlan);
return clusterSchemaInfo.deleteStorageGroup((DeleteStorageGroupPlan) physicalPlan);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 17d66d36aa..19bc75482f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -607,15 +607,16 @@ public class PartitionInfo implements SnapshotProcessor {
}
/**
- * Only leader use this interface. Get the number of Regions currently owned by the specific
- * StorageGroup
+ * Only the leader uses this interface.
+ *
+ * <p>Get the number of RegionGroups currently owned by the specified StorageGroup
*
* @param storageGroup StorageGroupName
* @param type SchemaRegion or DataRegion
* @return Number of Regions currently owned by the specific StorageGroup
* @throws StorageGroupNotExistsException When the specific StorageGroup doesn't exist
*/
- public int getRegionCount(String storageGroup, TConsensusGroupType type)
+ public int getRegionGroupCount(String storageGroup, TConsensusGroupType type)
throws StorageGroupNotExistsException {
if (!isStorageGroupExisted(storageGroup)) {
throw new StorageGroupNotExistsException(storageGroup);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
index e16dda2c8a..4ba5883264 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplate
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetTemplateSetInfoPlan;
-import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupCountPlan;
+import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupNumPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan;
@@ -328,17 +328,17 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
* @param plan AdjustMaxRegionGroupCountPlan
* @return SUCCESS_STATUS
*/
- public TSStatus adjustMaxRegionGroupCount(AdjustMaxRegionGroupCountPlan plan) {
+ public TSStatus adjustMaxRegionGroupCount(AdjustMaxRegionGroupNumPlan plan) {
TSStatus result = new TSStatus();
storageGroupReadWriteLock.writeLock().lock();
try {
for (Map.Entry<String, Pair<Integer, Integer>> entry :
- plan.getMaxRegionGroupCountMap().entrySet()) {
+ plan.getMaxRegionGroupNumMap().entrySet()) {
PartialPath path = new PartialPath(entry.getKey());
TStorageGroupSchema storageGroupSchema =
mTree.getStorageGroupNodeByStorageGroupPath(path).getStorageGroupSchema();
- storageGroupSchema.setMaxSchemaRegionGroupCount(entry.getValue().getLeft());
- storageGroupSchema.setMaxDataRegionGroupCount(entry.getValue().getRight());
+ storageGroupSchema.setMaxSchemaRegionGroupNum(entry.getValue().getLeft());
+ storageGroupSchema.setMaxDataRegionGroupNum(entry.getValue().getRight());
}
result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (MetadataException e) {
@@ -437,13 +437,13 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
}
/**
- * Only leader use this interface. Get the maxRegionGroupCount of specific StorageGroup.
+ * Only leader use this interface. Get the maxRegionGroupNum of specific StorageGroup.
*
* @param storageGroup StorageGroupName
* @param consensusGroupType SchemaRegion or DataRegion
- * @return maxSchemaRegionGroupCount or maxDataRegionGroupCount
+ * @return maxSchemaRegionGroupNum or maxDataRegionGroupNum
*/
- public int getMaxRegionGroupCount(String storageGroup, TConsensusGroupType consensusGroupType) {
+ public int getMaxRegionGroupNum(String storageGroup, TConsensusGroupType consensusGroupType) {
storageGroupReadWriteLock.readLock().lock();
try {
PartialPath path = new PartialPath(storageGroup);
@@ -451,10 +451,10 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
mTree.getStorageGroupNodeByStorageGroupPath(path).getStorageGroupSchema();
switch (consensusGroupType) {
case SchemaRegion:
- return storageGroupSchema.getMaxSchemaRegionGroupCount();
+ return storageGroupSchema.getMaxSchemaRegionGroupNum();
case DataRegion:
default:
- return storageGroupSchema.getMaxDataRegionGroupCount();
+ return storageGroupSchema.getMaxDataRegionGroupNum();
}
} catch (MetadataException e) {
LOGGER.warn("Error StorageGroup name", e);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 0800c8840e..99b12b84af 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -511,11 +511,9 @@ public class ConfigNodeProcedureEnv {
return getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup).getTTL();
}
- public void persistAndBroadcastRegionGroup(CreateRegionGroupsPlan createRegionGroupsPlan) {
+ public void persistRegionGroup(CreateRegionGroupsPlan createRegionGroupsPlan) {
// Persist the allocation result
getConsensusManager().write(createRegionGroupsPlan);
- // Broadcast the latest RegionRouteMap
- getLoadManager().broadcastLatestRegionRouteMap();
}
public void activateRegionGroup(
@@ -549,6 +547,11 @@ public class ConfigNodeProcedureEnv {
getLoadManager().getRouteBalancer().updateRegionRouteMap();
}
+ public void broadcastRegionGroup() {
+ // Broadcast the latest RegionRouteMap
+ getLoadManager().broadcastLatestRegionRouteMap();
+ }
+
public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) {
return getPartitionManager().getAllReplicaSets(storageGroup);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
index 21576f936b..dd15ad1117 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
@@ -98,6 +98,9 @@ public class CreateRegionGroupsProcedure
// A RegionGroup was created successfully when
// all RegionReplicas were created successfully
persistPlan.addRegionGroup(storageGroup, regionReplicaSet);
+ LOGGER.info(
+ "[CreateRegionGroups] All replicas of RegionGroup: {} are created successfully!",
+ regionReplicaSet.getRegionId());
} else {
TRegionReplicaSet failedRegionReplicas =
failedRegionReplicaSets.get(regionReplicaSet.getRegionId());
@@ -127,6 +130,9 @@ public class CreateRegionGroupsProcedure
offerPlan.appendRegionMaintainTask(createTask);
});
+ LOGGER.info(
+ "[CreateRegionGroups] Failed to create some replicas of RegionGroup: {}, but this RegionGroup can still be used.",
+ regionReplicaSet.getRegionId());
} else {
// The redundant RegionReplicas should be deleted otherwise
regionReplicaSet
@@ -142,11 +148,15 @@ public class CreateRegionGroupsProcedure
offerPlan.appendRegionMaintainTask(deleteTask);
}
});
+
+ LOGGER.info(
+ "[CreateRegionGroups] Failed to create most of replicas in RegionGroup: {}, The redundant replicas in this RegionGroup will be deleted.",
+ regionReplicaSet.getRegionId());
}
}
}));
- env.persistAndBroadcastRegionGroup(persistPlan);
+ env.persistRegionGroup(persistPlan);
env.getConfigManager().getConsensusManager().write(offerPlan);
setNextState(CreateRegionGroupsState.ACTIVATE_REGION_GROUPS);
break;
@@ -191,6 +201,7 @@ public class CreateRegionGroupsProcedure
setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
break;
case CREATE_REGION_GROUPS_FINISH:
+ env.broadcastRegionGroup();
return Flow.NO_MORE_STATE;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index ba92491931..96b2dd6e53 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.service;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
@@ -56,6 +57,7 @@ public class ConfigNode implements ConfigNodeMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNode.class);
private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+ private static final CommonConfig COMMON_CONF = CommonDescriptor.getInstance().getConfig();
private static final int SCHEDULE_WAITING_RETRY_NUM = 20;
@@ -225,14 +227,15 @@ public class ConfigNode implements ConfigNodeMBean {
CONF.getSchemaRegionConsensusProtocolClass(),
CONF.getSeriesPartitionSlotNum(),
CONF.getSeriesPartitionExecutorClass(),
- CommonDescriptor.getInstance().getConfig().getDefaultTTLInMs(),
+ COMMON_CONF.getDefaultTTLInMs(),
CONF.getTimePartitionInterval(),
CONF.getSchemaReplicationFactor(),
CONF.getSchemaRegionPerDataNode(),
CONF.getDataReplicationFactor(),
CONF.getDataRegionPerProcessor(),
CONF.getReadConsistencyLevel(),
- CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold());
+ COMMON_CONF.getDiskSpaceWarningThreshold(),
+ CONF.getLeastDataRegionGroupNum());
TEndPoint targetConfigNode = CONF.getTargetConfigNode();
if (targetConfigNode == null) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 5641f1ec52..97b4d9fd2e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -255,8 +255,8 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
}
// Initialize the maxSchemaRegionGroupCount and maxDataRegionGroupCount as 0
- storageGroupSchema.setMaxSchemaRegionGroupCount(0);
- storageGroupSchema.setMaxDataRegionGroupCount(0);
+ storageGroupSchema.setMaxSchemaRegionGroupNum(0);
+ storageGroupSchema.setMaxDataRegionGroupNum(0);
SetStorageGroupPlan setReq = new SetStorageGroupPlan(storageGroupSchema);
TSStatus resp = configManager.setStorageGroup(setReq);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 07a77ee756..3b6c0afd28 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -79,7 +79,7 @@ import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProce
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
-import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupCountPlan;
+import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupNumPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan;
@@ -248,13 +248,13 @@ public class ConfigPhysicalPlanSerDeTest {
@Test
public void AdjustMaxRegionGroupCountPlanTest() throws IOException {
- AdjustMaxRegionGroupCountPlan req0 = new AdjustMaxRegionGroupCountPlan();
+ AdjustMaxRegionGroupNumPlan req0 = new AdjustMaxRegionGroupNumPlan();
for (int i = 0; i < 3; i++) {
req0.putEntry("root.sg" + i, new Pair<>(i, i));
}
- AdjustMaxRegionGroupCountPlan req1 =
- (AdjustMaxRegionGroupCountPlan)
+ AdjustMaxRegionGroupNumPlan req1 =
+ (AdjustMaxRegionGroupNumPlan)
ConfigPhysicalPlan.Factory.create(req0.serializeToByteBuffer());
Assert.assertEquals(req0, req1);
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index f7fa4fe4fe..114f63ce0c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -326,4 +326,11 @@ public class MppConfig implements BaseConfig {
"enable_leader_balancing", String.valueOf(enableLeaderBalancing));
return this;
}
+
+ @Override
+ public BaseConfig setLeastDataRegionGroupNum(int leastDataRegionGroupNum) {
+ confignodeProperties.setProperty(
+ "least_data_region_group_num", String.valueOf(leastDataRegionGroupNum));
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 8f3dffdd1b..97bd4eb24e 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -346,4 +346,12 @@ public interface BaseConfig {
default boolean isEnableLeaderBalancing() {
return false;
}
+
+ default BaseConfig setLeastDataRegionGroupNum(int leastDataRegionGroupNum) {
+ return this;
+ }
+
+ default int getLeastDataRegionGroupNum() {
+ return 5;
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
index 3f15c498f8..72ab0a074f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
@@ -39,12 +39,14 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.ConfigFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.env.BaseConfig;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -60,6 +62,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -72,6 +75,8 @@ public class IoTDBPartitionGetterIT {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBPartitionGetterIT.class);
+ private static final BaseConfig CONF = ConfigFactory.getConfig();
+
private static String originalConfigNodeConsensusProtocolClass;
private static String originalSchemaRegionConsensusProtocolClass;
private static String originalDataRegionConsensusProtocolClass;
@@ -84,6 +89,9 @@ public class IoTDBPartitionGetterIT {
private static long originalTimePartitionInterval;
private static final long testTimePartitionInterval = 604800000;
+ protected static int originalLeastDataRegionGroupNum;
+ private static final int testLeastDataRegionGroupNum = 10;
+
private static final String sg = "root.sg";
private static final int storageGroupNum = 5;
private static final int seriesPartitionSlotsNum = 10000;
@@ -99,17 +107,20 @@ public class IoTDBPartitionGetterIT {
ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
originalDataRegionConsensusProtocolClass =
ConfigFactory.getConfig().getDataRegionConsensusProtocolClass();
- ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(testConsensusProtocolClass);
- ConfigFactory.getConfig().setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
- ConfigFactory.getConfig().setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
+ CONF.setConfigNodeConsesusProtocolClass(testConsensusProtocolClass);
+ CONF.setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
+ CONF.setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
+
+ originalSchemaReplicationFactor = CONF.getSchemaReplicationFactor();
+ originalDataReplicationFactor = CONF.getDataReplicationFactor();
+ CONF.setSchemaReplicationFactor(testReplicationFactor);
+ CONF.setDataReplicationFactor(testReplicationFactor);
- originalSchemaReplicationFactor = ConfigFactory.getConfig().getSchemaReplicationFactor();
- originalDataReplicationFactor = ConfigFactory.getConfig().getDataReplicationFactor();
- ConfigFactory.getConfig().setSchemaReplicationFactor(testReplicationFactor);
- ConfigFactory.getConfig().setDataReplicationFactor(testReplicationFactor);
+ originalTimePartitionInterval = CONF.getTimePartitionInterval();
+ CONF.setTimePartitionInterval(testTimePartitionInterval);
- originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
- ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
+ originalLeastDataRegionGroupNum = CONF.getLeastDataRegionGroupNum();
+ CONF.setLeastDataRegionGroupNum(testLeastDataRegionGroupNum);
// Init 1C3D environment
EnvFactory.getEnv().initClusterEnvironment(1, 3);
@@ -210,17 +221,13 @@ public class IoTDBPartitionGetterIT {
public static void tearDown() {
EnvFactory.getEnv().cleanAfterClass();
- ConfigFactory.getConfig()
- .setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
- ConfigFactory.getConfig()
- .setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
- ConfigFactory.getConfig()
- .setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
+ CONF.setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
+ CONF.setSchemaRegionConsensusProtocolClass(originalSchemaRegionConsensusProtocolClass);
+ CONF.setDataRegionConsensusProtocolClass(originalDataRegionConsensusProtocolClass);
- ConfigFactory.getConfig().setSchemaReplicationFactor(originalSchemaReplicationFactor);
- ConfigFactory.getConfig().setDataReplicationFactor(originalDataReplicationFactor);
-
- ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
+ CONF.setSchemaReplicationFactor(originalSchemaReplicationFactor);
+ CONF.setDataReplicationFactor(originalDataReplicationFactor);
+ CONF.setTimePartitionInterval(originalTimePartitionInterval);
}
@Test
@@ -341,6 +348,14 @@ public class IoTDBPartitionGetterIT {
dataPartitionTableResp.getDataPartitionTable());
}
}
+
+ // Check the number of DataRegionGroup.
+ // And this number should be greater than or equal to testLeastDataRegionGroupNum
+ TShowStorageGroupResp showStorageGroupResp =
+ client.showStorageGroup(Arrays.asList(storageGroup.split("\\.")));
+ Assert.assertTrue(
+ showStorageGroupResp.getStorageGroupInfoMap().get(storageGroup).getDataRegionNum()
+ >= testLeastDataRegionGroupNum);
}
}
}
@@ -384,7 +399,7 @@ public class IoTDBPartitionGetterIT {
getRegionIdResp = client.getRegionId(getRegionIdReq);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), getRegionIdResp.status.getCode());
- Assert.assertEquals(1, getRegionIdResp.getDataRegionIdListSize());
+ Assert.assertEquals(10, getRegionIdResp.getDataRegionIdListSize());
final String d00 = sg0 + ".d0.s";
final String d01 = sg0 + ".d1.s";
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyTest.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyTest.java
index 31bed5a0bf..fb9952e29f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyTest.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyTest.java
@@ -94,7 +94,8 @@ public class IoTDBPartitionInheritPolicyTest {
originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
- EnvFactory.getEnv().initBeforeClass();
+ // Init 1C3D environment
+ EnvFactory.getEnv().initClusterEnvironment(1, 3);
// Set StorageGroups
try (SyncConfigNodeIServiceClient client =
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
index decd847622..3d6a3fbaea 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceIT.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -45,9 +44,10 @@ import java.util.Map;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBAlignByDeviceIT {
- private static String[] sqls =
+ private static final String[] sqls =
new String[] {
"CREATE DATABASE root.vehicle",
"CREATE DATABASE root.other",
@@ -126,7 +126,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void selectTest() {
String[] retArray =
new String[] {
@@ -194,7 +193,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void selectWithDuplicatedPathsTest() {
String[] retArray =
new String[] {
@@ -252,7 +250,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void selectLimitTest() {
String[] retArray =
new String[] {
@@ -306,7 +303,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void selectSlimitTest2() {
String[] retArray =
new String[] {
@@ -351,7 +347,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void selectSlimitTest() {
String[] retArray =
new String[] {
@@ -408,7 +403,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void selectWithValueFilterTest() {
String[] retArray =
new String[] {
@@ -464,8 +458,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category(ClusterIT.class)
- // Result is different from Old standalone version
public void selectDifferentSeriesWithValueFilterWithoutCacheTest() {
String[] retArray =
new String[] {
@@ -515,7 +507,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void selectDifferentSeriesWithBinaryValueFilterWithoutCacheTest() {
String[] retArray =
new String[] {
@@ -560,7 +551,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void aggregateTest() {
String[] retArray =
new String[] {"root.vehicle.d0,11,11,6,6,1,", "root.vehicle.d1,2,null,null,null,null,"};
@@ -609,7 +599,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void groupByTimeTest() {
String[] retArray =
new String[] {
@@ -665,7 +654,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void groupByTimeWithValueFilterTest() {
String[] retArray =
new String[] {
@@ -709,61 +697,7 @@ public class IoTDBAlignByDeviceIT {
}
}
- @Ignore
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
- public void fillTest() {
- String[] retArray =
- new String[] {
- "3,root.vehicle.d0,10000,40000,3.33,null,null,",
- "3,root.vehicle.d1,999,null,null,null,null,",
- };
-
- try (Connection connection = EnvFactory.getEnv().getConnection();
- Statement statement = connection.createStatement()) {
-
- try (ResultSet resultSet =
- statement.executeQuery(
- "select * from root.vehicle.* where time = 3 Fill(int32[previous, 5ms]) align by device")) {
- ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- List<Integer> actualIndexToExpectedIndexList =
- checkHeader(
- resultSetMetaData,
- "Time,Device,s0,s1,s2,s3,s4",
- new int[] {
- Types.TIMESTAMP,
- Types.VARCHAR,
- Types.INTEGER,
- Types.BIGINT,
- Types.FLOAT,
- Types.VARCHAR,
- Types.BOOLEAN,
- });
-
- int cnt = 0;
- while (resultSet.next()) {
- String[] expectedStrings = retArray[cnt].split(",");
- StringBuilder expectedBuilder = new StringBuilder();
- StringBuilder actualBuilder = new StringBuilder();
- for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
- actualBuilder.append(resultSet.getString(i)).append(",");
- expectedBuilder
- .append(expectedStrings[actualIndexToExpectedIndexList.get(i - 1)])
- .append(",");
- }
- Assert.assertEquals(expectedBuilder.toString(), actualBuilder.toString());
- cnt++;
- }
- Assert.assertEquals(retArray.length, cnt);
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void errorCaseTest3() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
@@ -786,7 +720,6 @@ public class IoTDBAlignByDeviceIT {
* count(root.vehicle.d0.s0) INT64 count(root.vehicle.d1.s0) INT64 count(root.other.d1.s0) INT64
*/
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void unusualCaseTest1() {
String[] retArray =
new String[] {"root.other.d1,1,", "root.vehicle.d0,11,", "root.vehicle.d1,2,"};
@@ -824,7 +757,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void unusualCaseTest2() {
String[] retArray =
new String[] {
@@ -884,7 +816,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void selectNonExistTest() {
String[] retArray =
new String[] {
@@ -973,7 +904,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void selectWithRegularExpressionTest() {
String[] retArray =
new String[] {
@@ -1041,7 +971,6 @@ public class IoTDBAlignByDeviceIT {
}
@Test
- @Category({LocalStandaloneIT.class, ClusterIT.class})
public void selectWithNonExistMeasurementInWhereClause() {
String[] retArray = new String[] {"1,root.vehicle.d0,101,1101,null,null,null,"};
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 27f5958ff0..76f981c710 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -87,6 +87,13 @@
# Datatype: double
# data_region_per_processor=0.5
+# The minimum number of DataRegionGroups for each StorageGroup.
+# The ConfigNode-leader will create a DataRegionGroup for each new SeriesPartitionSlot
+# of a newly created StorageGroup until the number of DataRegionGroups reaches this parameter.
+# Notice: To ensure efficient concurrent writes, this parameter should be greater than 1.
+# Datatype: int
+# least_data_region_group_num=5
+
# Region allocate strategy
# These allocate strategies are currently supported:
# 1. GREEDY(Default, when region is allocated, always choose the dataNode that has been allocated the minimum regions)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index ba6909e4fa..761ebfa033 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import com.google.common.util.concurrent.SettableFuture;
@@ -131,12 +132,13 @@ public class DataDriver extends Driver {
Set<String> selectedDeviceIdSet =
pathList.stream().map(PartialPath::getDevice).collect(Collectors.toSet());
+ Filter timeFilter = context.getTimeFilter();
QueryDataSource dataSource =
dataRegion.query(
pathList,
selectedDeviceIdSet.size() == 1 ? selectedDeviceIdSet.iterator().next() : null,
driverContext.getFragmentInstanceContext(),
- context.getTimeFilter());
+ timeFilter != null ? timeFilter.copy() : null);
// used files should be added before mergeLock is unlocked, or they may be deleted by
// running merge
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 632fbeb05a..f331b2ed87 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -38,8 +38,6 @@ import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import com.google.common.util.concurrent.ListenableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -51,8 +49,6 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class AbstractIntoOperator implements ProcessOperator {
- private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class);
-
protected final OperatorContext operatorContext;
protected final Operator child;
@@ -138,6 +134,19 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
return false;
}
+ private boolean existNonEmptyStatement(
+ List<InsertTabletStatementGenerator> insertTabletStatementGenerators) {
+ if (insertTabletStatementGenerators == null) {
+ return false;
+ }
+ for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+ if (generator != null && !generator.isEmpty()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
protected int findWritten(String device, String measurement) {
for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
if (!Objects.equals(generator.getDevice(), device)) {
@@ -160,7 +169,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return child.hasNext();
+ return existNonEmptyStatement(insertTabletStatementGenerators) || child.hasNext();
}
@Override
@@ -173,7 +182,7 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
@Override
public boolean isFinished() {
- return child.isFinished();
+ return !hasNext();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index b1eccd65c4..68329d56e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -99,7 +99,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
}
}
- if (hasNext()) {
+ if (child.hasNext()) {
return null;
} else {
insertMultiTabletsInternally(false);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index b9d6274ac0..9a3a5e6634 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -96,6 +96,9 @@ public class DeviceViewOperator implements ProcessOperator {
@Override
public ListenableFuture<?> isBlocked() {
+ if (deviceIndex >= deviceOperators.size()) {
+ return NOT_BLOCKED;
+ }
ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked();
if (!blocked.isDone()) {
return blocked;
@@ -105,6 +108,10 @@ public class DeviceViewOperator implements ProcessOperator {
@Override
public TsBlock next() {
+ if (!getCurDeviceOperator().hasNext()) {
+ deviceIndex++;
+ return null;
+ }
TsBlock tsBlock = getCurDeviceOperator().next();
if (tsBlock == null) {
return null;
@@ -132,14 +139,7 @@ public class DeviceViewOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- while (!getCurDeviceOperator().hasNext()) {
- if (deviceIndex + 1 < devices.size()) {
- deviceIndex++;
- } else {
- return false;
- }
- }
- return true;
+ return deviceIndex < devices.size();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index d79c9b3609..4a34488779 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -74,7 +74,7 @@ public class IntoOperator extends AbstractIntoOperator {
}
}
- if (hasNext()) {
+ if (child.hasNext()) {
return null;
} else {
insertMultiTabletsInternally(false);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index c7964ae828..484a342715 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -177,6 +177,7 @@ import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.Gt;
import org.apache.iotdb.tsfile.read.filter.operator.GtEq;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -243,6 +244,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
SeriesScanOperator.class.getSimpleName());
+ Filter timeFilter = node.getTimeFilter();
+ Filter valueFilter = node.getValueFilter();
SeriesScanOperator seriesScanOperator =
new SeriesScanOperator(
node.getPlanNodeId(),
@@ -250,8 +253,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()),
seriesPath.getSeriesType(),
operatorContext,
- node.getTimeFilter(),
- node.getValueFilter(),
+ timeFilter != null ? timeFilter.copy() : null,
+ valueFilter != null ? valueFilter.copy() : null,
ascending);
context.addSourceOperator(seriesScanOperator);
@@ -273,13 +276,15 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
node.getPlanNodeId(),
AlignedSeriesScanOperator.class.getSimpleName());
+ Filter timeFilter = node.getTimeFilter();
+ Filter valueFilter = node.getValueFilter();
AlignedSeriesScanOperator seriesScanOperator =
new AlignedSeriesScanOperator(
node.getPlanNodeId(),
seriesPath,
operatorContext,
- node.getTimeFilter(),
- node.getValueFilter(),
+ timeFilter != null ? timeFilter.copy() : null,
+ valueFilter != null ? valueFilter.copy() : null,
ascending);
context.addSourceOperator(seriesScanOperator);
@@ -320,6 +325,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
AggregationUtil.calculateMaxAggregationResultSize(
node.getAggregationDescriptorList(), timeRangeIterator, context.getTypeProvider());
+ Filter timeFilter = node.getTimeFilter();
SeriesAggregationScanOperator aggregateScanOperator =
new SeriesAggregationScanOperator(
node.getPlanNodeId(),
@@ -328,7 +334,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
operatorContext,
aggregators,
timeRangeIterator,
- node.getTimeFilter(),
+ timeFilter != null ? timeFilter.copy() : null,
ascending,
node.getGroupByTimeParameter(),
maxReturnSize);
@@ -381,6 +387,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
AggregationUtil.calculateMaxAggregationResultSize(
node.getAggregationDescriptorList(), timeRangeIterator, context.getTypeProvider());
+ Filter timeFilter = node.getTimeFilter();
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
node.getPlanNodeId(),
@@ -388,7 +395,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
operatorContext,
aggregators,
timeRangeIterator,
- node.getTimeFilter(),
+ timeFilter != null ? timeFilter.copy() : null,
ascending,
groupByTimeParameter,
maxReturnSize);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
index e8daf854cf..f564e1421e 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
@@ -180,6 +180,9 @@ public class DeviceMergeOperatorTest {
int count = 0;
while (deviceMergeOperator.hasNext()) {
TsBlock tsBlock = deviceMergeOperator.next();
+ if (tsBlock == null) {
+ continue;
+ }
assertEquals(3, tsBlock.getValueColumnCount());
assertEquals(20, tsBlock.getPositionCount());
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -338,6 +341,9 @@ public class DeviceMergeOperatorTest {
int count = 0;
while (deviceMergeOperator.hasNext()) {
TsBlock tsBlock = deviceMergeOperator.next();
+ if (tsBlock == null) {
+ continue;
+ }
assertEquals(2, tsBlock.getValueColumnCount());
assertEquals(20, tsBlock.getPositionCount());
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
@@ -501,6 +507,9 @@ public class DeviceMergeOperatorTest {
int count = 0;
while (deviceMergeOperator.hasNext()) {
TsBlock tsBlock = deviceMergeOperator.next();
+ if (tsBlock == null) {
+ continue;
+ }
assertEquals(3, tsBlock.getValueColumnCount());
assertEquals(20, tsBlock.getPositionCount());
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
index a40f649e92..37d3ab0595 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
@@ -152,6 +152,9 @@ public class DeviceViewOperatorTest {
int count = 0;
while (deviceViewOperator.hasNext()) {
TsBlock tsBlock = deviceViewOperator.next();
+ if (tsBlock == null) {
+ continue;
+ }
assertEquals(3, tsBlock.getValueColumnCount());
assertEquals(20, tsBlock.getPositionCount());
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 5d78893b64..a8c7872ff3 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1479,18 +1479,18 @@ public class Session implements ISession {
for (int i = 0; i < deviceIds.size(); i++) {
final SessionConnection connection = getSessionConnection(deviceIds.get(i));
TSInsertStringRecordsReq request =
- recordsGroup.computeIfAbsent(connection, k -> new TSInsertStringRecordsReq());
+ recordsGroup.getOrDefault(connection, new TSInsertStringRecordsReq());
request.setIsAligned(isAligned);
try {
filterAndUpdateTSInsertStringRecordsReq(
request, deviceIds.get(i), times.get(i), measurementsList.get(i), valuesList.get(i));
+ recordsGroup.putIfAbsent(connection, request);
} catch (NoValidValueException e) {
logger.warn(
"All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements is [{}]",
deviceIds.get(i),
times.get(i),
measurementsList.get(i).toString());
- continue;
}
}
@@ -2156,8 +2156,7 @@ public class Session implements ISession {
Map<SessionConnection, TSInsertRecordsReq> recordsGroup = new HashMap<>();
for (int i = 0; i < deviceIds.size(); i++) {
final SessionConnection connection = getSessionConnection(deviceIds.get(i));
- TSInsertRecordsReq request =
- recordsGroup.computeIfAbsent(connection, k -> new TSInsertRecordsReq());
+ TSInsertRecordsReq request = recordsGroup.getOrDefault(connection, new TSInsertRecordsReq());
request.setIsAligned(isAligned);
try {
filterAndUpdateTSInsertRecordsReq(
@@ -2167,13 +2166,13 @@ public class Session implements ISession {
measurementsList.get(i),
typesList.get(i),
valuesList.get(i));
+ recordsGroup.putIfAbsent(connection, request);
} catch (NoValidValueException e) {
logger.warn(
"All values are null and this submission is ignored,deviceId is [{}],time is [{}],measurements are [{}]",
deviceIds.get(i),
times.get(i),
measurementsList.get(i).toString());
- continue;
}
}
insertByGroup(recordsGroup, SessionConnection::insertRecords);
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 2434e371c1..8d5698fa09 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -170,8 +170,8 @@ struct TStorageGroupSchema {
3: optional i32 schemaReplicationFactor
4: optional i32 dataReplicationFactor
5: optional i64 timePartitionInterval
- 6: optional i32 maxSchemaRegionGroupCount
- 7: optional i32 maxDataRegionGroupCount
+ 6: optional i32 maxSchemaRegionGroupNum
+ 7: optional i32 maxDataRegionGroupNum
}
// Schema
@@ -307,6 +307,7 @@ struct TConfigNodeRegisterReq {
11: required double dataRegionPerProcessor
12: required string readConsistencyLevel
13: required double diskSpaceWarningThreshold
+ 14: required i32 leastDataRegionGroupNum
}
struct TConfigNodeRegisterResp {