You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/04/06 06:55:02 UTC

[iotdb] branch rel/1.1 updated: [To rel/1.1][IOTDB-5719] Cluster-Wide Time Series Limit Control (#9430)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 879d6b180c [To rel/1.1][IOTDB-5719] Cluster-Wide Time Series Limit Control (#9430)
879d6b180c is described below

commit 879d6b180cb0739d6ed99224626708631cb623f0
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Thu Apr 6 14:54:56 2023 +0800

    [To rel/1.1][IOTDB-5719] Cluster-Wide Time Series Limit Control (#9430)
---
 .../heartbeat/DataNodeHeartbeatHandler.java        |   9 +-
 .../statemachine/ConfigRegionStateMachine.java     |   1 +
 .../iotdb/confignode/manager/ConfigManager.java    |   5 +-
 .../apache/iotdb/confignode/manager/IManager.java  |   1 +
 .../manager/load/balancer/RegionBalancer.java      |   2 +-
 .../iotdb/confignode/manager/node/NodeManager.java |   6 +-
 .../manager/partition/PartitionManager.java        |  17 ++-
 .../manager/partition/PartitionMetrics.java        |   2 +-
 .../manager/{ => schema}/ClusterSchemaManager.java |  23 +++-
 .../schema/ClusterSchemaQuotaStatistics.java}      |  33 ++++--
 .../partition/DatabasePartitionTable.java          |   4 +
 .../persistence/partition/PartitionInfo.java       |  13 +++
 .../procedure/env/ConfigNodeProcedureEnv.java      |   9 +-
 .../impl/schema/DeleteDatabaseProcedure.java       |   8 +-
 .../manager/ClusterSchemaManagerTest.java          |   2 +
 .../iotdb/it/env/cluster/MppCommonConfig.java      |  12 ++
 .../it/env/cluster/MppSharedCommonConfig.java      |  14 +++
 .../iotdb/it/env/remote/RemoteCommonConfig.java    |  10 ++
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   4 +
 .../db/it/schema/IoTDBClusterDeviceQuotaIT.java    |  25 ++---
 .../it/schema/IoTDBClusterMeasurementQuotaIT.java  | 125 +++++++++++++++++++++
 .../resources/conf/iotdb-common.properties         |  14 +++
 .../commons/schema/ClusterSchemaQuotaLevel.java    |  24 ++--
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  22 ++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  25 ++++-
 .../metadata/SchemaQuotaExceededException.java     |  27 ++---
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |   4 +
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |   6 +-
 .../db/metadata/mtree/store/CachedMTreeStore.java  |   7 +-
 .../db/metadata/mtree/store/MemMTreeStore.java     |   7 +-
 .../rescon/DataNodeSchemaQuotaManager.java         |  92 +++++++++++++++
 .../db/metadata/schemaregion/SchemaEngine.java     |  46 ++++++++
 .../schemaregion/SchemaRegionMemoryImpl.java       |  16 ++-
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |   5 +
 .../impl/DataNodeInternalRPCServiceImpl.java       |   7 ++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 thrift/src/main/thrift/datanode.thrift             |   2 +
 37 files changed, 546 insertions(+), 84 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 28e1d26254..181cacddf3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.async.AsyncMethodCallback;
 
 import java.util.Map;
+import java.util.function.Consumer;
 
 public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatResp> {
 
@@ -40,16 +41,19 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
   private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
   private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
   private final RouteBalancer routeBalancer;
+  private final Consumer<Map<TConsensusGroupId, Long>> schemaQuotaRespProcess;
 
   public DataNodeHeartbeatHandler(
       TDataNodeLocation dataNodeLocation,
       DataNodeHeartbeatCache dataNodeHeartbeatCache,
       Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap,
-      RouteBalancer routeBalancer) {
+      RouteBalancer routeBalancer,
+      Consumer<Map<TConsensusGroupId, Long>> schemaQuotaRespProcess) {
     this.dataNodeLocation = dataNodeLocation;
     this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
     this.regionGroupCacheMap = regionGroupCacheMap;
     this.routeBalancer = routeBalancer;
+    this.schemaQuotaRespProcess = schemaQuotaRespProcess;
   }
 
   @Override
@@ -82,6 +86,9 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
                         heartbeatResp.getHeartbeatTimestamp(), dataNodeLocation.getDataNodeId()));
               }
             });
+    if (heartbeatResp.getSchemaCountMap() != null) {
+      schemaQuotaRespProcess.accept(heartbeatResp.getSchemaCountMap());
+    }
   }
 
   @Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 8555ff9a77..608e4b2297 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -236,6 +236,7 @@ public class ConfigRegionStateMachine
       configManager.getNodeManager().stopHeartbeatService();
       configManager.getPartitionManager().stopRegionCleaner();
       configManager.getCQManager().stopCQScheduler();
+      configManager.getClusterSchemaManager().clearSchemaQuotaCache();
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 6a4b8ddb15..48893cfef4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -89,6 +89,8 @@ import org.apache.iotdb.confignode.manager.node.NodeMetrics;
 import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaQuotaStatistics;
 import org.apache.iotdb.confignode.persistence.AuthorInfo;
 import org.apache.iotdb.confignode.persistence.ModelInfo;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
