Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/01 08:17:24 UTC

[iotdb] branch master updated: [IOTDB-3020][IOTDB-3022]Multi-thread MLog recovery & Fix SchemaFile dirty page loses (#5695)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f953aa7bb [IOTDB-3020][IOTDB-3022]Multi-thread MLog recovery & Fix SchemaFile dirty page loses (#5695)
6f953aa7bb is described below

commit 6f953aa7bb172dd80c0910a19a652eff365b18c9
Author: ZhaoXin <x_...@163.com>
AuthorDate: Sun May 1 16:17:18 2022 +0800

    [IOTDB-3020][IOTDB-3022]Multi-thread MLog recovery & Fix SchemaFile dirty page loses (#5695)
---
 .../resources/conf/iotdb-engine.properties         |  4 --
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ----
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  5 --
 .../metadata/SeriesOverflowException.java          | 33 ++++++++++
 .../mtree/store/disk/MTreeFlushTaskManager.java    |  6 +-
 .../mtree/store/disk/MTreeReleaseTaskManager.java  |  3 +-
 .../mtree/store/disk/schemafile/SchemaFile.java    | 73 +++++++++++++---------
 .../db/metadata/schemaregion/SchemaEngine.java     | 73 ++++++++++++++++++++--
 .../schemaregion/SchemaRegionMemoryImpl.java       |  9 +--
 .../schemaregion/SchemaRegionSchemaFileImpl.java   | 53 +++++++++++++---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 11 files changed, 196 insertions(+), 75 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 27e06fe926..11382d2d56 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -1020,10 +1020,6 @@ timestamp_precision=ms
 ####################
 ### Schema File Configuration
 ####################
-# the max num of thread used for flushing metadata to schema file
-# Datatype: int
-# max_schema_flush_thread=15
-
 # The minimum size (in bytes) allocated for a node in schema file
 # A large number for this will make it faster while occupying more space, or otherwise
 # The default 0 means if a flushed internal(entity) had less than 20 children, it will get a segment with the size calculated from total size of its children
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 65d8264126..f679fa2286 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -827,9 +827,6 @@ public class IoTDBConfig {
   /** the memory used for metadata cache when using persistent schema */
   private int cachedMNodeSizeInSchemaFileMode = -1;
 
-  /** the max num of thread used for flushing metadata to schema file */
-  private int maxSchemaFlushThreadNum = 15;
-
   /** the minimum size (in bytes) of segment inside a schema file page */
   private short minimumSegmentInSchemaFile = 0;
 
@@ -2681,14 +2678,6 @@ public class IoTDBConfig {
     this.cachedMNodeSizeInSchemaFileMode = cachedMNodeSizeInSchemaFileMode;
   }
 
-  public int getMaxSchemaFlushThreadNum() {
-    return maxSchemaFlushThreadNum;
-  }
-
-  public void setMaxSchemaFlushThreadNum(int maxSchemaFlushThreadNum) {
-    this.maxSchemaFlushThreadNum = maxSchemaFlushThreadNum;
-  }
-
   public short getMinimumSegmentInSchemaFile() {
     return minimumSegmentInSchemaFile;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 556c03b582..c1651f66eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -766,11 +766,6 @@ public class IoTDBDescriptor {
                   "cached_mnode_size_in_schema_file_mode",
                   String.valueOf(conf.getCachedMNodeSizeInSchemaFileMode()))));
 
-      conf.setMaxSchemaFlushThreadNum(
-          Integer.parseInt(
-              properties.getProperty(
-                  "max_schema_flush_thread", String.valueOf(conf.getMaxSchemaFlushThreadNum()))));
-
       conf.setMinimumSegmentInSchemaFile(
           Short.parseShort(
               properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
new file mode 100644
index 0000000000..ddf8d9e5d9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesOverflowException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class SeriesOverflowException extends MetadataException {
+
+  public SeriesOverflowException() {
+    super(
+        "There are too many timeseries in memory, "
+            + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat, restart and create timeseries again.",
+        TSStatusCode.SERIES_OVERFLOW.getStatusCode());
+  }
+}
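
The new exception reuses the MetadataException hierarchy but carries the dedicated SERIES_OVERFLOW status code (337, added to TSStatusCode at the end of this diff), so callers can tell transient memory pressure apart from other metadata errors. A minimal sketch of that distinction, assuming only the two IoTDB classes referenced above are on the classpath:

    import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
    import org.apache.iotdb.rpc.TSStatusCode;

    public class SeriesOverflowSketch {
      public static void main(String[] args) {
        try {
          throw new SeriesOverflowException();
        } catch (SeriesOverflowException e) {
          // a dedicated catch branch: this condition is transient and retryable,
          // unlike most other MetadataExceptions
          System.out.println(TSStatusCode.SERIES_OVERFLOW.getStatusCode()); // 337
        }
      }
    }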
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeFlushTaskManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeFlushTaskManager.java
index e10fa49d54..6a4a61de10 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeFlushTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeFlushTaskManager.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.metadata.mtree.store.disk;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,10 +45,7 @@ public class MTreeFlushTaskManager {
   }
 
   public void init() {
-    flushTaskExecutor =
-        IoTDBThreadPoolFactory.newCachedThreadPool(
-            MTREE_FLUSH_THREAD_POOL_NAME,
-            IoTDBDescriptor.getInstance().getConfig().getMaxSchemaFlushThreadNum());
+    flushTaskExecutor = IoTDBThreadPoolFactory.newCachedThreadPool(MTREE_FLUSH_THREAD_POOL_NAME);
   }
 
   public void clear() {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeReleaseTaskManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeReleaseTaskManager.java
index bce67fc863..7ba8cceabe 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeReleaseTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/MTreeReleaseTaskManager.java
@@ -47,8 +47,7 @@ public class MTreeReleaseTaskManager {
 
   public void init() {
     releaseTaskExecutor =
-        IoTDBThreadPoolFactory.newCachedThreadPool(
-            MTREE_RELEASE_THREAD_POOL_NAME, Runtime.getRuntime().availableProcessors());
+        IoTDBThreadPoolFactory.newCachedThreadPool(MTREE_RELEASE_THREAD_POOL_NAME);
   }
 
   public void clear() {
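
Both task managers now create their pools without an explicit thread cap. Assuming IoTDBThreadPoolFactory delegates to the standard JDK cached pool, sizing becomes demand-driven: threads are spawned as flush/release tasks arrive and reclaimed after 60 seconds of idleness. A standalone sketch of the same behavior using plain java.util.concurrent:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CachedPoolSketch {
      public static void main(String[] args) {
        // unbounded cached pool: grows with the number of concurrent tasks,
        // so throughput is no longer capped by max_schema_flush_thread
        ExecutorService flushPool = Executors.newCachedThreadPool();
        for (int i = 0; i < 8; i++) {
          final int taskId = i;
          flushPool.submit(() -> System.out.println("flush task " + taskId));
        }
        flushPool.shutdown();
      }
    }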
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
index 7ee2a7e450..9bb8176572 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/schemafile/SchemaFile.java
@@ -104,6 +104,7 @@ public class SchemaFile implements ISchemaFile {
 
   private ByteBuffer headerContent;
   private int lastPageIndex; // last page index of the file, boundary to grow
+  private long lastSGAddr; // last segment of storage group node
 
   // work as a naive (read-only) cache for page instance
   private final Map<Integer, ISchemaPage> pageInstCache;
@@ -242,12 +243,11 @@ public class SchemaFile implements ISchemaFile {
     long curSegAddr = getNodeAddress(node);
 
     if (node.isStorageGroup()) {
-      // Notice that it implies StorageGroupNode is always of 0L segmentAddress
-      curSegAddr = 0L;
-      pageIndex = 0;
-      curSegIdx = 0;
+      curSegAddr = lastSGAddr;
+      pageIndex = getPageIndex(lastSGAddr);
+      curSegIdx = getSegIndex(lastSGAddr);
       isEntity = node.isEntity();
-      setNodeAddress(node, 0L);
+      setNodeAddress(node, lastSGAddr);
     } else {
       if ((curSegAddr & 0x80000000_00000000L) != 0) {
         throw new MetadataException(
@@ -326,11 +326,10 @@ public class SchemaFile implements ISchemaFile {
           curPage.deleteSegment(curSegIdx);
 
           curSegIdx = SchemaFile.getSegIndex(curSegAddr);
-          setNodeAddress(node, curSegAddr);
-          updateParentalRecord(node.getParent(), node.getName(), curSegAddr);
         }
+        setNodeAddress(node, curSegAddr);
+        updateParentalRecord(node.getParent(), node.getName(), curSegAddr);
 
-        dirtyPages.putIfAbsent(newPage.getPageIndex(), newPage);
         dirtyPages.putIfAbsent(curPage.getPageIndex(), curPage);
 
         curPage = newPage;
@@ -378,7 +377,6 @@ public class SchemaFile implements ISchemaFile {
           curPage.deleteSegment(curSegIdx);
           setNodeAddress(node, newSegAddr);
           updateParentalRecord(node.getParent(), node.getName(), newSegAddr);
-          dirtyPages.putIfAbsent(newPage.getPageIndex(), newPage);
           dirtyPages.putIfAbsent(curPage.getPageIndex(), curPage);
         } else {
           // already full page segment, write updated record to another applicable segment or a
@@ -402,7 +400,6 @@ public class SchemaFile implements ISchemaFile {
             newPage.setPrevSegAddress(getSegIndex(existedSegAddr), actualSegAddr);
 
             curPage.setNextSegAddress(getSegIndex(actualSegAddr), existedSegAddr);
-            dirtyPages.putIfAbsent(newPage.getPageIndex(), newPage);
           }
 
           ISchemaPage existedPage = getPageInstance(getPageIndex(existedSegAddr));
@@ -421,12 +418,18 @@ public class SchemaFile implements ISchemaFile {
   public void delete(IMNode node) throws IOException, MetadataException {
     long recSegAddr = node.getParent() == null ? ROOT_INDEX : getNodeAddress(node.getParent());
     recSegAddr = getTargetSegmentAddress(recSegAddr, node.getName());
-    getPageInstance(getPageIndex(recSegAddr)).removeRecord(getSegIndex(recSegAddr), node.getName());
+    ISchemaPage tarPage = getPageInstance(getPageIndex(recSegAddr));
+    dirtyPages.putIfAbsent(tarPage.getPageIndex(), tarPage);
+    tarPage.removeRecord(getSegIndex(recSegAddr), node.getName());
 
     if (!node.isMeasurement()) {
       long delSegAddr = getNodeAddress(node);
-      getPageInstance(getPageIndex(delSegAddr)).deleteSegment(getSegIndex(delSegAddr));
+      tarPage = getPageInstance(getPageIndex(delSegAddr));
+      dirtyPages.putIfAbsent(tarPage.getPageIndex(), tarPage);
+      tarPage.deleteSegment(getSegIndex(delSegAddr));
     }
+
+    flushAllDirtyPages();
   }
 
   @Override
@@ -554,8 +557,10 @@ public class SchemaFile implements ISchemaFile {
                     + "==  Internal/Entity presents as (name, is_aligned, child_segment_address)\n"
                     + "==  Measurement presents as (name, data_type, encoding, compressor, alias_if_exist)\n"
                     + "=============================\n"
-                    + "Belong to StorageGroup: [%s], total pages:%d\n",
-                storageGroupName == null ? "NOT SPECIFIED" : storageGroupName, lastPageIndex + 1));
+                    + "Belong to StorageGroup: [%s], segment of SG:%s, total pages:%d\n",
+                storageGroupName == null ? "NOT SPECIFIED" : storageGroupName,
+                Long.toHexString(lastSGAddr),
+                lastPageIndex + 1));
     int cnt = 0;
     while (cnt <= lastPageIndex) {
       ISchemaPage page = getPageInstance(cnt);
@@ -574,15 +579,14 @@ public class SchemaFile implements ISchemaFile {
    * <p><b>File Header Structure:</b>
    *
    * <ul>
-   *   <li>1 int (4 bytes): last page index
+   *   <li>1 int (4 bytes): last page index {@link #lastPageIndex}
    *   <li>var length: root(SG) node info
    *       <ul>
    *         <li><s>a. var length string (less than 200 bytes): path to root(SG) node</s>
-   *         <li>a. 1 long (8 bytes): dataTTL
-   *         <li>b. 1 bool (1 byte): isEntityStorageGroup
-   *         <li>c. 1 int (4 bytes): hash code of template name
-   *         <li>d. fixed length buffer (9 bytes): internal or entity node buffer [not implemented
-   *             yet]
+   *         <li>a. 1 long (8 bytes): dataTTL {@link #dataTTL}
+   *         <li>b. 1 bool (1 byte): isEntityStorageGroup {@link #isEntity}
+   *         <li>c. 1 int (4 bytes): hash code of template name {@link #templateHash}
+   *         <li>d. 1 long (8 bytes): last segment address of storage group {@link #lastSGAddr}
    *       </ul>
    * </ul>
    *
@@ -596,6 +600,7 @@ public class SchemaFile implements ISchemaFile {
       ReadWriteIOUtils.write(dataTTL, headerContent);
       ReadWriteIOUtils.write(isEntity, headerContent);
       ReadWriteIOUtils.write(templateHash, headerContent);
+      lastSGAddr = 0L;
       initRootPage();
     } else {
       channel.read(headerContent);
@@ -604,6 +609,7 @@ public class SchemaFile implements ISchemaFile {
       dataTTL = ReadWriteIOUtils.readLong(headerContent);
       isEntity = ReadWriteIOUtils.readBool(headerContent);
       templateHash = ReadWriteIOUtils.readInt(headerContent);
+      lastSGAddr = ReadWriteIOUtils.readLong(headerContent);
       rootPage = getPageInstance(0);
     }
   }
@@ -615,6 +621,7 @@ public class SchemaFile implements ISchemaFile {
     ReadWriteIOUtils.write(dataTTL, headerContent);
     ReadWriteIOUtils.write(isEntity, headerContent);
     ReadWriteIOUtils.write(templateHash, headerContent);
+    ReadWriteIOUtils.write(lastSGAddr, headerContent);
 
     headerContent.clear();
     channel.write(headerContent, 0);
@@ -731,15 +738,25 @@ public class SchemaFile implements ISchemaFile {
   // region Schema Page Operations
 
   /**
-   * This method checks with cached page container, LOCK and return a minimum applicable page for
+   * This method checks with cached page containers and returns a minimum applicable page for
    * allocation.
    *
+   * <p><b>Since it will only be called during write procedure, any {@link SchemaPage} returned will
+   * be added to {@link #dirtyPages}.</b>
+   *
    * @param size size of segment
    * @return
    */
   private ISchemaPage getMinApplicablePageInMem(short size) throws IOException {
+    for (Map.Entry<Integer, ISchemaPage> entry : dirtyPages.entrySet()) {
+      if (entry.getValue().isCapableForSize(size)) {
+        return dirtyPages.get(entry.getKey());
+      }
+    }
+
     for (Map.Entry<Integer, ISchemaPage> entry : pageInstCache.entrySet()) {
       if (entry.getValue().isCapableForSize(size)) {
+        dirtyPages.putIfAbsent(entry.getKey(), entry.getValue());
         return pageInstCache.get(entry.getKey());
       }
     }
@@ -755,8 +772,6 @@ public class SchemaFile implements ISchemaFile {
    * @return an existed page
    */
   private ISchemaPage getPageInstance(int pageIdx) throws IOException, MetadataException {
-    // TODO: improve concurrent control
-    //  since now one page may be evicted after returned but before updated
     if (pageIdx > lastPageIndex) {
       throw new MetadataException(String.format("Page index %d out of range.", pageIdx));
     }
@@ -765,10 +780,8 @@ public class SchemaFile implements ISchemaFile {
       return rootPage;
     }
 
-    // TODO: improve concurrent control
+    pageLocks.readLock(pageIdx);
     try {
-      pageLocks.readLock(pageIdx);
-
       if (dirtyPages.containsKey(pageIdx)) {
         return dirtyPages.get(pageIdx);
       }
@@ -809,7 +822,7 @@ public class SchemaFile implements ISchemaFile {
     // only one thread evicts and flushes pages
     if (evictLock.tryLock()) {
       try {
-        if (pageInstCache.size() >= PAGE_CACHE_SIZE) {
+        if (pageInstCache.size() > PAGE_CACHE_SIZE) {
           int removeCnt =
               (int) (0.2 * pageInstCache.size()) > 0 ? (int) (0.2 * pageInstCache.size()) : 1;
           List<Integer> rmvIds = new ArrayList<>(pageInstCache.keySet()).subList(0, removeCnt);
@@ -849,9 +862,13 @@ public class SchemaFile implements ISchemaFile {
     return (short) (globalIndex & SchemaFile.SEG_INDEX_MASK);
   }
 
-  // estimate for segment re-allocation
   private void updateParentalRecord(IMNode parent, String key, long newSegAddr)
       throws IOException, MetadataException {
+    if (parent == null || parent.getChild(key).isStorageGroup()) {
+      lastSGAddr = newSegAddr;
+      updateHeader();
+      return;
+    }
     long parSegAddr = parent.getParent() == null ? ROOT_INDEX : getNodeAddress(parent);
     parSegAddr = getTargetSegmentAddress(parSegAddr, key);
     ISchemaPage page = getPageInstance(getPageIndex(parSegAddr));
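
The common thread in the SchemaFile changes is discipline around dirtyPages: every page about to be mutated (in write, delete, or getMinApplicablePageInMem) is registered via putIfAbsent before the mutation, and delete() now ends with flushAllDirtyPages(), so a modified page can no longer be evicted from pageInstCache before it reaches disk. A standalone sketch of the register-then-mutate pattern, with Page as a hypothetical stand-in for ISchemaPage:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class DirtyPageSketch {
      static class Page {
        final int index;
        Page(int index) { this.index = index; }
      }

      private final Map<Integer, Page> pageCache = new ConcurrentHashMap<>();
      private final Map<Integer, Page> dirtyPages = new ConcurrentHashMap<>();

      void mutate(int pageIdx) {
        Page page = pageCache.computeIfAbsent(pageIdx, Page::new);
        // register BEFORE mutating: even if pageCache evicts this page,
        // dirtyPages still holds it until the next flush
        dirtyPages.putIfAbsent(page.index, page);
        // ... mutate page contents here ...
      }

      void flushAllDirtyPages() {
        for (Page page : dirtyPages.values()) {
          // write the page back to the file channel (omitted)
        }
        dirtyPages.clear();
      }
    }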
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 9e633bf46a..8398f0ab22 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.metadata.schemaregion;
 
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,7 +40,11 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 // manage all the schemaRegion in this dataNode
 public class SchemaEngine {
@@ -81,6 +86,13 @@ public class SchemaEngine {
    */
   private Map<PartialPath, List<SchemaRegionId>> initSchemaRegion() throws MetadataException {
     Map<PartialPath, List<SchemaRegionId>> partitionTable = new HashMap<>();
+
+    // recover SchemaRegion concurrently
+    ExecutorService schemaRegionRecoverPools =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            Runtime.getRuntime().availableProcessors(), "SchemaRegion-recover-task");
+    List<Future<ISchemaRegion>> futures = new ArrayList<>();
+
     for (PartialPath storageGroup : localStorageGroupSchemaManager.getAllStorageGroupPaths()) {
       List<SchemaRegionId> schemaRegionIdList = new ArrayList<>();
       partitionTable.put(storageGroup, schemaRegionIdList);
@@ -104,10 +116,23 @@ public class SchemaEngine {
           // the dir/file is not schemaRegionDir, ignore this.
           continue;
         }
-        createSchemaRegion(storageGroup, schemaRegionId);
+        futures.add(
+            schemaRegionRecoverPools.submit(recoverSchemaRegionTask(storageGroup, schemaRegionId)));
         schemaRegionIdList.add(schemaRegionId);
       }
     }
+
+    for (Future<ISchemaRegion> future : futures) {
+      try {
+        ISchemaRegion schemaRegion = future.get();
+        schemaRegionMap.put(schemaRegion.getSchemaRegionId(), schemaRegion);
+      } catch (ExecutionException | InterruptedException | RuntimeException e) {
+        logger.error("Something wrong happened during SchemaRegion recovery: " + e.getMessage());
+        e.printStackTrace();
+      }
+    }
+    schemaRegionRecoverPools.shutdown();
+
     return partitionTable;
   }
 
@@ -141,12 +166,48 @@ public class SchemaEngine {
       PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
     ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
     if (schemaRegion != null) {
-      return;
+      throw new MetadataException(
+          String.format(
+              "SchemaRegion [%s] is duplicated between [%s] and [%s], "
+                  + "and the former one has been recovered.",
+              schemaRegionId, schemaRegion.getStorageGroupFullPath(), storageGroup.getFullPath()));
     }
-    localStorageGroupSchemaManager.ensureStorageGroup(storageGroup);
+    schemaRegionMap.put(
+        schemaRegionId, createSchemaRegionWithoutExistenceCheck(storageGroup, schemaRegionId));
+  }
+
+  private Callable<ISchemaRegion> recoverSchemaRegionTask(
+      PartialPath storageGroup, SchemaRegionId schemaRegionId) {
+    // this method is called for concurrent recovery of schema regions
+    return () -> {
+      long timeRecord = System.currentTimeMillis();
+      try {
+        // TODO: handle duplicated regionId across different storage group
+        ISchemaRegion schemaRegion =
+            createSchemaRegionWithoutExistenceCheck(storageGroup, schemaRegionId);
+        timeRecord = System.currentTimeMillis() - timeRecord;
+        logger.info(
+            String.format(
+                "Recover [%s] spend: %s ms",
+                storageGroup.concatNode(schemaRegionId.toString()), timeRecord));
+        return schemaRegion;
+      } catch (MetadataException e) {
+        logger.error(
+            String.format(
+                "SchemaRegion [%d] in StorageGroup [%s] failed to recover.",
+                schemaRegionId.getId(), storageGroup.getFullPath()));
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  private ISchemaRegion createSchemaRegionWithoutExistenceCheck(
+      PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
+    ISchemaRegion schemaRegion = null;
+    this.localStorageGroupSchemaManager.ensureStorageGroup(storageGroup);
     IStorageGroupMNode storageGroupMNode =
-        localStorageGroupSchemaManager.getStorageGroupNodeByStorageGroupPath(storageGroup);
-    switch (schemaRegionStoredMode) {
+        this.localStorageGroupSchemaManager.getStorageGroupNodeByStorageGroupPath(storageGroup);
+    switch (this.schemaRegionStoredMode) {
       case Memory:
         schemaRegion = new SchemaRegionMemoryImpl(storageGroup, schemaRegionId, storageGroupMNode);
         break;
@@ -165,7 +226,7 @@ public class SchemaEngine {
                 "This mode [%s] is not supported. Please check and modify it.",
                 schemaRegionStoredMode));
     }
-    schemaRegionMap.put(schemaRegionId, schemaRegion);
+    return schemaRegion;
   }
 
   public void deleteSchemaRegion(SchemaRegionId schemaRegionId) throws MetadataException {
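
The recovery path in SchemaEngine follows a plain submit-then-join pattern: one Callable per schema region, a fixed pool sized to the CPU count, and a second pass over the Futures to collect results and surface failures. A standalone sketch of the same shape, with Region as a hypothetical stand-in for ISchemaRegion:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class RecoverySketch {
      static class Region {
        final int id;
        Region(int id) { this.id = id; }
      }

      // stands in for SchemaEngine.initSchemaRegion(): submit one task per region,
      // then join every future so no region is silently skipped
      List<Region> recoverAll(List<Integer> regionIds) throws InterruptedException {
        ExecutorService pool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        List<Future<Region>> futures = new ArrayList<>();
        for (Integer id : regionIds) {
          futures.add(pool.submit(() -> new Region(id))); // real task replays the region's MLog
        }
        List<Region> recovered = new ArrayList<>();
        for (Future<Region> future : futures) {
          try {
            recovered.add(future.get());
          } catch (ExecutionException e) {
            // one failed region is logged and skipped; the others still come up
          }
        }
        pool.shutdown();
        return recovered;
      }
    }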
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index e3a6e3268d..9ba26c32e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
 import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
 import org.apache.iotdb.db.exception.metadata.template.NoTemplateOnMNodeException;
 import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
@@ -420,9 +421,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
-      throw new MetadataException(
-          "IoTDB system load is too large to create timeseries, "
-              + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
+      throw new SeriesOverflowException();
     }
 
     try {
@@ -532,9 +531,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    */
   public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
-      throw new MetadataException(
-          "IoTDB system load is too large to create timeseries, "
-              + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
+      throw new SeriesOverflowException();
     }
 
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index b2e6b48aa9..8b4ecdff40 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
 import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
 import org.apache.iotdb.db.exception.metadata.template.NoTemplateOnMNodeException;
 import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
@@ -351,12 +352,12 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     switch (plan.getOperatorType()) {
       case CREATE_TIMESERIES:
         CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
-        createTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
+        recoverTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
         break;
       case CREATE_ALIGNED_TIMESERIES:
         CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
             (CreateAlignedTimeSeriesPlan) plan;
-        createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+        recoverAlignedTimeSeries(createAlignedTimeSeriesPlan);
         break;
       case DELETE_TIMESERIES:
         DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) plan;
@@ -428,12 +429,31 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     createTimeseries(plan, -1);
   }
 
+  public void recoverTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
+    boolean done = false;
+    while (!done) {
+      try {
+        createTimeseries(plan, offset);
+        done = true;
+      } catch (SeriesOverflowException e) {
+        logger.warn(
+            "Too many timeseries during recovery from MLog, waiting for SchemaFile swapping.");
+        try {
+          Thread.sleep(3000L);
+        } catch (InterruptedException e2) {
+          logger.error("Exception occurs during timeseries recovery.");
+          throw new MetadataException(e2.getMessage());
+        }
+      }
+    }
+  }
+
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
-      throw new MetadataException(
-          "IoTDB system load is too large to create timeseries, "
-              + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
+      logger.error(
+          String.format("Series overflow when creating: [%s]", plan.getPath().getFullPath()));
+      throw new SeriesOverflowException();
     }
 
     try {
@@ -542,6 +562,25 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
             prefixPath, measurements, dataTypes, encodings, compressors, null, null, null));
   }
 
+  public void recoverAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+    boolean done = false;
+    while (!done) {
+      try {
+        createAlignedTimeSeries(plan);
+        done = true;
+      } catch (SeriesOverflowException e) {
+        logger.warn(
+            "Too many timeseries during recovery from MLog, waiting for SchemaFile swapping.");
+        try {
+          Thread.sleep(3000L);
+        } catch (InterruptedException e2) {
+          logger.error("Exception occurs during timeseries recovery.");
+          throw new MetadataException(e2.getMessage());
+        }
+      }
+    }
+  }
+
   /**
    * create aligned timeseries
    *
@@ -549,9 +588,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    */
   public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
-      throw new MetadataException(
-          "IoTDB system load is too large to create timeseries, "
-              + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
+      throw new SeriesOverflowException();
     }
 
     try {
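
recoverTimeseries and recoverAlignedTimeSeries supply the back-pressure half of the multi-threaded recovery: when the in-memory series count overflows, the recovering thread sleeps and retries until the MTree flush/release tasks have swapped enough nodes out to SchemaFile. The two loops are identical in shape; a sketch of the pattern in isolation, where CreateAction is a hypothetical stand-in for either create call:

    import org.apache.iotdb.db.exception.metadata.MetadataException;
    import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;

    class RecoverRetrySketch {
      interface CreateAction {
        void run() throws MetadataException;
      }

      static void retryUntilCapacity(CreateAction action) throws MetadataException {
        while (true) {
          try {
            action.run();
            return;
          } catch (SeriesOverflowException e) {
            try {
              Thread.sleep(3000L); // same interval as the commit; waits for node swapping
            } catch (InterruptedException e2) {
              Thread.currentThread().interrupt();
              throw new MetadataException(e2.getMessage());
            }
          }
        }
      }
    }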
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 2466ddfdb3..2dfb511398 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -66,6 +66,7 @@ public enum TSStatusCode {
   PIPESINK_ERROR(334),
   PIPE_ERROR(335),
   PIPESERVER_ERROR(336),
+  SERIES_OVERFLOW(337),
 
   EXECUTE_STATEMENT_ERROR(400),
   SQL_PARSE_ERROR(401),