You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/04/06 06:55:02 UTC
[iotdb] branch rel/1.1 updated: [To rel/1.1][IOTDB-5719] Cluster-Wide Time Series Limit Control (#9430)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 879d6b180c [To rel/1.1][IOTDB-5719] Cluster-Wide Time Series Limit Control (#9430)
879d6b180c is described below
commit 879d6b180cb0739d6ed99224626708631cb623f0
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Thu Apr 6 14:54:56 2023 +0800
[To rel/1.1][IOTDB-5719] Cluster-Wide Time Series Limit Control (#9430)
---
.../heartbeat/DataNodeHeartbeatHandler.java | 9 +-
.../statemachine/ConfigRegionStateMachine.java | 1 +
.../iotdb/confignode/manager/ConfigManager.java | 5 +-
.../apache/iotdb/confignode/manager/IManager.java | 1 +
.../manager/load/balancer/RegionBalancer.java | 2 +-
.../iotdb/confignode/manager/node/NodeManager.java | 6 +-
.../manager/partition/PartitionManager.java | 17 ++-
.../manager/partition/PartitionMetrics.java | 2 +-
.../manager/{ => schema}/ClusterSchemaManager.java | 23 +++-
.../schema/ClusterSchemaQuotaStatistics.java} | 33 ++++--
.../partition/DatabasePartitionTable.java | 4 +
.../persistence/partition/PartitionInfo.java | 13 +++
.../procedure/env/ConfigNodeProcedureEnv.java | 9 +-
.../impl/schema/DeleteDatabaseProcedure.java | 8 +-
.../manager/ClusterSchemaManagerTest.java | 2 +
.../iotdb/it/env/cluster/MppCommonConfig.java | 12 ++
.../it/env/cluster/MppSharedCommonConfig.java | 14 +++
.../iotdb/it/env/remote/RemoteCommonConfig.java | 10 ++
.../org/apache/iotdb/itbase/env/CommonConfig.java | 4 +
.../db/it/schema/IoTDBClusterDeviceQuotaIT.java | 25 ++---
.../it/schema/IoTDBClusterMeasurementQuotaIT.java | 125 +++++++++++++++++++++
.../resources/conf/iotdb-common.properties | 14 +++
.../commons/schema/ClusterSchemaQuotaLevel.java | 24 ++--
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 ++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 25 ++++-
.../metadata/SchemaQuotaExceededException.java | 27 ++---
.../db/metadata/mtree/MTreeBelowSGCachedImpl.java | 4 +
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 6 +-
.../db/metadata/mtree/store/CachedMTreeStore.java | 7 +-
.../db/metadata/mtree/store/MemMTreeStore.java | 7 +-
.../rescon/DataNodeSchemaQuotaManager.java | 92 +++++++++++++++
.../db/metadata/schemaregion/SchemaEngine.java | 46 ++++++++
.../schemaregion/SchemaRegionMemoryImpl.java | 16 ++-
.../schemaregion/SchemaRegionSchemaFileImpl.java | 5 +
.../impl/DataNodeInternalRPCServiceImpl.java | 7 ++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
thrift/src/main/thrift/datanode.thrift | 2 +
37 files changed, 546 insertions(+), 84 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 28e1d26254..181cacddf3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.async.AsyncMethodCallback;
import java.util.Map;
+import java.util.function.Consumer;
public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
@@ -40,16 +41,19 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
private final RouteBalancer routeBalancer;
+ private final Consumer<Map<TConsensusGroupId, Long>> schemaQuotaRespProcess;
public DataNodeHeartbeatHandler(
TDataNodeLocation dataNodeLocation,
DataNodeHeartbeatCache dataNodeHeartbeatCache,
Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap,
- RouteBalancer routeBalancer) {
+ RouteBalancer routeBalancer,
+ Consumer<Map<TConsensusGroupId, Long>> schemaQuotaRespProcess) {
this.dataNodeLocation = dataNodeLocation;
this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
this.regionGroupCacheMap = regionGroupCacheMap;
this.routeBalancer = routeBalancer;
+ this.schemaQuotaRespProcess = schemaQuotaRespProcess;
}
@Override
@@ -82,6 +86,9 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
heartbeatResp.getHeartbeatTimestamp(), dataNodeLocation.getDataNodeId()));
}
});
+ if (heartbeatResp.getSchemaCountMap() != null) {
+ schemaQuotaRespProcess.accept(heartbeatResp.getSchemaCountMap());
+ }
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 8555ff9a77..608e4b2297 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -236,6 +236,7 @@ public class ConfigRegionStateMachine
configManager.getNodeManager().stopHeartbeatService();
configManager.getPartitionManager().stopRegionCleaner();
configManager.getCQManager().stopCQScheduler();
+ configManager.getClusterSchemaManager().clearSchemaQuotaCache();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 6a4b8ddb15..48893cfef4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -89,6 +89,8 @@ import org.apache.iotdb.confignode.manager.node.NodeMetrics;
import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaQuotaStatistics;
import org.apache.iotdb.confignode.persistence.AuthorInfo;
import org.apache.iotdb.confignode.persistence.ModelInfo;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
@@ -263,7 +265,8 @@ public class ConfigManager implements IManager {
// Build the manager module
this.nodeManager = new NodeManager(this, nodeInfo);
- this.clusterSchemaManager = new ClusterSchemaManager(this, clusterSchemaInfo);
+ this.clusterSchemaManager =
+ new ClusterSchemaManager(this, clusterSchemaInfo, new ClusterSchemaQuotaStatistics());
this.partitionManager = new PartitionManager(this, partitionInfo);
this.permissionManager = new PermissionManager(this, authorInfo);
this.procedureManager = new ProcedureManager(this, procedureInfo);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 8ace5e2f13..f85706230a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.confignode.manager.cq.CQManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 882a5eeebe..198d70a59d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -27,13 +27,13 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
-import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator;
import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator;
import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import java.util.List;
import java.util.Map;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index fb822a9abb..276b879d4d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -52,7 +52,6 @@ import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
-import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.TriggerManager;
@@ -64,6 +63,7 @@ import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCac
import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
@@ -722,6 +722,7 @@ public class NodeManager {
heartbeatReq.setNeedJudgeLeader(true);
// We sample DataNode's load in every 10 heartbeat loop
heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
+ heartbeatReq.setSchemaQuotaCount(getClusterSchemaManager().getSchemaQuotaCount());
/* Update heartbeat counter */
heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
@@ -745,7 +746,8 @@ public class NodeManager {
dataNodeInfo.getLocation().getDataNodeId(),
empty -> new DataNodeHeartbeatCache()),
getPartitionManager().getRegionGroupCacheMap(),
- getLoadManager().getRouteBalancer());
+ getLoadManager().getRouteBalancer(),
+ getClusterSchemaManager()::updateSchemaQuota);
AsyncDataNodeHeartbeatClientPool.getInstance()
.getDataNodeHeartBeat(
dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 5e2ad788aa..79df2c353b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -63,12 +63,12 @@ import org.apache.iotdb.confignode.consensus.response.partition.SchemaPartitionR
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
-import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -712,6 +712,15 @@ public class PartitionManager {
return result;
}
+ /**
+ * Only the leader uses this interface.
+ *
+ * @return the set of TConsensusGroupIds of all schema regions
+ */
+ public Set<TConsensusGroupId> getAllSchemaPartition() {
+ return partitionInfo.getAllSchemaPartition();
+ }
+
/**
* Only leader use this interface
*
@@ -735,10 +744,10 @@ public class PartitionManager {
return schemaNodeManagementResp;
}
- public void preDeleteStorageGroup(
- String storageGroup, PreDeleteDatabasePlan.PreDeleteType preDeleteType) {
+ public void preDeleteDatabase(
+ String database, PreDeleteDatabasePlan.PreDeleteType preDeleteType) {
final PreDeleteDatabasePlan preDeleteDatabasePlan =
- new PreDeleteDatabasePlan(storageGroup, preDeleteType);
+ new PreDeleteDatabasePlan(database, preDeleteType);
getConsensusManager().write(preDeleteDatabasePlan);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
index 21909d6a5a..87a748e47f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
@@ -27,10 +27,10 @@ import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
-import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
index e6c0529255..15036c5bfa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.confignode.manager.schema;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -61,6 +62,7 @@ import org.apache.iotdb.confignode.consensus.response.template.AllTemplateSetInf
import org.apache.iotdb.confignode.consensus.response.template.TemplateInfoResp;
import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoResp;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
+import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
@@ -107,10 +109,15 @@ public class ClusterSchemaManager {
private final IManager configManager;
private final ClusterSchemaInfo clusterSchemaInfo;
+ private final ClusterSchemaQuotaStatistics schemaQuotaStatistics;
- public ClusterSchemaManager(IManager configManager, ClusterSchemaInfo clusterSchemaInfo) {
+ public ClusterSchemaManager(
+ IManager configManager,
+ ClusterSchemaInfo clusterSchemaInfo,
+ ClusterSchemaQuotaStatistics schemaQuotaStatistics) {
this.configManager = configManager;
this.clusterSchemaInfo = clusterSchemaInfo;
+ this.schemaQuotaStatistics = schemaQuotaStatistics;
}
// ======================================================
@@ -807,6 +814,18 @@ public class ClusterSchemaManager {
return getConsensusManager().write(new DropSchemaTemplatePlan(templateName)).getStatus();
}
+ public long getSchemaQuotaCount() {
+ return schemaQuotaStatistics.getSchemaQuotaCount(getPartitionManager().getAllSchemaPartition());
+ }
+
+ public void updateSchemaQuota(Map<TConsensusGroupId, Long> schemaCountMap) {
+ schemaQuotaStatistics.updateCount(schemaCountMap);
+ }
+
+ public void clearSchemaQuotaCache() {
+ schemaQuotaStatistics.clear();
+ }
+
/**
* When some Nodes' states changed during a heartbeat loop, the eventbus in LoadManager will post
* the different NodeStatstics event to SyncManager and ClusterSchemaManager.
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaQuotaStatistics.java
similarity index 50%
copy from confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaQuotaStatistics.java
index 2af715eb1b..1eda15c8c3 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaQuotaStatistics.java
@@ -16,23 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.confignode.manager.schema;
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-public class ClusterSchemaManagerTest {
+import javax.validation.constraints.NotNull;
- @Test
- public void testCalcMaxRegionGroupNum() {
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
- // The maxRegionGroupNum should be great or equal to the leastRegionGroupNum
- Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(100, 1.0, 3, 1, 3, 0));
+public class ClusterSchemaQuotaStatistics {
- // The maxRegionGroupNum should be great or equal to the allocatedRegionGroupCount
- Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 6, 2, 3, 100));
+ private final Map<TConsensusGroupId, Long> countMap = new ConcurrentHashMap<>();
- // (resourceWeight * resource) / (createdStorageGroupNum * replicationFactor)
- Assert.assertEquals(20, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 120, 2, 3, 5));
+ public void updateCount(@NotNull Map<TConsensusGroupId, Long> schemaCountMap) {
+ countMap.putAll(schemaCountMap);
+ }
+
+ public long getSchemaQuotaCount(Set<TConsensusGroupId> consensusGroupIdSet) {
+ return countMap.entrySet().stream()
+ .filter(i -> consensusGroupIdSet.contains(i.getKey()))
+ .mapToLong(Map.Entry::getValue)
+ .sum();
+ }
+
+ public void clear() {
+ countMap.clear();
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 882cee0b14..5fe9cb9162 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -109,6 +109,10 @@ public class DatabasePartitionTable {
replicaSets.forEach(replicaSet -> regionGroupMap.remove(replicaSet.getRegionId()));
}
+ public Set<TConsensusGroupId> getAllConsensusGroupId() {
+ return regionGroupMap.keySet();
+ }
+
/** @return Deep copy of all Regions' RegionReplicaSet within one StorageGroup */
public List<TRegionReplicaSet> getAllReplicaSets() {
List<TRegionReplicaSet> result = new ArrayList<>();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 70b0679e3c..2dd737ca63 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -694,6 +694,19 @@ public class PartitionInfo implements SnapshotProcessor {
return databasePartitionTables.get(storageGroup).getRegionGroupSlotsCounter(type);
}
+ /**
+ * Only the leader uses this interface.
+ *
+ * @return the set of TConsensusGroupIds of all schema regions
+ */
+ public Set<TConsensusGroupId> getAllSchemaPartition() {
+ Set<TConsensusGroupId> schemaPartitionSet = new HashSet<>();
+ databasePartitionTables
+ .values()
+ .forEach(i -> schemaPartitionSet.addAll(i.getAllConsensusGroupId()));
+ return schemaPartitionSet;
+ }
+
@Override
public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 4bb874c4ba..32d6b62503 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -43,7 +43,6 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr
import org.apache.iotdb.confignode.exception.AddConsensusGroupException;
import org.apache.iotdb.confignode.exception.AddPeerException;
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
-import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
@@ -52,6 +51,7 @@ import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
@@ -120,7 +120,7 @@ public class ConfigNodeProcedureEnv {
* @param name database name
* @return tsStatus
*/
- public TSStatus deleteConfig(String name) {
+ public TSStatus deleteDatabaseConfig(String name) {
DeleteDatabasePlan deleteDatabasePlan = new DeleteDatabasePlan(name);
return getClusterSchemaManager().deleteDatabase(deleteDatabasePlan);
}
@@ -131,8 +131,9 @@ public class ConfigNodeProcedureEnv {
* @param preDeleteType execute/rollback
* @param deleteSgName database name
*/
- public void preDelete(PreDeleteDatabasePlan.PreDeleteType preDeleteType, String deleteSgName) {
- getPartitionManager().preDeleteStorageGroup(deleteSgName, preDeleteType);
+ public void preDeleteDatabase(
+ PreDeleteDatabasePlan.PreDeleteType preDeleteType, String deleteSgName) {
+ getPartitionManager().preDeleteDatabase(deleteSgName, preDeleteType);
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index b2beaf2f35..11c5aa0c79 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -91,7 +91,7 @@ public class DeleteDatabaseProcedure
case PRE_DELETE_DATABASE:
LOG.info(
"[DeleteDatabaseProcedure] Pre delete database: {}", deleteDatabaseSchema.getName());
- env.preDelete(
+ env.preDeleteDatabase(
PreDeleteDatabasePlan.PreDeleteType.EXECUTE, deleteDatabaseSchema.getName());
setNextState(DeleteStorageGroupState.INVALIDATE_CACHE);
break;
@@ -150,7 +150,8 @@ public class DeleteDatabaseProcedure
}
// Delete DatabasePartitionTable
- final TSStatus deleteConfigResult = env.deleteConfig(deleteDatabaseSchema.getName());
+ final TSStatus deleteConfigResult =
+ env.deleteDatabaseConfig(deleteDatabaseSchema.getName());
// Delete Database metrics
PartitionMetrics.unbindDatabasePartitionMetrics(deleteDatabaseSchema.getName());
@@ -237,7 +238,8 @@ public class DeleteDatabaseProcedure
case INVALIDATE_CACHE:
LOG.info(
"[DeleteDatabaseProcedure] Rollback to preDeleted: {}", deleteDatabaseSchema.getName());
- env.preDelete(PreDeleteDatabasePlan.PreDeleteType.ROLLBACK, deleteDatabaseSchema.getName());
+ env.preDeleteDatabase(
+ PreDeleteDatabasePlan.PreDeleteType.ROLLBACK, deleteDatabaseSchema.getName());
break;
default:
break;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
index 2af715eb1b..261f60ed65 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.confignode.manager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
+
import org.junit.Assert;
import org.junit.Test;
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
index fe9f3a6767..01948edb75 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
@@ -340,4 +340,16 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
setProperty("write_memory_proportion", writeMemoryProportion);
return this;
}
+
+ @Override
+ public CommonConfig setClusterSchemaLimitLevel(String clusterSchemaLimitLevel) {
+ setProperty("cluster_schema_limit_level", clusterSchemaLimitLevel);
+ return this;
+ }
+
+ @Override
+ public CommonConfig setClusterMaxSchemaCount(long clusterMaxSchemaCount) {
+ setProperty("cluster_max_schema_count", String.valueOf(clusterMaxSchemaCount));
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
index 275ed1f8f8..49eade65d9 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
@@ -350,4 +350,18 @@ public class MppSharedCommonConfig implements CommonConfig {
cnConfig.setWriteMemoryProportion(writeMemoryProportion);
return this;
}
+
+ @Override
+ public CommonConfig setClusterSchemaLimitLevel(String clusterSchemaLimitLevel) {
+ dnConfig.setClusterSchemaLimitLevel(clusterSchemaLimitLevel);
+ cnConfig.setClusterSchemaLimitLevel(clusterSchemaLimitLevel);
+ return this;
+ }
+
+ @Override
+ public CommonConfig setClusterMaxSchemaCount(long clusterMaxSchemaCount) {
+ dnConfig.setClusterMaxSchemaCount(clusterMaxSchemaCount);
+ cnConfig.setClusterMaxSchemaCount(clusterMaxSchemaCount);
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
index 526df5dbab..2a169c74e2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
@@ -251,4 +251,14 @@ public class RemoteCommonConfig implements CommonConfig {
public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) {
return this;
}
+
+ @Override
+ public CommonConfig setClusterSchemaLimitLevel(String clusterSchemaLimitLevel) {
+ return this;
+ }
+
+ @Override
+ public CommonConfig setClusterMaxSchemaCount(long clusterMaxSchemaCount) {
+ return this;
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 4792b8b3e7..9c7c4f60e0 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -112,4 +112,8 @@ public interface CommonConfig {
CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate);
CommonConfig setWriteMemoryProportion(String writeMemoryProportion);
+
+ CommonConfig setClusterSchemaLimitLevel(String clusterSchemaLimitLevel);
+
+ CommonConfig setClusterMaxSchemaCount(long clusterMaxSchemaCount);
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
similarity index 53%
copy from confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
copy to integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
index 2af715eb1b..55e9767f23 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
@@ -16,23 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.db.it.schema;
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.iotdb.it.env.EnvFactory;
-public class ClusterSchemaManagerTest {
-
- @Test
- public void testCalcMaxRegionGroupNum() {
-
- // The maxRegionGroupNum should be great or equal to the leastRegionGroupNum
- Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(100, 1.0, 3, 1, 3, 0));
-
- // The maxRegionGroupNum should be great or equal to the allocatedRegionGroupCount
- Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 6, 2, 3, 100));
+public class IoTDBClusterDeviceQuotaIT extends IoTDBClusterMeasurementQuotaIT {
+ public IoTDBClusterDeviceQuotaIT(SchemaTestMode schemaTestMode) {
+ super(schemaTestMode);
+ }
- // (resourceWeight * resource) / (createdStorageGroupNum * replicationFactor)
- Assert.assertEquals(20, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 120, 2, 3, 5));
+ /**
+ * Switches the inherited quota test to DEVICE level. The limit of 3 equals the number of devices
+ * the parent class prepares (2 databases, 3 devices, 6 time series), so the quota starts full.
+ */
+ @Override
+ protected void setUpQuotaConfig() {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitLevel("DEVICE");
+ EnvFactory.getEnv().getConfig().getCommonConfig().setClusterMaxSchemaCount(3);
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java
new file mode 100644
index 0000000000..4481dedc1d
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.schema;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * Integration test for the cluster-wide schema quota at MEASUREMENT level. Subclasses may override
+ * {@link #setUpQuotaConfig()} to run the same scenario under a different level and limit.
+ */
+public class IoTDBClusterMeasurementQuotaIT extends AbstractSchemaIT {
+
+  /** Fragment of the error message expected when a creation is rejected by the quota. */
+  private static final String QUOTA_EXCEEDED_MESSAGE =
+      "The current metadata capacity has exceeded the cluster quota";
+
+  /** Pause used to wait for a heartbeat so the quota usage seen by the nodes is refreshed. */
+  private static final long HEARTBEAT_WAIT_MS = 2000;
+
+  public IoTDBClusterMeasurementQuotaIT(SchemaTestMode schemaTestMode) {
+    super(schemaTestMode);
+  }
+
+  /** Configures the quota, starts the cluster and fills it up to the configured limit. */
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    // The quota configuration must be in place before the cluster is started.
+    setUpQuotaConfig();
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareTimeSeries();
+    Thread.sleep(HEARTBEAT_WAIT_MS); // wait heartbeat
+  }
+
+  /** Sets the quota level and limit; 6 equals the number of time series prepared in setUp. */
+  protected void setUpQuotaConfig() {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitLevel("MEASUREMENT");
+    EnvFactory.getEnv().getConfig().getCommonConfig().setClusterMaxSchemaCount(6);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+    super.tearDown();
+  }
+
+  /** Prepare time series: 2 databases, 3 devices, 6 time series */
+  private void prepareTimeSeries() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      // create database
+      statement.execute("CREATE DATABASE root.sg1");
+      statement.execute("CREATE DATABASE root.sg2");
+      // create schema template
+      statement.execute("CREATE SCHEMA TEMPLATE t1 (s1 INT64, s2 DOUBLE)");
+      // set schema template
+      statement.execute("SET SCHEMA TEMPLATE t1 TO root.sg2");
+      statement.execute(
+          "create timeseries root.sg1.d0.s0 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+      statement.execute(
+          "create timeseries root.sg1.d0.s1 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+      statement.execute(
+          "create timeseries root.sg1.d1.s0 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+      statement.execute(
+          "create timeseries root.sg1.d1.s1 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+      statement.execute("create timeseries of schema template on root.sg2.d1;");
+    }
+  }
+
+  /**
+   * Verifies that creations are rejected while the quota is full, succeed again after schema is
+   * deleted, and are rejected once more when the freed quota has been consumed.
+   *
+   * @throws Exception on any unexpected failure, so JUnit reports it with its full stack trace
+   */
+  @Test
+  public void testClusterSchemaQuota() throws Exception {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      // the quota is already full: neither a plain series nor a template activation may pass
+      assertQuotaExceeded(
+          statement,
+          "create timeseries root.sg1.d3.s0 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+      assertQuotaExceeded(statement, "create timeseries of schema template on root.sg2.d2;");
+      // delete some timeseries and database
+      statement.execute("delete database root.sg2;");
+      statement.execute("delete timeseries root.sg1.d0.s0;");
+      Thread.sleep(HEARTBEAT_WAIT_MS); // wait heartbeat
+      // now we can create 3 new timeseries or 1 new device
+      statement.execute("SET SCHEMA TEMPLATE t1 TO root.sg1.d4");
+      statement.execute("create timeseries of schema template on root.sg1.d4");
+      statement.execute(
+          "create timeseries root.sg1.d4.s3 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+      Thread.sleep(HEARTBEAT_WAIT_MS); // wait heartbeat
+      // the freed quota is used up again
+      assertQuotaExceeded(
+          statement,
+          "create timeseries root.sg1.d3.s0 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+    }
+  }
+
+  /** Executes {@code sql} and asserts that it is rejected with the quota-exceeded error. */
+  private static void assertQuotaExceeded(Statement statement, String sql) {
+    try {
+      statement.execute(sql);
+      Assert.fail("Expected the cluster schema quota to reject: " + sql);
+    } catch (SQLException e) {
+      // Assert.fail throws AssertionError, which is not caught here.
+      Assert.assertTrue(
+          "Unexpected error message: " + e.getMessage(),
+          e.getMessage() != null && e.getMessage().contains(QUOTA_EXCEEDED_MESSAGE));
+    }
+  }
+}
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 0a61caa03c..934f81e736 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -288,6 +288,20 @@ cluster_name=defaultCluster
# which exceeds this num, will be split to several plans with timeseries no more than this num.
# max_measurement_num_of_internal_request=10000
+# This configuration parameter sets the level at which the time series limit is applied.
+# There are two available levels: 'DEVICE' and 'MEASUREMENT'.
+# 'DEVICE': The limit is applied to the number of devices in the cluster.
+# 'MEASUREMENT': The limit is applied to the number of measurements in the cluster.
+# Set the value to either 'DEVICE' or 'MEASUREMENT' based on your desired control level.
+# cluster_schema_limit_level=MEASUREMENT
+
+# This configuration parameter sets the maximum number of schema entries (devices or measurements,
+# depending on cluster_schema_limit_level) allowed in the cluster.
+# The value should be a positive integer representing the desired threshold.
+# When the threshold is reached, users will be prohibited from creating new time series.
+# Set the value based on the desired maximum number of schema entries for your IoTDB cluster.
+# -1 means the system does not impose any limit.
+# cluster_max_schema_count=-1
+
####################
### Configurations for creating schema automatically
####################
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java b/node-commons/src/main/java/org/apache/iotdb/commons/schema/ClusterSchemaQuotaLevel.java
similarity index 53%
copy from confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/schema/ClusterSchemaQuotaLevel.java
index 2af715eb1b..1410f1a1c3 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/schema/ClusterSchemaQuotaLevel.java
@@ -16,23 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.commons.schema;
-import org.junit.Assert;
-import org.junit.Test;
+public enum ClusterSchemaQuotaLevel {
+ MEASUREMENT(0),
+ DEVICE(1);
-public class ClusterSchemaManagerTest {
+ private final int code;
- @Test
- public void testCalcMaxRegionGroupNum() {
-
- // The maxRegionGroupNum should be great or equal to the leastRegionGroupNum
- Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(100, 1.0, 3, 1, 3, 0));
-
- // The maxRegionGroupNum should be great or equal to the allocatedRegionGroupCount
- Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 6, 2, 3, 100));
+ ClusterSchemaQuotaLevel(int code) {
+ this.code = code;
+ }
- // (resourceWeight * resource) / (createdStorageGroupNum * replicationFactor)
- Assert.assertEquals(20, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 120, 2, 3, 5));
+ public int getCode() {
+ return code;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 38bc963235..8bd7128679 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1034,6 +1034,12 @@ public class IoTDBConfig {
/** whether to enable the audit log * */
private boolean enableAuditLog = false;
+ /** Level at which the cluster-wide schema limit is applied: "MEASUREMENT" or "DEVICE". */
+ private String clusterSchemaLimitLevel = "MEASUREMENT";
+
+ /** Maximum number of schema entries allowed in the cluster; -1 (the default) means no limit. */
+ private long clusterMaxSchemaCount = -1;
+
/** Output location of audit logs * */
private List<AuditLogStorage> auditLogStorage =
Arrays.asList(AuditLogStorage.IOTDB, AuditLogStorage.LOGGER);
@@ -3627,4 +3633,20 @@ public class IoTDBConfig {
public void setEnableAuditLogForNativeInsertApi(boolean enableAuditLogForNativeInsertApi) {
this.enableAuditLogForNativeInsertApi = enableAuditLogForNativeInsertApi;
}
+
+ public String getClusterSchemaLimitLevel() {
+ return clusterSchemaLimitLevel;
+ }
+
+ public void setClusterSchemaLimitLevel(String clusterSchemaLimitLevel) {
+ this.clusterSchemaLimitLevel = clusterSchemaLimitLevel;
+ }
+
+ public long getClusterMaxSchemaCount() {
+ return clusterMaxSchemaCount;
+ }
+
+ public void setClusterMaxSchemaCount(long clusterMaxSchemaCount) {
+ this.clusterMaxSchemaCount = clusterMaxSchemaCount;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 77378e7f72..469dd66c15 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.engine.compaction.selector.constant.CrossCompactionSe
import org.apache.iotdb.db.engine.compaction.selector.constant.InnerSequenceCompactionSelector;
import org.apache.iotdb.db.engine.compaction.selector.constant.InnerUnsequenceCompactionSelector;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -199,7 +200,16 @@ public class IoTDBDescriptor {
}
public void loadProperties(Properties properties) {
-
+ conf.setClusterSchemaLimitLevel(
+ properties
+ .getProperty("cluster_schema_limit_level", conf.getClusterSchemaLimitLevel())
+ .trim());
+ conf.setClusterMaxSchemaCount(
+ Long.parseLong(
+ properties
+ .getProperty(
+ "cluster_max_schema_count", Long.toString(conf.getClusterMaxSchemaCount()))
+ .trim()));
conf.setClusterName(
properties.getProperty(IoTDBConstant.CLUSTER_NAME, conf.getClusterName()).trim());
@@ -1468,6 +1478,19 @@ public class IoTDBDescriptor {
if (prevDeleteWalFilesPeriodInMs != conf.getDeleteWalFilesPeriodInMs()) {
WALManager.getInstance().rebootWALDeleteThread();
}
+
+ // update schema quota configuration
+ conf.setClusterSchemaLimitLevel(
+ properties
+ .getProperty("cluster_schema_limit_level", conf.getClusterSchemaLimitLevel())
+ .trim());
+ conf.setClusterMaxSchemaCount(
+ Long.parseLong(
+ properties
+ .getProperty(
+ "cluster_max_schema_count", Long.toString(conf.getClusterMaxSchemaCount()))
+ .trim()));
+ DataNodeSchemaQuotaManager.getInstance().updateConfiguration();
} catch (Exception e) {
throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SchemaQuotaExceededException.java
similarity index 52%
copy from confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
copy to server/src/main/java/org/apache/iotdb/db/exception/metadata/SchemaQuotaExceededException.java
index 2af715eb1b..fa152a1bb4 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SchemaQuotaExceededException.java
@@ -16,23 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.db.exception.metadata;
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.ClusterSchemaQuotaLevel;
+import org.apache.iotdb.rpc.TSStatusCode;
-public class ClusterSchemaManagerTest {
-
- @Test
- public void testCalcMaxRegionGroupNum() {
-
- // The maxRegionGroupNum should be great or equal to the leastRegionGroupNum
- Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(100, 1.0, 3, 1, 3, 0));
-
- // The maxRegionGroupNum should be great or equal to the allocatedRegionGroupCount
- Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 6, 2, 3, 100));
-
- // (resourceWeight * resource) / (createdStorageGroupNum * replicationFactor)
- Assert.assertEquals(20, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 120, 2, 3, 5));
+/** Thrown when creating schema would go beyond the configured cluster-wide schema quota. */
+public class SchemaQuotaExceededException extends MetadataException {
+
+  /** Message template; arguments are the quota level name and the configured limit. */
+  private static final String MESSAGE_FORMAT =
+      "The current metadata capacity has exceeded the cluster quota. The cluster quota is set at the %s level, with a limit number of %d. Please review your configuration or delete some existing time series to comply with the quota.";
+
+  /**
+   * @param level the level at which the quota is enforced
+   * @param limit the configured maximum count at that level
+   */
+  public SchemaQuotaExceededException(ClusterSchemaQuotaLevel level, long limit) {
+    super(
+        String.format(MESSAGE_FORMAT, level.toString(), limit),
+        TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index d340268ca2..1ce8393868 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -61,6 +61,7 @@ import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -840,6 +841,8 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
throw new DifferentTemplateException(activatePath.getFullPath(), template.getName());
}
}
+ DataNodeSchemaQuotaManager.getInstance()
+ .checkMeasurementLevel(template.getMeasurementNumber());
if (cur.isEntity()) {
entityMNode = cur.getAsEntityMNode();
@@ -895,6 +898,7 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
entityMNode.setSchemaTemplateId(templateId);
store.updateMNode(entityMNode);
+ regionStatistics.activateTemplate(templateId);
} finally {
unPinPath(cur);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index 465468e24b..f77debe0c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -60,6 +60,7 @@ import org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
@@ -710,6 +711,8 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
throw new DifferentTemplateException(activatePath.getFullPath(), template.getName());
}
}
+ DataNodeSchemaQuotaManager.getInstance()
+ .checkMeasurementLevel(template.getMeasurementNumber());
if (cur.isEntity()) {
entityMNode = cur.getAsEntityMNode();
@@ -801,7 +804,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
}
public void activateTemplateWithoutCheck(
- PartialPath activatePath, int templateId, boolean isAligned) {
+ PartialPath activatePath, int templateId, boolean isAligned) throws MetadataException {
String[] nodes = activatePath.getNodes();
IMNode cur = storageGroupMNode;
for (int i = levelOfSG + 1; i < nodes.length; i++) {
@@ -823,6 +826,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
}
entityMNode.setUseTemplate(true);
entityMNode.setSchemaTemplateId(templateId);
+ regionStatistics.activateTemplate(templateId);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
index 93c9238433..0a329b3cc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.metadata.mtree.store;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.exception.metadata.cache.MNodeNotCachedException;
import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
@@ -38,6 +39,7 @@ import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.MemManager;
import org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.ISchemaFile;
import org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFile;
import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.slf4j.Logger;
@@ -62,7 +64,7 @@ public class CachedMTreeStore implements IMTreeStore {
private IMNode root;
private final Runnable flushCallback;
private final CachedSchemaRegionStatistics regionStatistics;
-
+ private final DataNodeSchemaQuotaManager quotaManager = DataNodeSchemaQuotaManager.getInstance();
private final StampedWriterPreferredLock lock = new StampedWriterPreferredLock();
public CachedMTreeStore(
@@ -325,7 +327,8 @@ public class CachedMTreeStore implements IMTreeStore {
}
@Override
- public IEntityMNode setToEntity(IMNode node) {
+ public IEntityMNode setToEntity(IMNode node) throws SchemaQuotaExceededException {
+ quotaManager.checkDeviceLevel();
IEntityMNode result = MNodeUtils.setToEntity(node);
if (result != node) {
regionStatistics.addDevice();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
index 3ca77d321d..fd5dedbddb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
import org.apache.iotdb.db.metadata.mnode.IMNode;
@@ -36,6 +37,7 @@ import org.apache.iotdb.db.metadata.mnode.iterator.IMNodeIterator;
import org.apache.iotdb.db.metadata.mnode.iterator.MNodeIterator;
import org.apache.iotdb.db.metadata.mnode.iterator.MemoryTraverserIterator;
import org.apache.iotdb.db.metadata.mtree.snapshot.MemMTreeSnapshotUtil;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.template.Template;
@@ -48,6 +50,8 @@ import java.util.function.Consumer;
public class MemMTreeStore implements IMTreeStore {
private final IMNodeSizeEstimator estimator = new BasicMNodSizeEstimator();
+ private final DataNodeSchemaQuotaManager schemaQuotaManager =
+ DataNodeSchemaQuotaManager.getInstance();
private MemSchemaRegionStatistics regionStatistics;
private IMNode root;
@@ -154,7 +158,8 @@ public class MemMTreeStore implements IMTreeStore {
public void updateMNode(IMNode node) {}
@Override
- public IEntityMNode setToEntity(IMNode node) {
+ public IEntityMNode setToEntity(IMNode node) throws SchemaQuotaExceededException {
+ schemaQuotaManager.checkDeviceLevel();
IEntityMNode result = MNodeUtils.setToEntity(node);
if (result != node) {
regionStatistics.addDevice();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/DataNodeSchemaQuotaManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/DataNodeSchemaQuotaManager.java
new file mode 100644
index 0000000000..936cf8c5f6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/DataNodeSchemaQuotaManager.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.rescon;
+
+import org.apache.iotdb.commons.schema.ClusterSchemaQuotaLevel;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * DataNode-side view of the cluster-wide schema quota: the configured level and limit plus a local
+ * counter of the quota that is still available.
+ *
+ * <p>The counter is overwritten by {@link #updateRemain(long)} and decremented by the check
+ * methods. The checks are approximate: a request is rejected only when the counter is already
+ * exhausted, so a single request larger than the remaining quota is still admitted.
+ */
+public class DataNodeSchemaQuotaManager {
+
+  // volatile: replaced by updateConfiguration() while other threads run the check methods
+  private volatile ClusterSchemaQuotaLevel level =
+      parseLevel(IoTDBDescriptor.getInstance().getConfig().getClusterSchemaLimitLevel());
+  private volatile long limit =
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .getClusterMaxSchemaCount(); // -1 means no limitation
+  // Starts at 0, so with a positive limit every check fails until updateRemain() is first called.
+  private final AtomicLong remain = new AtomicLong(0);
+
+  /**
+   * Resets the remaining quota from the cluster-wide usage.
+   *
+   * @param totalCount schema count currently used by the whole cluster
+   */
+  public void updateRemain(long totalCount) {
+    this.remain.set(limit - totalCount);
+  }
+
+  /**
+   * Reserves quota for new measurements; does nothing unless a positive limit is configured at
+   * MEASUREMENT level.
+   *
+   * @param acquireNumber number of measurements about to be created
+   * @throws SchemaQuotaExceededException if no quota remains
+   */
+  public void checkMeasurementLevel(int acquireNumber) throws SchemaQuotaExceededException {
+    acquire(ClusterSchemaQuotaLevel.MEASUREMENT, acquireNumber);
+  }
+
+  /**
+   * Reserves quota for one new device; does nothing unless a positive limit is configured at
+   * DEVICE level.
+   *
+   * @throws SchemaQuotaExceededException if no quota remains
+   */
+  public void checkDeviceLevel() throws SchemaQuotaExceededException {
+    acquire(ClusterSchemaQuotaLevel.DEVICE, 1L);
+  }
+
+  /**
+   * Re-reads level and limit from the configuration (used after a configuration hot reload) and
+   * shifts the remaining quota by the change of the limit.
+   *
+   * @throws IllegalArgumentException if the configured level is not a known level name
+   */
+  public void updateConfiguration() {
+    // Parse first so that an invalid level leaves the current state untouched. Parsing is
+    // case-insensitive here as well, matching the field initializer.
+    ClusterSchemaQuotaLevel newLevel =
+        parseLevel(IoTDBDescriptor.getInstance().getConfig().getClusterSchemaLimitLevel());
+    long newLimit = IoTDBDescriptor.getInstance().getConfig().getClusterMaxSchemaCount();
+    long oldLimit = limit;
+    this.level = newLevel;
+    this.limit = newLimit;
+    this.remain.addAndGet(newLimit - oldLimit);
+  }
+
+  public ClusterSchemaQuotaLevel getLevel() {
+    return level;
+  }
+
+  public long getLimit() {
+    return limit;
+  }
+
+  /** Shared check: throws when the quota at {@code target} level is exhausted, else reserves. */
+  private void acquire(ClusterSchemaQuotaLevel target, long acquireNumber)
+      throws SchemaQuotaExceededException {
+    long currentLimit = limit;
+    ClusterSchemaQuotaLevel currentLevel = level;
+    if (currentLimit <= 0 || currentLevel != target) {
+      return;
+    }
+    if (remain.get() <= 0) {
+      throw new SchemaQuotaExceededException(currentLevel, currentLimit);
+    }
+    remain.addAndGet(-acquireNumber);
+  }
+
+  /** Parses a level name ignoring case and surrounding blanks, independent of default locale. */
+  private static ClusterSchemaQuotaLevel parseLevel(String name) {
+    return ClusterSchemaQuotaLevel.valueOf(name.trim().toUpperCase(java.util.Locale.ROOT));
+  }
+
+  private DataNodeSchemaQuotaManager() {}
+
+  public static DataNodeSchemaQuotaManager getInstance() {
+    return DataNodeSchemaQuotaManagerHolder.INSTANCE;
+  }
+
+  private static class DataNodeSchemaQuotaManagerHolder {
+    private static final DataNodeSchemaQuotaManager INSTANCE = new DataNodeSchemaQuotaManager();
+
+    private DataNodeSchemaQuotaManagerHolder() {
+      // empty constructor
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 20c90e66d1..e1b56aa4cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata.schemaregion;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
@@ -30,8 +31,10 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.metadata.metric.SchemaMetricManager;
import org.apache.iotdb.db.metadata.rescon.CachedSchemaEngineStatistics;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.metadata.rescon.ISchemaEngineStatistics;
import org.apache.iotdb.db.metadata.rescon.MemSchemaEngineStatistics;
import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
@@ -43,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
@@ -72,6 +76,9 @@ public class SchemaEngine {
private ISchemaEngineStatistics schemaEngineStatistics;
+ private final DataNodeSchemaQuotaManager schemaQuotaManager =
+ DataNodeSchemaQuotaManager.getInstance();
+
private static class SchemaEngineManagerHolder {
private static final SchemaEngine INSTANCE = new SchemaEngine();
@@ -343,6 +350,45 @@ public class SchemaEngine {
return schemaRegionMap == null ? 0 : schemaRegionMap.size();
}
+  /**
+   * Update total count in schema quota manager and generate local count map response.
+   *
+   * @param totalCount cluster schema usage
+   * @return null if the limit is negative (-1 means no limitation); otherwise a map from
+   *     consensus group id to the series count (MEASUREMENT level) or device count (DEVICE level)
+   *     of each SchemaRegion whose leader is the current node. The map is empty when no schema
+   *     region is loaded.
+   */
+  public Map<TConsensusGroupId, Long> updateAndGenerateSchemaCountMap(long totalCount) {
+    // update DataNodeSchemaQuotaManager
+    schemaQuotaManager.updateRemain(totalCount);
+    if (schemaQuotaManager.getLimit() < 0) {
+      return null;
+    }
+    Map<TConsensusGroupId, Long> res = new HashMap<>();
+    // getSchemaRegionNumber() treats schemaRegionMap as nullable, so guard here as well
+    if (schemaRegionMap == null) {
+      return res;
+    }
+    switch (schemaQuotaManager.getLevel()) {
+      case MEASUREMENT:
+        schemaRegionMap.values().stream()
+            .filter(i -> SchemaRegionConsensusImpl.getInstance().isLeader(i.getSchemaRegionId()))
+            .forEach(
+                i ->
+                    res.put(
+                        i.getSchemaRegionId().convertToTConsensusGroupId(),
+                        i.getSchemaRegionStatistics().getSeriesNumber()));
+        break;
+      case DEVICE:
+        schemaRegionMap.values().stream()
+            .filter(i -> SchemaRegionConsensusImpl.getInstance().isLeader(i.getSchemaRegionId()))
+            .forEach(
+                i ->
+                    res.put(
+                        i.getSchemaRegionId().convertToTConsensusGroupId(),
+                        i.getSchemaRegionStatistics().getDevicesNumber()));
+        break;
+      default:
+        throw new UnsupportedOperationException();
+    }
+    return res;
+  }
+
+
@TestOnly
public ISchemaEngineStatistics getSchemaEngineStatistics() {
return schemaEngineStatistics;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index f27b857395..a5312d880a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
@@ -139,6 +140,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
private SchemaLogWriter<ISchemaRegionPlan> logWriter;
private final MemSchemaRegionStatistics regionStatistics;
+ private final DataNodeSchemaQuotaManager schemaQuotaManager =
+ DataNodeSchemaQuotaManager.getInstance();
private MTreeBelowSGMemoryImpl mtree;
private TagManager tagManager;
@@ -534,6 +537,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
if (seriesNumberMonitor != null && !seriesNumberMonitor.addTimeSeries(1)) {
throw new SeriesNumberOverflowException();
}
+ schemaQuotaManager.checkMeasurementLevel(1);
try {
IMeasurementMNode leafMNode;
@@ -614,6 +618,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
if (seriesNumberMonitor != null && !seriesNumberMonitor.addTimeSeries(seriesCount)) {
throw new SeriesNumberOverflowException();
}
+ schemaQuotaManager.checkMeasurementLevel(seriesCount);
try {
PartialPath prefixPath = plan.getDevicePath();
@@ -1074,7 +1079,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
}
}
- private void recoverActivatingSchemaTemplate(IActivateTemplateInClusterPlan plan) {
+ private void recoverActivatingSchemaTemplate(IActivateTemplateInClusterPlan plan)
+ throws MetadataException {
mtree.activateTemplateWithoutCheck(
plan.getActivatePath(), plan.getTemplateId(), plan.isAligned());
}
@@ -1263,8 +1269,12 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
public RecoverOperationResult visitActivateTemplateInCluster(
IActivateTemplateInClusterPlan activateTemplateInClusterPlan,
SchemaRegionMemoryImpl context) {
- recoverActivatingSchemaTemplate(activateTemplateInClusterPlan);
- return RecoverOperationResult.SUCCESS;
+ try {
+ recoverActivatingSchemaTemplate(activateTemplateInClusterPlan);
+ return RecoverOperationResult.SUCCESS;
+ } catch (MetadataException e) {
+ return new RecoverOperationResult(e);
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index cbca5643b1..98ecde5d4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
import org.apache.iotdb.db.metadata.tag.TagManager;
import org.apache.iotdb.db.metadata.template.Template;
@@ -148,6 +149,8 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
private MLogDescriptionWriter logDescriptionWriter;
private final CachedSchemaRegionStatistics regionStatistics;
+ private final DataNodeSchemaQuotaManager schemaQuotaManager =
+ DataNodeSchemaQuotaManager.getInstance();
private MTreeBelowSGCachedImpl mtree;
private TagManager tagManager;
@@ -602,6 +605,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
if (seriesNumerMonitor != null && !seriesNumerMonitor.addTimeSeries(1)) {
throw new SeriesNumberOverflowException();
}
+ schemaQuotaManager.checkMeasurementLevel(1);
try {
PartialPath path = plan.getPath();
@@ -724,6 +728,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
if (seriesNumerMonitor != null && !seriesNumerMonitor.addTimeSeries(seriesCount)) {
throw new SeriesNumberOverflowException();
}
+ schemaQuotaManager.checkMeasurementLevel(seriesCount);
try {
PartialPath prefixPath = plan.getDevicePath();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 49f7ff8a18..efe659f6e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -976,6 +976,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
resp.setLoadSample(loadSample);
}
+ // Update schema quota if necessary
+ Map<TConsensusGroupId, Long> schemaCountMap =
+ SchemaEngine.getInstance().updateAndGenerateSchemaCountMap(req.schemaQuotaCount);
+ if (schemaCountMap != null) {
+ resp.setSchemaCountMap(schemaCountMap);
+ }
+
resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp());
resp.setStatus(CommonDescriptor.getInstance().getConfig().getNodeStatus().getStatus());
if (CommonDescriptor.getInstance().getConfig().getStatusReason() != null) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 487e711012..f2a5aae7b4 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -77,6 +77,7 @@ public enum TSStatusCode {
SCHEMA_FILE_REDO_LOG_BROKEN(523),
TEMPLATE_NOT_ACTIVATED(524),
DATABASE_CONFIG_ERROR(525),
+ SCHEMA_QUOTA_EXCEEDED(526),
// Storage Engine
SYSTEM_READ_ONLY(600),
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index c64e42d51a..c80ae4452c 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -225,6 +225,7 @@ struct THeartbeatReq {
1: required i64 heartbeatTimestamp
2: required bool needJudgeLeader
3: required bool needSamplingLoad
+ 4: required i64 schemaQuotaCount
}
struct THeartbeatResp {
@@ -233,6 +234,7 @@ struct THeartbeatResp {
3: optional string statusReason
4: optional map<common.TConsensusGroupId, bool> judgedLeaders
5: optional TLoadSample loadSample
+ 6: optional map<common.TConsensusGroupId, i64> schemaCountMap
}
struct TLoadSample {