@@ -263,7 +265,8 @@ public class ConfigManager implements IManager {
 
     // Build the manager module
     this.nodeManager = new NodeManager(this, nodeInfo);
-    this.clusterSchemaManager = new ClusterSchemaManager(this, clusterSchemaInfo);
+    this.clusterSchemaManager =
+        new ClusterSchemaManager(this, clusterSchemaInfo, new ClusterSchemaQuotaStatistics());
     this.partitionManager = new PartitionManager(this, partitionInfo);
     this.permissionManager = new PermissionManager(this, authorInfo);
     this.procedureManager = new ProcedureManager(this, procedureInfo);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 8ace5e2f13..f85706230a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.confignode.manager.cq.CQManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 882a5eeebe..198d70a59d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -27,13 +27,13 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
-import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.load.balancer.region.CopySetRegionGroupAllocator;
 import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator;
 import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 
 import java.util.List;
 import java.util.Map;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index fb822a9abb..276b879d4d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -52,7 +52,6 @@ import org.apache.iotdb.confignode.consensus.response.datanode.ConfigurationResp
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeConfigurationResp;
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
 import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
-import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.TriggerManager;
@@ -64,6 +63,7 @@ import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCac
 import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
 import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
@@ -722,6 +722,7 @@ public class NodeManager {
     heartbeatReq.setNeedJudgeLeader(true);
     // We sample DataNode's load in every 10 heartbeat loop
     heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
+    heartbeatReq.setSchemaQuotaCount(getClusterSchemaManager().getSchemaQuotaCount());
 
     /* Update heartbeat counter */
     heartbeatCounter.getAndUpdate((x) -> (x + 1) % 10);
@@ -745,7 +746,8 @@ public class NodeManager {
                       dataNodeInfo.getLocation().getDataNodeId(),
                       empty -> new DataNodeHeartbeatCache()),
               getPartitionManager().getRegionGroupCacheMap(),
-              getLoadManager().getRouteBalancer());
+              getLoadManager().getRouteBalancer(),
+              getClusterSchemaManager()::updateSchemaQuota);
       AsyncDataNodeHeartbeatClientPool.getInstance()
           .getDataNodeHeartBeat(
               dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, handler);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 5e2ad788aa..79df2c353b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -63,12 +63,12 @@ import org.apache.iotdb.confignode.consensus.response.partition.SchemaPartitionR
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.exception.NoAvailableRegionGroupException;
 import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
-import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.ProcedureManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
 import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
 import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
@@ -712,6 +712,15 @@ public class PartitionManager {
     return result;
   }
 
+  /**
+   * Only leader use this interface.
+   *
+   * @return TConsensusGroupId set of all schema region
+   */
+  public Set<TConsensusGroupId> getAllSchemaPartition() {
+    return partitionInfo.getAllSchemaPartition();
+  }
+
   /**
    * Only leader use this interface
    *
@@ -735,10 +744,10 @@ public class PartitionManager {
     return schemaNodeManagementResp;
   }
 
-  public void preDeleteStorageGroup(
-      String storageGroup, PreDeleteDatabasePlan.PreDeleteType preDeleteType) {
+  public void preDeleteDatabase(
+      String database, PreDeleteDatabasePlan.PreDeleteType preDeleteType) {
     final PreDeleteDatabasePlan preDeleteDatabasePlan =
-        new PreDeleteDatabasePlan(storageGroup, preDeleteType);
+        new PreDeleteDatabasePlan(database, preDeleteType);
     getConsensusManager().write(preDeleteDatabasePlan);
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
index 21909d6a5a..87a748e47f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
@@ -27,10 +27,10 @@ import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
-import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
 import org.apache.iotdb.metrics.utils.MetricLevel;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
similarity index 97%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
index e6c0529255..15036c5bfa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
@@ -16,8 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.confignode.manager.schema;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -61,6 +62,7 @@ import org.apache.iotdb.confignode.consensus.response.template.AllTemplateSetInf
 import org.apache.iotdb.confignode.consensus.response.template.TemplateInfoResp;
 import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoResp;
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
+import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
@@ -107,10 +109,15 @@ public class ClusterSchemaManager {
 
   private final IManager configManager;
   private final ClusterSchemaInfo clusterSchemaInfo;
+  private final ClusterSchemaQuotaStatistics schemaQuotaStatistics;
 
-  public ClusterSchemaManager(IManager configManager, ClusterSchemaInfo clusterSchemaInfo) {
+  public ClusterSchemaManager(
+      IManager configManager,
+      ClusterSchemaInfo clusterSchemaInfo,
+      ClusterSchemaQuotaStatistics schemaQuotaStatistics) {
     this.configManager = configManager;
     this.clusterSchemaInfo = clusterSchemaInfo;
+    this.schemaQuotaStatistics = schemaQuotaStatistics;
   }
 
   // ======================================================
@@ -807,6 +814,18 @@ public class ClusterSchemaManager {
     return getConsensusManager().write(new DropSchemaTemplatePlan(templateName)).getStatus();
   }
 
+  public long getSchemaQuotaCount() {
+    return schemaQuotaStatistics.getSchemaQuotaCount(getPartitionManager().getAllSchemaPartition());
+  }
+
+  public void updateSchemaQuota(Map<TConsensusGroupId, Long> schemaCountMap) {
+    schemaQuotaStatistics.updateCount(schemaCountMap);
+  }
+
+  public void clearSchemaQuotaCache() {
+    schemaQuotaStatistics.clear();
+  }
+
   /**
    * When some Nodes' states changed during a heartbeat loop, the eventbus in LoadManager will post
    * the different NodeStatstics event to SyncManager and ClusterSchemaManager.
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaQuotaStatistics.java
similarity index 50%
copy from confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaQuotaStatistics.java
index 2af715eb1b..1eda15c8c3 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaQuotaStatistics.java
@@ -16,23 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.confignode.manager.schema;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 
-public class ClusterSchemaManagerTest {
+import javax.validation.constraints.NotNull;
 
-  @Test
-  public void testCalcMaxRegionGroupNum() {
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
-    // The maxRegionGroupNum should be great or equal to the leastRegionGroupNum
-    Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(100, 1.0, 3, 1, 3, 0));
+public class ClusterSchemaQuotaStatistics {
 
-    // The maxRegionGroupNum should be great or equal to the allocatedRegionGroupCount
-    Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 6, 2, 3, 100));
+  private final Map<TConsensusGroupId, Long> countMap = new ConcurrentHashMap<>();
 
-    // (resourceWeight * resource) / (createdStorageGroupNum * replicationFactor)
-    Assert.assertEquals(20, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 120, 2, 3, 5));
+  public void updateCount(@NotNull Map<TConsensusGroupId, Long> schemaCountMap) {
+    countMap.putAll(schemaCountMap);
+  }
+
+  public long getSchemaQuotaCount(Set<TConsensusGroupId> consensusGroupIdSet) {
+    return countMap.entrySet().stream()
+        .filter(i -> consensusGroupIdSet.contains(i.getKey()))
+        .mapToLong(Map.Entry::getValue)
+        .sum();
+  }
+
+  public void clear() {
+    countMap.clear();
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index 882cee0b14..5fe9cb9162 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -109,6 +109,10 @@ public class DatabasePartitionTable {
     replicaSets.forEach(replicaSet -> regionGroupMap.remove(replicaSet.getRegionId()));
   }
 
+  public Set<TConsensusGroupId> getAllConsensusGroupId() {
+    return regionGroupMap.keySet();
+  }
+
   /** @return Deep copy of all Regions' RegionReplicaSet within one StorageGroup */
   public List<TRegionReplicaSet> getAllReplicaSets() {
     List<TRegionReplicaSet> result = new ArrayList<>();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 70b0679e3c..2dd737ca63 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -694,6 +694,19 @@ public class PartitionInfo implements SnapshotProcessor {
     return databasePartitionTables.get(storageGroup).getRegionGroupSlotsCounter(type);
   }
 
+  /**
+   * Only leader use this interface.
+   *
+   * @return TConsensusGroupId set of all schema region
+   */
+  public Set<TConsensusGroupId> getAllSchemaPartition() {
+    Set<TConsensusGroupId> schemaPartitionSet = new HashSet<>();
+    databasePartitionTables
+        .values()
+        .forEach(i -> schemaPartitionSet.addAll(i.getAllConsensusGroupId()));
+    return schemaPartitionSet;
+  }
+
   @Override
   public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 4bb874c4ba..32d6b62503 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -43,7 +43,6 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr
 import org.apache.iotdb.confignode.exception.AddConsensusGroupException;
 import org.apache.iotdb.confignode.exception.AddPeerException;
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
-import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
@@ -52,6 +51,7 @@ import org.apache.iotdb.confignode.manager.node.heartbeat.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionGroupCache;
 import org.apache.iotdb.confignode.manager.partition.heartbeat.RegionHeartbeatSample;
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
@@ -120,7 +120,7 @@ public class ConfigNodeProcedureEnv {
    * @param name database name
    * @return tsStatus
    */
-  public TSStatus deleteConfig(String name) {
+  public TSStatus deleteDatabaseConfig(String name) {
     DeleteDatabasePlan deleteDatabasePlan = new DeleteDatabasePlan(name);
     return getClusterSchemaManager().deleteDatabase(deleteDatabasePlan);
   }
@@ -131,8 +131,9 @@ public class ConfigNodeProcedureEnv {
    * @param preDeleteType execute/rollback
    * @param deleteSgName database name
    */
-  public void preDelete(PreDeleteDatabasePlan.PreDeleteType preDeleteType, String deleteSgName) {
-    getPartitionManager().preDeleteStorageGroup(deleteSgName, preDeleteType);
+  public void preDeleteDatabase(
+      PreDeleteDatabasePlan.PreDeleteType preDeleteType, String deleteSgName) {
+    getPartitionManager().preDeleteDatabase(deleteSgName, preDeleteType);
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index b2beaf2f35..11c5aa0c79 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -91,7 +91,7 @@ public class DeleteDatabaseProcedure
         case PRE_DELETE_DATABASE:
           LOG.info(
               "[DeleteDatabaseProcedure] Pre delete database: {}", deleteDatabaseSchema.getName());
-          env.preDelete(
+          env.preDeleteDatabase(
               PreDeleteDatabasePlan.PreDeleteType.EXECUTE, deleteDatabaseSchema.getName());
           setNextState(DeleteStorageGroupState.INVALIDATE_CACHE);
           break;
@@ -150,7 +150,8 @@ public class DeleteDatabaseProcedure
           }
 
           // Delete DatabasePartitionTable
-          final TSStatus deleteConfigResult = env.deleteConfig(deleteDatabaseSchema.getName());
+          final TSStatus deleteConfigResult =
+              env.deleteDatabaseConfig(deleteDatabaseSchema.getName());
 
           // Delete Database metrics
           PartitionMetrics.unbindDatabasePartitionMetrics(deleteDatabaseSchema.getName());
@@ -237,7 +238,8 @@ public class DeleteDatabaseProcedure
       case INVALIDATE_CACHE:
         LOG.info(
             "[DeleteDatabaseProcedure] Rollback to preDeleted: {}", deleteDatabaseSchema.getName());
-        env.preDelete(PreDeleteDatabasePlan.PreDeleteType.ROLLBACK, deleteDatabaseSchema.getName());
+        env.preDeleteDatabase(
+            PreDeleteDatabasePlan.PreDeleteType.ROLLBACK, deleteDatabaseSchema.getName());
         break;
       default:
         break;
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
index 2af715eb1b..261f60ed65 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.confignode.manager;
 
+import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
+
 import org.junit.Assert;
 import org.junit.Test;
 
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
index fe9f3a6767..01948edb75 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppCommonConfig.java
@@ -340,4 +340,16 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig {
     setProperty("write_memory_proportion", writeMemoryProportion);
     return this;
   }
+
+  @Override
+  public CommonConfig setClusterSchemaLimitLevel(String clusterSchemaLimitLevel) {
+    setProperty("cluster_schema_limit_level", clusterSchemaLimitLevel);
+    return this;
+  }
+
+  @Override
+  public CommonConfig setClusterMaxSchemaCount(long clusterMaxSchemaCount) {
+    setProperty("cluster_max_schema_count", String.valueOf(clusterMaxSchemaCount));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
index 275ed1f8f8..49eade65d9 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/MppSharedCommonConfig.java
@@ -350,4 +350,18 @@ public class MppSharedCommonConfig implements CommonConfig {
     cnConfig.setWriteMemoryProportion(writeMemoryProportion);
     return this;
   }
+
+  @Override
+  public CommonConfig setClusterSchemaLimitLevel(String clusterSchemaLimitLevel) {
+    dnConfig.setClusterSchemaLimitLevel(clusterSchemaLimitLevel);
+    cnConfig.setClusterSchemaLimitLevel(clusterSchemaLimitLevel);
+    return this;
+  }
+
+  @Override
+  public CommonConfig setClusterMaxSchemaCount(long clusterMaxSchemaCount) {
+    dnConfig.setClusterMaxSchemaCount(clusterMaxSchemaCount);
+    cnConfig.setClusterMaxSchemaCount(clusterMaxSchemaCount);
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
index 526df5dbab..2a169c74e2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/RemoteCommonConfig.java
@@ -251,4 +251,14 @@ public class RemoteCommonConfig implements CommonConfig {
   public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) {
     return this;
   }
+
+  @Override
+  public CommonConfig setClusterSchemaLimitLevel(String clusterSchemaLimitLevel) {
+    return this;
+  }
+
+  @Override
+  public CommonConfig setClusterMaxSchemaCount(long clusterMaxSchemaCount) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 4792b8b3e7..9c7c4f60e0 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -112,4 +112,8 @@ public interface CommonConfig {
   CommonConfig setSchemaMemoryAllocate(String schemaMemoryAllocate);
 
   CommonConfig setWriteMemoryProportion(String writeMemoryProportion);
+
+  CommonConfig setClusterSchemaLimitLevel(String clusterSchemaLimitLevel);
+
+  CommonConfig setClusterMaxSchemaCount(long clusterMaxSchemaCount);
 }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
similarity index 53%
copy from confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
copy to integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
index 2af715eb1b..55e9767f23 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterDeviceQuotaIT.java
@@ -16,23 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.db.it.schema;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.iotdb.it.env.EnvFactory;
 
-public class ClusterSchemaManagerTest {
-
-  @Test
-  public void testCalcMaxRegionGroupNum() {
-
-    // The maxRegionGroupNum should be great or equal to the leastRegionGroupNum
-    Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(100, 1.0, 3, 1, 3, 0));
-
-    // The maxRegionGroupNum should be great or equal to the allocatedRegionGroupCount
-    Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 6, 2, 3, 100));
+public class IoTDBClusterDeviceQuotaIT extends IoTDBClusterMeasurementQuotaIT {
+  public IoTDBClusterDeviceQuotaIT(SchemaTestMode schemaTestMode) {
+    super(schemaTestMode);
+  }
 
-    // (resourceWeight * resource) / (createdStorageGroupNum * replicationFactor)
-    Assert.assertEquals(20, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 120, 2, 3, 5));
+  @Override
+  protected void setUpQuotaConfig() {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitLevel("DEVICE");
+    EnvFactory.getEnv().getConfig().getCommonConfig().setClusterMaxSchemaCount(3);
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java
new file mode 100644
index 0000000000..4481dedc1d
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBClusterMeasurementQuotaIT.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.schema;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class IoTDBClusterMeasurementQuotaIT extends AbstractSchemaIT {
+  public IoTDBClusterMeasurementQuotaIT(SchemaTestMode schemaTestMode) {
+    super(schemaTestMode);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    setUpQuotaConfig();
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareTimeSeries();
+    Thread.sleep(2000); // wait heartbeat
+  }
+
+  protected void setUpQuotaConfig() {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setClusterSchemaLimitLevel("MEASUREMENT");
+    EnvFactory.getEnv().getConfig().getCommonConfig().setClusterMaxSchemaCount(6);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+    super.tearDown();
+  }
+
+  /** Prepare time series: 2 databases, 3 devices, 6 time series */
+  private void prepareTimeSeries() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      // create database
+      statement.execute("CREATE DATABASE root.sg1");
+      statement.execute("CREATE DATABASE root.sg2");
+      // create schema template
+      statement.execute("CREATE SCHEMA TEMPLATE t1 (s1 INT64, s2 DOUBLE)");
+      // set schema template
+      statement.execute("SET SCHEMA TEMPLATE t1 TO root.sg2");
+      statement.execute(
+          "create timeseries root.sg1.d0.s0 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+      statement.execute(
+          "create timeseries root.sg1.d0.s1 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+      statement.execute(
+          "create timeseries root.sg1.d1.s0 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+      statement.execute(
+          "create timeseries root.sg1.d1.s1 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+      statement.execute("create timeseries of schema template on root.sg2.d1;");
+    }
+  }
+
+  @Test
+  public void testClusterSchemaQuota() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try {
+        statement.execute(
+            "create timeseries root.sg1.d3.s0 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+        Assert.fail();
+      } catch (Exception e) {
+        Assert.assertTrue(
+            e.getMessage()
+                .contains("The current metadata capacity has exceeded the cluster quota"));
+      }
+      try {
+        statement.execute("create timeseries of schema template on root.sg2.d2;");
+        Assert.fail();
+      } catch (Exception e) {
+        Assert.assertTrue(
+            e.getMessage()
+                .contains("The current metadata capacity has exceeded the cluster quota"));
+      }
+      // delete some timeseries and database
+      statement.execute("delete database root.sg2;");
+      statement.execute("delete timeseries root.sg1.d0.s0;");
+      Thread.sleep(2000); // wait heartbeat
+      // now we can create 3 new timeseries or 1 new device
+      statement.execute("SET SCHEMA TEMPLATE t1 TO root.sg1.d4");
+      statement.execute("create timeseries of schema template on root.sg1.d4");
+      statement.execute(
+          "create timeseries root.sg1.d4.s3 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+      Thread.sleep(2000); // wait heartbeat
+      try {
+        statement.execute(
+            "create timeseries root.sg1.d3.s0 with datatype=FLOAT, encoding=RLE, compression=SNAPPY");
+        Assert.fail();
+      } catch (Exception e) {
+        Assert.assertTrue(
+            e.getMessage()
+                .contains("The current metadata capacity has exceeded the cluster quota"));
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+}
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 0a61caa03c..934f81e736 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -288,6 +288,20 @@ cluster_name=defaultCluster
 # which exceeds this num, will be split to several plans with timeseries no more than this num.
 # max_measurement_num_of_internal_request=10000
 
+# This configuration parameter sets the level at which the time series limit is applied.
+# There are two available levels: 'DEVICE' and 'MEASUREMENT'.
+# 'DEVICE': The limit is applied to the number of devices in the cluster.
+# 'MEASUREMENT': The limit is applied to the number of measurements in the cluster.
+# Set the value to either 'device' or 'measurement' based on your desired control level.
+# cluster_schema_limit_level=MEASUREMENT
+
+# This configuration parameter sets the maximum number of schema allowed in the cluster.
+# The value should be a positive integer representing the desired threshold.
+# When the threshold is reached, users will be prohibited from creating new time series.
+# Set the value based on the desired maximum number of schema for your IoTDB cluster.
+# -1 means the system does not impose a limit on the maximum number of time series.
+# cluster_max_schema_count=-1
+
 ####################
 ### Configurations for creating schema automatically
 ####################
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java b/node-commons/src/main/java/org/apache/iotdb/commons/schema/ClusterSchemaQuotaLevel.java
similarity index 53%
copy from confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/schema/ClusterSchemaQuotaLevel.java
index 2af715eb1b..1410f1a1c3 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/schema/ClusterSchemaQuotaLevel.java
@@ -16,23 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.commons.schema;
 
-import org.junit.Assert;
-import org.junit.Test;
+public enum ClusterSchemaQuotaLevel {
+  MEASUREMENT(0),
+  DEVICE(1);
 
-public class ClusterSchemaManagerTest {
+  private final int code;
 
-  @Test
-  public void testCalcMaxRegionGroupNum() {
-
-    // The maxRegionGroupNum should be great or equal to the leastRegionGroupNum
-    Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(100, 1.0, 3, 1, 3, 0));
-
-    // The maxRegionGroupNum should be great or equal to the allocatedRegionGroupCount
-    Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 6, 2, 3, 100));
+  ClusterSchemaQuotaLevel(int code) {
+    this.code = code;
+  }
 
-    // (resourceWeight * resource) / (createdStorageGroupNum * replicationFactor)
-    Assert.assertEquals(20, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 120, 2, 3, 5));
+  public int getCode() {
+    return code;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 38bc963235..8bd7128679 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1034,6 +1034,12 @@ public class IoTDBConfig {
   /** whether to enable the audit log * */
   private boolean enableAuditLog = false;
 
+  /** This configuration parameter sets the level at which the time series limit is applied.* */
+  private String clusterSchemaLimitLevel = "MEASUREMENT";
+
+  /** This configuration parameter sets the maximum number of schema allowed in the cluster.* */
+  private long clusterMaxSchemaCount = -1;
+
   /** Output location of audit logs * */
   private List<AuditLogStorage> auditLogStorage =
       Arrays.asList(AuditLogStorage.IOTDB, AuditLogStorage.LOGGER);
@@ -3627,4 +3633,20 @@ public class IoTDBConfig {
   public void setEnableAuditLogForNativeInsertApi(boolean enableAuditLogForNativeInsertApi) {
     this.enableAuditLogForNativeInsertApi = enableAuditLogForNativeInsertApi;
   }
+
+  public String getClusterSchemaLimitLevel() {
+    return clusterSchemaLimitLevel;
+  }
+
+  public void setClusterSchemaLimitLevel(String clusterSchemaLimitLevel) {
+    this.clusterSchemaLimitLevel = clusterSchemaLimitLevel;
+  }
+
+  public long getClusterMaxSchemaCount() {
+    return clusterMaxSchemaCount;
+  }
+
+  public void setClusterMaxSchemaCount(long clusterMaxSchemaCount) {
+    this.clusterMaxSchemaCount = clusterMaxSchemaCount;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 77378e7f72..469dd66c15 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.engine.compaction.selector.constant.CrossCompactionSe
 import org.apache.iotdb.db.engine.compaction.selector.constant.InnerSequenceCompactionSelector;
 import org.apache.iotdb.db.engine.compaction.selector.constant.InnerUnsequenceCompactionSelector;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
 import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -199,7 +200,16 @@ public class IoTDBDescriptor {
   }
 
   public void loadProperties(Properties properties) {
-
+    conf.setClusterSchemaLimitLevel(
+        properties
+            .getProperty("cluster_schema_limit_level", conf.getClusterSchemaLimitLevel())
+            .trim());
+    conf.setClusterMaxSchemaCount(
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "cluster_max_schema_count", Long.toString(conf.getClusterMaxSchemaCount()))
+                .trim()));
     conf.setClusterName(
         properties.getProperty(IoTDBConstant.CLUSTER_NAME, conf.getClusterName()).trim());
 
@@ -1468,6 +1478,19 @@ public class IoTDBDescriptor {
       if (prevDeleteWalFilesPeriodInMs != conf.getDeleteWalFilesPeriodInMs()) {
         WALManager.getInstance().rebootWALDeleteThread();
       }
+
+      // update schema quota configuration
+      conf.setClusterSchemaLimitLevel(
+          properties
+              .getProperty("cluster_schema_limit_level", conf.getClusterSchemaLimitLevel())
+              .trim());
+      conf.setClusterMaxSchemaCount(
+          Long.parseLong(
+              properties
+                  .getProperty(
+                      "cluster_max_schema_count", Long.toString(conf.getClusterMaxSchemaCount()))
+                  .trim()));
+      DataNodeSchemaQuotaManager.getInstance().updateConfiguration();
     } catch (Exception e) {
       throw new QueryProcessException(String.format("Fail to reload configuration because %s", e));
     }
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SchemaQuotaExceededException.java
similarity index 52%
copy from confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
copy to server/src/main/java/org/apache/iotdb/db/exception/metadata/SchemaQuotaExceededException.java
index 2af715eb1b..fa152a1bb4 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/ClusterSchemaManagerTest.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SchemaQuotaExceededException.java
@@ -16,23 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager;
+package org.apache.iotdb.db.exception.metadata;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.ClusterSchemaQuotaLevel;
+import org.apache.iotdb.rpc.TSStatusCode;
 
-public class ClusterSchemaManagerTest {
-
-  @Test
-  public void testCalcMaxRegionGroupNum() {
-
-    // The maxRegionGroupNum should be great or equal to the leastRegionGroupNum
-    Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(100, 1.0, 3, 1, 3, 0));
-
-    // The maxRegionGroupNum should be great or equal to the allocatedRegionGroupCount
-    Assert.assertEquals(100, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 6, 2, 3, 100));
-
-    // (resourceWeight * resource) / (createdStorageGroupNum * replicationFactor)
-    Assert.assertEquals(20, ClusterSchemaManager.calcMaxRegionGroupNum(3, 1.0, 120, 2, 3, 5));
+public class SchemaQuotaExceededException extends MetadataException {
+  public SchemaQuotaExceededException(ClusterSchemaQuotaLevel level, long limit) {
+    super(
+        String.format(
+            "The current metadata capacity has exceeded the cluster quota. The cluster quota is set at the %s level, with a limit number of %d. Please review your configuration or delete some existing time series to comply with the quota.",
+            level.toString(), limit),
+        TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index d340268ca2..1ce8393868 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -61,6 +61,7 @@ import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
 import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
 import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
 import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -840,6 +841,8 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
             throw new DifferentTemplateException(activatePath.getFullPath(), template.getName());
           }
         }
+        DataNodeSchemaQuotaManager.getInstance()
+            .checkMeasurementLevel(template.getMeasurementNumber());
 
         if (cur.isEntity()) {
           entityMNode = cur.getAsEntityMNode();
@@ -895,6 +898,7 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
       entityMNode.setSchemaTemplateId(templateId);
 
       store.updateMNode(entityMNode);
+      regionStatistics.activateTemplate(templateId);
     } finally {
       unPinPath(cur);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index 465468e24b..f77debe0c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -60,6 +60,7 @@ import org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
 import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
 import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
 import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
 import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
@@ -710,6 +711,8 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
           throw new DifferentTemplateException(activatePath.getFullPath(), template.getName());
         }
       }
+      DataNodeSchemaQuotaManager.getInstance()
+          .checkMeasurementLevel(template.getMeasurementNumber());
 
       if (cur.isEntity()) {
         entityMNode = cur.getAsEntityMNode();
@@ -801,7 +804,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
   }
 
   public void activateTemplateWithoutCheck(
-      PartialPath activatePath, int templateId, boolean isAligned) {
+      PartialPath activatePath, int templateId, boolean isAligned) throws MetadataException {
     String[] nodes = activatePath.getNodes();
     IMNode cur = storageGroupMNode;
     for (int i = levelOfSG + 1; i < nodes.length; i++) {
@@ -823,6 +826,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
     }
     entityMNode.setUseTemplate(true);
     entityMNode.setSchemaTemplateId(templateId);
+    regionStatistics.activateTemplate(templateId);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
index 93c9238433..0a329b3cc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/CachedMTreeStore.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.metadata.mtree.store;
 
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.exception.metadata.cache.MNodeNotCachedException;
 import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
 import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
@@ -38,6 +39,7 @@ import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.MemManager;
 import org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.ISchemaFile;
 import org.apache.iotdb.db.metadata.mtree.store.disk.schemafile.SchemaFile;
 import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
 import org.apache.iotdb.db.metadata.template.Template;
 
 import org.slf4j.Logger;
@@ -62,7 +64,7 @@ public class CachedMTreeStore implements IMTreeStore {
   private IMNode root;
   private final Runnable flushCallback;
   private final CachedSchemaRegionStatistics regionStatistics;
-
+  private final DataNodeSchemaQuotaManager quotaManager = DataNodeSchemaQuotaManager.getInstance();
   private final StampedWriterPreferredLock lock = new StampedWriterPreferredLock();
 
   public CachedMTreeStore(
@@ -325,7 +327,8 @@ public class CachedMTreeStore implements IMTreeStore {
   }
 
   @Override
-  public IEntityMNode setToEntity(IMNode node) {
+  public IEntityMNode setToEntity(IMNode node) throws SchemaQuotaExceededException {
+    quotaManager.checkDeviceLevel();
     IEntityMNode result = MNodeUtils.setToEntity(node);
     if (result != node) {
       regionStatistics.addDevice();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
index 3ca77d321d..fd5dedbddb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/MemMTreeStore.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
 import org.apache.iotdb.db.metadata.mnode.AboveDatabaseMNode;
 import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
 import org.apache.iotdb.db.metadata.mnode.IMNode;
@@ -36,6 +37,7 @@ import org.apache.iotdb.db.metadata.mnode.iterator.IMNodeIterator;
 import org.apache.iotdb.db.metadata.mnode.iterator.MNodeIterator;
 import org.apache.iotdb.db.metadata.mnode.iterator.MemoryTraverserIterator;
 import org.apache.iotdb.db.metadata.mtree.snapshot.MemMTreeSnapshotUtil;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
 import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
 import org.apache.iotdb.db.metadata.template.Template;
 
@@ -48,6 +50,8 @@ import java.util.function.Consumer;
 public class MemMTreeStore implements IMTreeStore {
 
   private final IMNodeSizeEstimator estimator = new BasicMNodSizeEstimator();
+  private final DataNodeSchemaQuotaManager schemaQuotaManager =
+      DataNodeSchemaQuotaManager.getInstance();
   private MemSchemaRegionStatistics regionStatistics;
 
   private IMNode root;
@@ -154,7 +158,8 @@ public class MemMTreeStore implements IMTreeStore {
   public void updateMNode(IMNode node) {}
 
   @Override
-  public IEntityMNode setToEntity(IMNode node) {
+  public IEntityMNode setToEntity(IMNode node) throws SchemaQuotaExceededException {
+    schemaQuotaManager.checkDeviceLevel();
     IEntityMNode result = MNodeUtils.setToEntity(node);
     if (result != node) {
       regionStatistics.addDevice();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/DataNodeSchemaQuotaManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/DataNodeSchemaQuotaManager.java
new file mode 100644
index 0000000000..936cf8c5f6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/DataNodeSchemaQuotaManager.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.rescon;
+
+import org.apache.iotdb.commons.schema.ClusterSchemaQuotaLevel;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class DataNodeSchemaQuotaManager {
+
+  private ClusterSchemaQuotaLevel level =
+      ClusterSchemaQuotaLevel.valueOf(
+          IoTDBDescriptor.getInstance().getConfig().getClusterSchemaLimitLevel().toUpperCase());
+  private long limit =
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .getClusterMaxSchemaCount(); // -1 means no limitation
+  private final AtomicLong remain = new AtomicLong(0);
+
+  public void updateRemain(long totalCount) {
+    this.remain.getAndSet(limit - totalCount);
+  }
+
+  public void checkMeasurementLevel(int acquireNumber) throws SchemaQuotaExceededException {
+    if (limit > 0 && level.equals(ClusterSchemaQuotaLevel.MEASUREMENT)) {
+      if (remain.get() <= 0) {
+        throw new SchemaQuotaExceededException(level, limit);
+      } else {
+        remain.addAndGet(-acquireNumber);
+      }
+    }
+  }
+
+  public void checkDeviceLevel() throws SchemaQuotaExceededException {
+    if (limit > 0 && level.equals(ClusterSchemaQuotaLevel.DEVICE)) {
+      if (remain.get() <= 0) {
+        throw new SchemaQuotaExceededException(level, limit);
+      } else {
+        remain.addAndGet(-1L);
+      }
+    }
+  }
+
+  public void updateConfiguration() {
+    this.level =
+        ClusterSchemaQuotaLevel.valueOf(
+            IoTDBDescriptor.getInstance().getConfig().getClusterSchemaLimitLevel());
+    long oldLimit = limit;
+    this.limit = IoTDBDescriptor.getInstance().getConfig().getClusterMaxSchemaCount();
+    this.remain.addAndGet(limit - oldLimit);
+  }
+
+  public ClusterSchemaQuotaLevel getLevel() {
+    return level;
+  }
+
+  public long getLimit() {
+    return limit;
+  }
+
+  private DataNodeSchemaQuotaManager() {}
+
+  public static DataNodeSchemaQuotaManager getInstance() {
+    return DataNodeSchemaQuotaManager.DataNodeSchemaQuotaManagerHolder.INSTANCE;
+  }
+
+  private static class DataNodeSchemaQuotaManagerHolder {
+    private static final DataNodeSchemaQuotaManager INSTANCE = new DataNodeSchemaQuotaManager();
+
+    private DataNodeSchemaQuotaManagerHolder() {
+      // empty constructor
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 20c90e66d1..e1b56aa4cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.metadata.schemaregion;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
@@ -30,8 +31,10 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.metadata.metric.SchemaMetricManager;
 import org.apache.iotdb.db.metadata.rescon.CachedSchemaEngineStatistics;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
 import org.apache.iotdb.db.metadata.rescon.ISchemaEngineStatistics;
 import org.apache.iotdb.db.metadata.rescon.MemSchemaEngineStatistics;
 import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
@@ -43,6 +46,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
@@ -72,6 +76,9 @@ public class SchemaEngine {
 
   private ISchemaEngineStatistics schemaEngineStatistics;
 
+  private final DataNodeSchemaQuotaManager schemaQuotaManager =
+      DataNodeSchemaQuotaManager.getInstance();
+
   private static class SchemaEngineManagerHolder {
 
     private static final SchemaEngine INSTANCE = new SchemaEngine();
@@ -343,6 +350,45 @@ public class SchemaEngine {
     return schemaRegionMap == null ? 0 : schemaRegionMap.size();
   }
 
+  /**
+   * Update total count in schema quota manager and generate local count map response.
+   *
+   * @param totalCount cluster schema usage
+   * @return if limit is -1, return null; else return schemaCountMap of the SchemaRegion whose
+   *     current node is the leader
+   */
+  public Map<TConsensusGroupId, Long> updateAndGenerateSchemaCountMap(long totalCount) {
+    // update DataNodeSchemaQuotaManager
+    schemaQuotaManager.updateRemain(totalCount);
+    if (schemaQuotaManager.getLimit() < 0) {
+      return null;
+    }
+    Map<TConsensusGroupId, Long> res = new HashMap<>();
+    switch (schemaQuotaManager.getLevel()) {
+      case MEASUREMENT:
+        schemaRegionMap.values().stream()
+            .filter(i -> SchemaRegionConsensusImpl.getInstance().isLeader(i.getSchemaRegionId()))
+            .forEach(
+                i ->
+                    res.put(
+                        i.getSchemaRegionId().convertToTConsensusGroupId(),
+                        i.getSchemaRegionStatistics().getSeriesNumber()));
+        break;
+      case DEVICE:
+        schemaRegionMap.values().stream()
+            .filter(i -> SchemaRegionConsensusImpl.getInstance().isLeader(i.getSchemaRegionId()))
+            .forEach(
+                i ->
+                    res.put(
+                        i.getSchemaRegionId().convertToTConsensusGroupId(),
+                        i.getSchemaRegionStatistics().getDevicesNumber()));
+        break;
+      default:
+        throw new UnsupportedOperationException();
+    }
+    return res;
+  }
+
   @TestOnly
   public ISchemaEngineStatistics getSchemaEngineStatistics() {
     return schemaEngineStatistics;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index f27b857395..a5312d880a 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.metadata.query.info.IDeviceSchemaInfo;
 import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
 import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
 import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
 import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
 import org.apache.iotdb.db.metadata.tag.TagManager;
 import org.apache.iotdb.db.metadata.template.Template;
@@ -139,6 +140,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   private SchemaLogWriter<ISchemaRegionPlan> logWriter;
 
   private final MemSchemaRegionStatistics regionStatistics;
+  private final DataNodeSchemaQuotaManager schemaQuotaManager =
+      DataNodeSchemaQuotaManager.getInstance();
 
   private MTreeBelowSGMemoryImpl mtree;
   private TagManager tagManager;
@@ -534,6 +537,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     if (seriesNumberMonitor != null && !seriesNumberMonitor.addTimeSeries(1)) {
       throw new SeriesNumberOverflowException();
     }
+    schemaQuotaManager.checkMeasurementLevel(1);
 
     try {
       IMeasurementMNode leafMNode;
@@ -614,6 +618,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     if (seriesNumberMonitor != null && !seriesNumberMonitor.addTimeSeries(seriesCount)) {
       throw new SeriesNumberOverflowException();
     }
+    schemaQuotaManager.checkMeasurementLevel(seriesCount);
 
     try {
       PartialPath prefixPath = plan.getDevicePath();
@@ -1074,7 +1079,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     }
   }
 
-  private void recoverActivatingSchemaTemplate(IActivateTemplateInClusterPlan plan) {
+  private void recoverActivatingSchemaTemplate(IActivateTemplateInClusterPlan plan)
+      throws MetadataException {
     mtree.activateTemplateWithoutCheck(
         plan.getActivatePath(), plan.getTemplateId(), plan.isAligned());
   }
@@ -1263,8 +1269,12 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     public RecoverOperationResult visitActivateTemplateInCluster(
         IActivateTemplateInClusterPlan activateTemplateInClusterPlan,
         SchemaRegionMemoryImpl context) {
-      recoverActivatingSchemaTemplate(activateTemplateInClusterPlan);
-      return RecoverOperationResult.SUCCESS;
+      try {
+        recoverActivatingSchemaTemplate(activateTemplateInClusterPlan);
+        return RecoverOperationResult.SUCCESS;
+      } catch (MetadataException e) {
+        return new RecoverOperationResult(e);
+      }
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index cbca5643b1..98ecde5d4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.db.metadata.query.info.INodeSchemaInfo;
 import org.apache.iotdb.db.metadata.query.info.ITimeSeriesSchemaInfo;
 import org.apache.iotdb.db.metadata.query.reader.ISchemaReader;
 import org.apache.iotdb.db.metadata.rescon.CachedSchemaRegionStatistics;
+import org.apache.iotdb.db.metadata.rescon.DataNodeSchemaQuotaManager;
 import org.apache.iotdb.db.metadata.rescon.MemSchemaRegionStatistics;
 import org.apache.iotdb.db.metadata.tag.TagManager;
 import org.apache.iotdb.db.metadata.template.Template;
@@ -148,6 +149,8 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   private MLogDescriptionWriter logDescriptionWriter;
 
   private final CachedSchemaRegionStatistics regionStatistics;
+  private final DataNodeSchemaQuotaManager schemaQuotaManager =
+      DataNodeSchemaQuotaManager.getInstance();
 
   private MTreeBelowSGCachedImpl mtree;
   private TagManager tagManager;
@@ -602,6 +605,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     if (seriesNumerMonitor != null && !seriesNumerMonitor.addTimeSeries(1)) {
       throw new SeriesNumberOverflowException();
     }
+    schemaQuotaManager.checkMeasurementLevel(1);
 
     try {
       PartialPath path = plan.getPath();
@@ -724,6 +728,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     if (seriesNumerMonitor != null && !seriesNumerMonitor.addTimeSeries(seriesCount)) {
       throw new SeriesNumberOverflowException();
     }
+    schemaQuotaManager.checkMeasurementLevel(seriesCount);
 
     try {
       PartialPath prefixPath = plan.getDevicePath();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 49f7ff8a18..efe659f6e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -976,6 +976,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       resp.setLoadSample(loadSample);
     }
 
+    // Update schema quota if necessary
+    Map<TConsensusGroupId, Long> schemaCountMap =
+        SchemaEngine.getInstance().updateAndGenerateSchemaCountMap(req.schemaQuotaCount);
+    if (schemaCountMap != null) {
+      resp.setSchemaCountMap(schemaCountMap);
+    }
+
     resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp());
     resp.setStatus(CommonDescriptor.getInstance().getConfig().getNodeStatus().getStatus());
     if (CommonDescriptor.getInstance().getConfig().getStatusReason() != null) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 487e711012..f2a5aae7b4 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -77,6 +77,7 @@ public enum TSStatusCode {
   SCHEMA_FILE_REDO_LOG_BROKEN(523),
   TEMPLATE_NOT_ACTIVATED(524),
   DATABASE_CONFIG_ERROR(525),
+  SCHEMA_QUOTA_EXCEEDED(502),
 
   // Storage Engine
   SYSTEM_READ_ONLY(600),
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index c64e42d51a..c80ae4452c 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -225,6 +225,7 @@ struct THeartbeatReq {
   1: required i64 heartbeatTimestamp
   2: required bool needJudgeLeader
   3: required bool needSamplingLoad
+  4: required i64 schemaQuotaCount
 }
 
 struct THeartbeatResp {
@@ -233,6 +234,7 @@ struct THeartbeatResp {
   3: optional string statusReason
   4: optional map<common.TConsensusGroupId, bool> judgedLeaders
   5: optional TLoadSample loadSample
+  6: optional map<common.TConsensusGroupId, i64> schemaCountMap
 }
 
 struct TLoadSample {