Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/12 03:05:25 UTC

[iotdb] branch master updated: Rocksdb-based metadata storage (#5295)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c6634a0df3 Rocksdb-based metadata storage (#5295)
c6634a0df3 is described below

commit c6634a0df3c56d88d673028891741a34820b30f3
Author: lisijia <44...@users.noreply.github.com>
AuthorDate: Tue Apr 12 11:05:21 2022 +0800

    Rocksdb-based metadata storage (#5295)
---
 server/pom.xml                                     |    5 +
 .../resources/conf/iotdb-engine.properties         |    2 +-
 .../assembly/resources/tools/rocksdb-transfer.bat  |  126 ++
 .../assembly/resources/tools/rocksdb-transfer.sh   |   82 +
 .../statemachine/SchemaRegionStateMachine.java     |    6 +-
 .../metadata/AcquireLockTimeoutException.java}     |    9 +-
 .../iotdb/db/metadata/LocalConfigManager.java      |   30 +-
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |   42 +-
 .../iotdb/db/metadata/mnode/MeasurementMNode.java  |    2 +-
 .../db/metadata/schemaregion/ISchemaRegion.java    |  193 ++
 .../db/metadata/schemaregion/SchemaEngine.java     |   35 +-
 .../db/metadata/schemaregion/SchemaEngineMode.java |    3 +-
 .../db/metadata/schemaregion/SchemaRegion.java     |   32 +-
 .../metadata/schemaregion/SchemaRegionUtils.java   |   59 +
 .../schemaregion/rocksdb/CheckKeyResult.java}      |   39 +-
 .../schemaregion/rocksdb/RSchemaConstants.java     |   76 +
 .../schemaregion/rocksdb/RSchemaLogger.java        |   61 +
 .../rocksdb/RSchemaReadWriteHandler.java           |  514 +++++
 .../schemaregion/rocksdb/RSchemaRegion.java        | 1962 ++++++++++++++++++++
 .../schemaregion/rocksdb/RSchemaUtils.java         |  592 ++++++
 .../schemaregion/rocksdb/mnode/REntityMNode.java   |  124 ++
 .../schemaregion/rocksdb/mnode/RInternalMNode.java |  197 ++
 .../schemaregion/rocksdb/mnode/RMNode.java         |  234 +++
 .../schemaregion/rocksdb/mnode/RMNodeType.java     |   49 +
 .../rocksdb/mnode/RMNodeValueType.java             |   47 +
 .../rocksdb/mnode/RMeasurementMNode.java}          |  185 +-
 .../rocksdb/mnode/RStorageGroupMNode.java          |  102 +
 .../apache/iotdb/db/service/IoTDBShutdownHook.java |    6 +
 .../schemaregion/rocksdb/MRocksDBBenchmark.java    |   98 +
 .../schemaregion/rocksdb/MRocksDBUnitTest.java     |  238 +++
 .../rocksdb/RSchemaReadWriteHandlerTest.java       |   77 +
 .../rocksdb/RSchemaRegionAdvancedTest.java         |  167 ++
 .../rocksdb/RocksDBBenchmarkEngine.java            |  153 ++
 .../schemaregion/rocksdb/RocksDBBenchmarkTask.java |  109 ++
 .../schemaregion/rocksdb/RocksDBTestUtils.java     |   72 +
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |    8 +-
 36 files changed, 5555 insertions(+), 181 deletions(-)

diff --git a/server/pom.xml b/server/pom.xml
index 8514095349..55ffa494c9 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -262,6 +262,11 @@
             <version>2.6</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+            <version>6.27.3</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index dc2a019158..de12695ec6 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -962,7 +962,7 @@ timestamp_precision=ms
 ####################
 ### Schema Engine Configuration
 ####################
-# Choose the mode of schema engine. The value could be Memory and Schema_File. If the provided value doesn't match any pre-defined value, Memory mode will be used as default.
+# Choose the mode of the schema engine. The value can be Memory, Schema_File, or Rocksdb_based. If the provided value doesn't match any pre-defined value, Memory mode will be used as the default.
 # Datatype: string
 # schema_engine_mode=Memory
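
To switch a server to the new engine, the property introduced above is set in iotdb-engine.properties before startup; a minimal sketch of the relevant lines:

    # Datatype: string
    schema_engine_mode=Rocksdb_based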
 
diff --git a/server/src/assembly/resources/tools/rocksdb-transfer.bat b/server/src/assembly/resources/tools/rocksdb-transfer.bat
new file mode 100644
index 0000000000..89765a7f7e
--- /dev/null
+++ b/server/src/assembly/resources/tools/rocksdb-transfer.bat
@@ -0,0 +1,126 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM     http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied.  See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM
+
+@echo off
+echo ````````````````````````
+echo Use this script to transfer metadata managed by the default (pure memory) SchemaEngine to the RocksDB-based SchemaEngine
+echo ````````````````````````
+
+
+set PATH="%JAVA_HOME%\bin\";%PATH%
+set "FULL_VERSION="
+set "MAJOR_VERSION="
+set "MINOR_VERSION="
+
+
+for /f tokens^=2-5^ delims^=.-_+^" %%j in ('java -fullversion 2^>^&1') do (
+	set "FULL_VERSION=%%j-%%k-%%l-%%m"
+	IF "%%j" == "1" (
+	    set "MAJOR_VERSION=%%k"
+	    set "MINOR_VERSION=%%l"
+	) else (
+	    set "MAJOR_VERSION=%%j"
+	    set "MINOR_VERSION=%%k"
+	)
+)
+
+set JAVA_VERSION=%MAJOR_VERSION%
+
+@REM we do not check JDK versions below 1.8 because they are too stale...
+IF "%JAVA_VERSION%" == "6" (
+	echo IoTDB only supports JDK 8 or above, please check your java version.
+	goto finally
+)
+IF "%JAVA_VERSION%" == "7" (
+	echo IoTDB only supports JDK 8 or above, please check your java version.
+	goto finally
+)
+
+if "%OS%" == "Windows_NT" setlocal
+
+pushd %~dp0..
+if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%cd%
+popd
+
+set IOTDB_CONF=%IOTDB_HOME%\conf
+set IOTDB_LOGS=%IOTDB_HOME%\logs
+
+@setlocal ENABLEDELAYEDEXPANSION ENABLEEXTENSIONS
+set is_conf_path=false
+for %%i in (%*) do (
+	IF "%%i" == "-c" (
+		set is_conf_path=true
+	) ELSE IF "!is_conf_path!" == "true" (
+		set is_conf_path=false
+		set IOTDB_CONF=%%i
+	) ELSE (
+		set CONF_PARAMS=!CONF_PARAMS! %%i
+	)
+)
+
+IF EXIST "%IOTDB_CONF%\iotdb-env.bat" (
+    CALL "%IOTDB_CONF%\iotdb-env.bat" %1
+    ) ELSE (
+    echo "can't find %IOTDB_CONF%\iotdb-env.bat"
+    )
+
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.metadata.schemaregion.rocksdb.SchemaDataTransfer
+if NOT DEFINED JAVA_HOME goto :err
+
+@REM -----------------------------------------------------------------------------
+@REM JVM Opts we'll use in legacy run or installation
+set JAVA_OPTS=-ea^
+ -Dlogback.configurationFile="%IOTDB_CONF%\logback.xml"^
+ -DIOTDB_HOME="%IOTDB_HOME%"^
+ -DTSFILE_HOME="%IOTDB_HOME%"^
+ -DTSFILE_CONF="%IOTDB_CONF%"^
+ -DIOTDB_CONF="%IOTDB_CONF%"^
+ -Dsun.jnu.encoding=UTF-8^
+ -Dfile.encoding=UTF-8
+
+@REM ***** CLASSPATH library setting *****
+@REM Ensure that any user defined CLASSPATH variables are not used on startup
+set CLASSPATH="%IOTDB_HOME%\lib\*"
+set CLASSPATH=%CLASSPATH%;iotdb.IoTDB
+goto okClasspath
+
+:append
+set CLASSPATH=%CLASSPATH%;%1
+
+goto :eof
+
+@REM -----------------------------------------------------------------------------
+:okClasspath
+
+rem echo CLASSPATH: %CLASSPATH%
+
+"%JAVA_HOME%\bin\java" %JAVA_OPTS% %IOTDB_HEAP_OPTS% -cp %CLASSPATH% %IOTDB_JMX_OPTS% %MAIN_CLASS% %CONF_PARAMS%
+goto finally
+
+:err
+echo JAVA_HOME environment variable must be set!
+pause
+
+
+@REM -----------------------------------------------------------------------------
+:finally
+
+pause
+
+ENDLOCAL
diff --git a/server/src/assembly/resources/tools/rocksdb-transfer.sh b/server/src/assembly/resources/tools/rocksdb-transfer.sh
new file mode 100644
index 0000000000..a899d084f7
--- /dev/null
+++ b/server/src/assembly/resources/tools/rocksdb-transfer.sh
@@ -0,0 +1,82 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ---------------------
+echo "Use this script to transfer metadata managed by the default (pure memory) SchemaEngine to the RocksDB-based SchemaEngine"
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+  export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+IOTDB_CONF=${IOTDB_HOME}/conf
+# IOTDB_LOGS=${IOTDB_HOME}/logs
+
+is_conf_path=false
+for arg do
+  shift
+  if [ "$arg" == "-c" ]; then
+    is_conf_path=true
+    continue
+  fi
+  if [ $is_conf_path == true ]; then
+    IOTDB_CONF=$arg
+    is_conf_path=false
+    continue
+  fi
+  set -- "$@" "$arg"
+done
+
+CONF_PARAMS=$*
+
+if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then
+    if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+      . "$IOTDB_CONF/iotdb-env.sh" "printgc"
+    else
+        . "$IOTDB_CONF/iotdb-env.sh"
+    fi
+else
+    echo "can't find $IOTDB_CONF/iotdb-env.sh"
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+  CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.db.metadata.rocksdb.MetaDataTransfer
+
+launch_service()
+{
+	class="$1"
+	iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml"
+	iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
+	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS
+	return $?
+}
+
+# Start up the service
+launch_service "$classname"
+
+exit $?
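
Both transfer scripts accept -c to point at a non-default configuration directory and pass all remaining arguments through to the transfer main class via CONF_PARAMS. A hypothetical invocation (the conf path below is an assumption):

    # migrate existing memory-mode metadata into the RocksDB-based engine
    ./tools/rocksdb-transfer.sh -c /path/to/iotdb/conf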
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 2b36f96ef8..fcb73ee4ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -32,9 +32,9 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
 
   private static final Logger logger = LoggerFactory.getLogger(SchemaRegionStateMachine.class);
 
-  private final SchemaRegion region;
+  private final ISchemaRegion region;
 
-  public SchemaRegionStateMachine(SchemaRegion region) {
+  public SchemaRegionStateMachine(ISchemaRegion region) {
     this.region = region;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/AcquireLockTimeoutException.java
similarity index 81%
copy from server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java
copy to server/src/main/java/org/apache/iotdb/db/exception/metadata/AcquireLockTimeoutException.java
index f02d88f4d7..f96e5dfe50 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/AcquireLockTimeoutException.java
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.metadata.schemaregion;
+package org.apache.iotdb.db.exception.metadata;
 
-public enum SchemaEngineMode {
-  Memory,
-  Schema_File
+public class AcquireLockTimeoutException extends MetadataException {
+  public AcquireLockTimeoutException(String msg) {
+    super(msg);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigManager.java
index cd9c88669b..bf7ec5fa99 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigManager.java
@@ -33,8 +33,8 @@ import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateExceptio
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
 import org.apache.iotdb.db.metadata.storagegroup.IStorageGroupSchemaManager;
 import org.apache.iotdb.db.metadata.storagegroup.StorageGroupSchemaManager;
 import org.apache.iotdb.db.metadata.template.Template;
@@ -183,7 +183,7 @@ public class LocalConfigManager {
 
       partitionTable.clear();
 
-      for (SchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
+      for (ISchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
         schemaRegion.clear();
       }
       schemaEngine.clear();
@@ -206,7 +206,7 @@ public class LocalConfigManager {
     storageGroupSchemaManager.forceLog();
     templateManager.forceLog();
 
-    for (SchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
+    for (ISchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
       schemaRegion.forceMlog();
     }
   }
@@ -516,7 +516,7 @@ public class LocalConfigManager {
     partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
   }
 
-  public SchemaRegion getSchemaRegion(SchemaRegionId schemaRegionId) {
+  public ISchemaRegion getSchemaRegion(SchemaRegionId schemaRegionId) throws MetadataException {
     return schemaEngine.getSchemaRegion(schemaRegionId);
   }
 
@@ -544,7 +544,7 @@ public class LocalConfigManager {
     }
   }
 
-  private SchemaRegion localCreateSchemaRegion(
+  private ISchemaRegion localCreateSchemaRegion(
       PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
     return schemaEngine.createSchemaRegion(
         storageGroup,
@@ -559,10 +559,10 @@ public class LocalConfigManager {
    * root.sg1. If there's no storage group on the given path, StorageGroupNotSetException will be
    * thrown.
    */
-  public SchemaRegion getBelongedSchemaRegion(PartialPath path) throws MetadataException {
+  public ISchemaRegion getBelongedSchemaRegion(PartialPath path) throws MetadataException {
     PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
     SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
-    SchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
+    ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
     if (schemaRegion == null) {
       schemaRegion = localCreateSchemaRegion(storageGroup, schemaRegionId);
       partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
@@ -571,7 +571,7 @@ public class LocalConfigManager {
   }
 
   // This interface involves storage group auto creation
-  public SchemaRegion getBelongedSchemaRegionWithAutoCreate(PartialPath path)
+  public ISchemaRegion getBelongedSchemaRegionWithAutoCreate(PartialPath path)
       throws MetadataException {
     ensureStorageGroup(path, true);
     return getBelongedSchemaRegion(path);
@@ -583,9 +583,9 @@ public class LocalConfigManager {
    * paths represented by the given pathPattern. If isPrefixMatch, all storage groups under the
    * prefixPath that matches the given pathPattern will be collected.
    */
-  public List<SchemaRegion> getInvolvedSchemaRegions(PartialPath pathPattern, boolean isPrefixMatch)
-      throws MetadataException {
-    List<SchemaRegion> result = new ArrayList<>();
+  public List<ISchemaRegion> getInvolvedSchemaRegions(
+      PartialPath pathPattern, boolean isPrefixMatch) throws MetadataException {
+    List<ISchemaRegion> result = new ArrayList<>();
     for (PartialPath storageGroup :
         storageGroupSchemaManager.getInvolvedStorageGroups(pathPattern, isPrefixMatch)) {
       for (SchemaRegionId schemaRegionId :
@@ -597,9 +597,9 @@ public class LocalConfigManager {
     return result;
   }
 
-  public List<SchemaRegion> getSchemaRegionsByStorageGroup(PartialPath storageGroup)
+  public List<ISchemaRegion> getSchemaRegionsByStorageGroup(PartialPath storageGroup)
       throws MetadataException {
-    List<SchemaRegion> result = new ArrayList<>();
+    List<ISchemaRegion> result = new ArrayList<>();
     for (SchemaRegionId schemaRegionId :
         partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup)) {
       result.add(schemaEngine.getSchemaRegion(schemaRegionId));
@@ -709,7 +709,7 @@ public class LocalConfigManager {
   public Set<String> getPathsSetTemplate(String templateName) throws MetadataException {
     Set<String> result = new HashSet<>();
     if (templateName.equals(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
-      for (SchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
+      for (ISchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
         result.addAll(schemaRegion.getPathsSetTemplate(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
       }
     } else {
@@ -726,7 +726,7 @@ public class LocalConfigManager {
   public Set<String> getPathsUsingTemplate(String templateName) throws MetadataException {
     Set<String> result = new HashSet<>();
     if (templateName.equals(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
-      for (SchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
+      for (ISchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
         result.addAll(schemaRegion.getPathsUsingTemplate(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
       }
     } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 0793afdfc4..56a8c0729d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
@@ -158,12 +158,12 @@ public class LocalSchemaProcessor {
    * root.sg1. If there's no storage group on the given path, StorageGroupNotSetException will be
    * thrown.
    */
-  private SchemaRegion getBelongedSchemaRegion(PartialPath path) throws MetadataException {
+  private ISchemaRegion getBelongedSchemaRegion(PartialPath path) throws MetadataException {
     return configManager.getBelongedSchemaRegion(path);
   }
 
   // This interface involves storage group auto creation
-  private SchemaRegion getBelongedSchemaRegionWithAutoCreate(PartialPath path)
+  private ISchemaRegion getBelongedSchemaRegionWithAutoCreate(PartialPath path)
       throws MetadataException {
     return configManager.getBelongedSchemaRegionWithAutoCreate(path);
   }
@@ -174,12 +174,12 @@ public class LocalSchemaProcessor {
    * paths represented by the given pathPattern. If isPrefixMatch, all storage groups under the
    * prefixPath that matches the given pathPattern will be collected.
    */
-  private List<SchemaRegion> getInvolvedSchemaRegions(
+  private List<ISchemaRegion> getInvolvedSchemaRegions(
       PartialPath pathPattern, boolean isPrefixMatch) throws MetadataException {
     return configManager.getInvolvedSchemaRegions(pathPattern, isPrefixMatch);
   }
 
-  private List<SchemaRegion> getSchemaRegionsByStorageGroup(PartialPath storageGroup)
+  private List<ISchemaRegion> getSchemaRegionsByStorageGroup(PartialPath storageGroup)
       throws MetadataException {
     return configManager.getSchemaRegionsByStorageGroup(storageGroup);
   }
@@ -334,7 +334,7 @@ public class LocalSchemaProcessor {
    */
   public String deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
-    List<SchemaRegion> schemaRegions = getInvolvedSchemaRegions(pathPattern, isPrefixMatch);
+    List<ISchemaRegion> schemaRegions = getInvolvedSchemaRegions(pathPattern, isPrefixMatch);
     if (schemaRegions.isEmpty()) {
       // In the cluster mode, the deletion of a timeseries will be forwarded to all the nodes. For
       // nodes that do not have the metadata of the timeseries, the coordinator expects a
@@ -344,7 +344,7 @@ public class LocalSchemaProcessor {
     Set<String> failedNames = new HashSet<>();
     int deletedNum = 0;
     Pair<Integer, Set<String>> sgDeletionResult;
-    for (SchemaRegion schemaRegion : schemaRegions) {
+    for (ISchemaRegion schemaRegion : schemaRegions) {
       sgDeletionResult = schemaRegion.deleteTimeseries(pathPattern, isPrefixMatch);
       deletedNum += sgDeletionResult.left;
       failedNames.addAll(sgDeletionResult.right);
@@ -414,7 +414,7 @@ public class LocalSchemaProcessor {
       }
       try {
         PartialPath storageGroup = configManager.getBelongedStorageGroup(path);
-        for (SchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
+        for (ISchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
           if (schemaRegion.isPathExist(path)) {
             return true;
           }
@@ -449,7 +449,7 @@ public class LocalSchemaProcessor {
       return (int) TimeseriesStatistics.getInstance().getTotalSeriesNumber();
     }
     int count = 0;
-    for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
+    for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
       count += schemaRegion.getAllTimeseriesCount(pathPattern, isPrefixMatch);
     }
     return count;
@@ -471,7 +471,7 @@ public class LocalSchemaProcessor {
   public int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
     int num = 0;
-    for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
+    for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
       num += schemaRegion.getDevicesNum(pathPattern, isPrefixMatch);
     }
     return num;
@@ -521,7 +521,7 @@ public class LocalSchemaProcessor {
       PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
     Map<PartialPath, Integer> result = new HashMap<>();
     Map<PartialPath, Integer> sgResult;
-    for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
+    for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
       sgResult = schemaRegion.getMeasurementCountGroupByLevel(pathPattern, level, isPrefixMatch);
       for (PartialPath path : sgResult.keySet()) {
         if (result.containsKey(path)) {
@@ -569,7 +569,7 @@ public class LocalSchemaProcessor {
         configManager.getNodesListInGivenLevel(pathPattern, nodeLevel, isPrefixMatch, filter);
     Set<PartialPath> result = new TreeSet<>(pair.left);
     for (PartialPath storageGroup : pair.right) {
-      for (SchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
+      for (ISchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
         result.addAll(
             schemaRegion.getNodesListInGivenLevel(pathPattern, nodeLevel, isPrefixMatch, filter));
       }
@@ -593,7 +593,7 @@ public class LocalSchemaProcessor {
         configManager.getChildNodePathInNextLevel(pathPattern);
     Set<String> result = pair.left;
     for (PartialPath storageGroup : pair.right) {
-      for (SchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
+      for (ISchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
         result.addAll(schemaRegion.getChildNodePathInNextLevel(pathPattern));
       }
     }
@@ -615,7 +615,7 @@ public class LocalSchemaProcessor {
         configManager.getChildNodeNameInNextLevel(pathPattern);
     Set<String> result = pair.left;
     for (PartialPath storageGroup : pair.right) {
-      for (SchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
+      for (ISchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
         result.addAll(schemaRegion.getChildNodeNameInNextLevel(pathPattern));
       }
     }
@@ -719,7 +719,7 @@ public class LocalSchemaProcessor {
   public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
       throws MetadataException {
     Set<PartialPath> result = new TreeSet<>();
-    for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
+    for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
       result.addAll(schemaRegion.getMatchedDevices(pathPattern, isPrefixMatch));
     }
     return result;
@@ -738,7 +738,7 @@ public class LocalSchemaProcessor {
     int offset = plan.getOffset();
 
     Pair<List<ShowDevicesResult>, Integer> regionResult;
-    for (SchemaRegion schemaRegion :
+    for (ISchemaRegion schemaRegion :
         getInvolvedSchemaRegions(plan.getPath(), plan.isPrefixMatch())) {
       if (limit != 0 && plan.getLimit() == 0) {
         break;
@@ -805,7 +805,7 @@ public class LocalSchemaProcessor {
     int tmpLimit = limit;
     int tmpOffset = offset;
 
-    for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
+    for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
       if (limit != 0 && tmpLimit == 0) {
         break;
       }
@@ -847,7 +847,7 @@ public class LocalSchemaProcessor {
     }
 
     Pair<List<ShowTimeSeriesResult>, Integer> regionResult;
-    for (SchemaRegion schemaRegion :
+    for (ISchemaRegion schemaRegion :
         getInvolvedSchemaRegions(plan.getPath(), plan.isPrefixMatch())) {
       if (limit != 0 && plan.getLimit() == 0) {
         break;
@@ -966,7 +966,7 @@ public class LocalSchemaProcessor {
   // endregion
 
   // region Interfaces for alias and tag/attribute operations
-  public void changeAlias(PartialPath path, String alias) throws MetadataException {
+  public void changeAlias(PartialPath path, String alias) throws MetadataException, IOException {
     getBelongedSchemaRegion(path).changeAlias(path, alias);
   }
 
@@ -1060,7 +1060,7 @@ public class LocalSchemaProcessor {
   public void collectMeasurementSchema(
       PartialPath prefixPath, List<IMeasurementSchema> measurementSchemas) {
     try {
-      for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(prefixPath, true)) {
+      for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(prefixPath, true)) {
         schemaRegion.collectMeasurementSchema(prefixPath, measurementSchemas);
       }
     } catch (MetadataException ignored) {
@@ -1076,7 +1076,7 @@ public class LocalSchemaProcessor {
   public void collectTimeseriesSchema(
       PartialPath prefixPath, Collection<TimeseriesSchema> timeseriesSchemas) {
     try {
-      for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(prefixPath, true)) {
+      for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(prefixPath, true)) {
         schemaRegion.collectTimeseriesSchema(prefixPath, timeseriesSchemas);
       }
     } catch (MetadataException ignored) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
index 46341679cc..f1826c1829 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
@@ -61,7 +61,7 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
   }
 
   /** @param alias alias of measurementName */
-  MeasurementMNode(IMNode parent, String name, IMeasurementSchema schema, String alias) {
+  public MeasurementMNode(IMNode parent, String name, IMeasurementSchema schema, String alias) {
     super(parent, name);
     this.schema = schema;
     this.alias = alias;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
new file mode 100644
index 0000000000..9008224310
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public interface ISchemaRegion {
+
+  @SuppressWarnings("squid:S2093")
+  void init(IStorageGroupMNode storageGroupMNode) throws MetadataException;
+
+  void clear();
+
+  void forceMlog();
+
+  // this method is mainly used for recovery and metadata sync
+  void operation(PhysicalPlan plan) throws IOException, MetadataException;
+
+  void deleteSchemaRegion() throws MetadataException;
+
+  @SuppressWarnings("squid:S3776")
+  // Suppress high Cognitive Complexity warning
+  void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException;
+
+  void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException;
+
+  Pair<Integer, Set<String>> deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException;
+
+  void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException;
+
+  boolean isPathExist(PartialPath path) throws MetadataException;
+
+  int getAllTimeseriesCount(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException;
+
+  int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch) throws MetadataException;
+
+  int getNodesCountInGivenLevel(PartialPath pathPattern, int level, boolean isPrefixMatch)
+      throws MetadataException;
+
+  Map<PartialPath, Integer> getMeasurementCountGroupByLevel(
+      PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException;
+
+  // region Interfaces for level Node info Query
+  List<PartialPath> getNodesListInGivenLevel(
+      PartialPath pathPattern,
+      int nodeLevel,
+      boolean isPrefixMatch,
+      LocalSchemaProcessor.StorageGroupFilter filter)
+      throws MetadataException;
+
+  Set<String> getChildNodePathInNextLevel(PartialPath pathPattern) throws MetadataException;
+
+  Set<String> getChildNodeNameInNextLevel(PartialPath pathPattern) throws MetadataException;
+
+  Set<PartialPath> getBelongedDevices(PartialPath timeseries) throws MetadataException;
+
+  Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException;
+
+  Pair<List<ShowDevicesResult>, Integer> getMatchedDevices(ShowDevicesPlan plan)
+      throws MetadataException;
+
+  Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
+      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+      throws MetadataException;
+
+  Pair<List<ShowTimeSeriesResult>, Integer> showTimeseries(
+      ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException;
+
+  // attention: this path must be a device node
+  List<MeasurementPath> getAllMeasurementByDevicePath(PartialPath devicePath)
+      throws PathNotExistException;
+
+  // region Interfaces and methods for MNode query
+  IMNode getDeviceNode(PartialPath path) throws MetadataException;
+
+  IMeasurementMNode[] getMeasurementMNodes(PartialPath deviceId, String[] measurements)
+      throws MetadataException;
+
+  IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException;
+
+  List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException;
+
+  void changeAlias(PartialPath path, String alias) throws MetadataException, IOException;
+
+  @SuppressWarnings("squid:S3776")
+  // Suppress high Cognitive Complexity warning
+  void upsertTagsAndAttributes(
+      String alias,
+      Map<String, String> tagsMap,
+      Map<String, String> attributesMap,
+      PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  void addAttributes(Map<String, String> attributesMap, PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  void addTags(Map<String, String> tagsMap, PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  @SuppressWarnings("squid:S3776")
+  // Suppress high Cognitive Complexity warning
+  void dropTagsOrAttributes(Set<String> keySet, PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  @SuppressWarnings("squid:S3776")
+  // Suppress high Cognitive Complexity warning
+  void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  @SuppressWarnings("squid:S3776")
+  // Suppress high Cognitive Complexity warning
+  void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  void collectMeasurementSchema(
+      PartialPath prefixPath, List<IMeasurementSchema> measurementSchemas);
+
+  void collectTimeseriesSchema(
+      PartialPath prefixPath, Collection<TimeseriesSchema> timeseriesSchemas);
+
+  @SuppressWarnings("squid:S3776")
+  // Suppress high Cognitive Complexity warning
+  IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) throws MetadataException, IOException;
+
+  Set<String> getPathsSetTemplate(String templateName) throws MetadataException;
+
+  Set<String> getPathsUsingTemplate(String templateName) throws MetadataException;
+
+  boolean isTemplateAppendable(Template template, List<String> measurements)
+      throws MetadataException;
+
+  void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException;
+
+  void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException;
+
+  void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException;
+
+  IMeasurementMNode getMeasurementMNodeForTrigger(PartialPath fullPath) throws MetadataException;
+
+  void releaseMeasurementMNodeAfterDropTrigger(IMeasurementMNode measurementMNode)
+      throws MetadataException;
+}
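
With the interface in place, call sites depend on ISchemaRegion alone, regardless of which engine backs the region. A minimal caller-side sketch, not part of the commit, inside a method declaring throws MetadataException, IOException (it assumes the usual getInstance() accessor implied by the SchemaEngineManagerHolder in the next file):

    ISchemaRegion region = SchemaEngine.getInstance().getSchemaRegion(regionId);
    PartialPath series = new PartialPath("root.sg1.d1.s1");
    if (region != null && region.isPathExist(series)) {
      // identical call whether the region is memory-backed or RocksDB-backed
      region.changeAlias(series, "temperature");
    }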
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 5bfaa8002f..28ce4f31d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -20,9 +20,14 @@
 package org.apache.iotdb.db.metadata.schemaregion;
 
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaRegion;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Map;
@@ -31,7 +36,9 @@ import java.util.concurrent.ConcurrentHashMap;
 // manage all the schemaRegion in this dataNode
 public class SchemaEngine {
 
-  private Map<SchemaRegionId, SchemaRegion> schemaRegionMap;
+  private Map<SchemaRegionId, ISchemaRegion> schemaRegionMap;
+  private SchemaEngineMode schemaRegionStoredMode;
+  private static final Logger logger = LoggerFactory.getLogger(SchemaEngine.class);
 
   private static class SchemaEngineManagerHolder {
     private static final SchemaEngine INSTANCE = new SchemaEngine();
@@ -47,6 +54,9 @@ public class SchemaEngine {
 
   public void init() {
     schemaRegionMap = new ConcurrentHashMap<>();
+    schemaRegionStoredMode =
+        SchemaEngineMode.valueOf(IoTDBDescriptor.getInstance().getConfig().getSchemaEngineMode());
+    logger.info("used schema engine mode: {}.", schemaRegionStoredMode);
   }
 
   public void clear() {
@@ -56,22 +66,35 @@ public class SchemaEngine {
     }
   }
 
-  public SchemaRegion getSchemaRegion(SchemaRegionId regionId) {
+  public ISchemaRegion getSchemaRegion(SchemaRegionId regionId) {
     return schemaRegionMap.get(regionId);
   }
 
-  public Collection<SchemaRegion> getAllSchemaRegions() {
+  public Collection<ISchemaRegion> getAllSchemaRegions() {
     return schemaRegionMap.values();
   }
 
-  public synchronized SchemaRegion createSchemaRegion(
+  public synchronized ISchemaRegion createSchemaRegion(
       PartialPath storageGroup, SchemaRegionId schemaRegionId, IStorageGroupMNode storageGroupMNode)
       throws MetadataException {
-    SchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
+    ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
     if (schemaRegion != null) {
       return schemaRegion;
     }
-    schemaRegion = new SchemaRegion(storageGroup, schemaRegionId, storageGroupMNode);
+    switch (schemaRegionStoredMode) {
+      case Memory:
+      case Schema_File:
+        schemaRegion = new SchemaRegion(storageGroup, schemaRegionId, storageGroupMNode);
+        break;
+      case Rocksdb_based:
+        schemaRegion = new RSchemaRegion(storageGroup, schemaRegionId, storageGroupMNode);
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "This mode [%s] is not supported. Please check and modify it.",
+                schemaRegionStoredMode));
+    }
     schemaRegionMap.put(schemaRegionId, schemaRegion);
     return schemaRegion;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java
index f02d88f4d7..9147b9374c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java
@@ -21,5 +21,6 @@ package org.apache.iotdb.db.metadata.schemaregion;
 
 public enum SchemaEngineMode {
   Memory,
-  Schema_File
+  Schema_File,
+  Rocksdb_based
 }
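
SchemaEngine.init() in the previous file resolves the configured string with SchemaEngineMode.valueOf, which is case-sensitive; a quick illustration of that contract:

    // what SchemaEngine.init() effectively does with the configured value
    SchemaEngineMode mode =
        SchemaEngineMode.valueOf(IoTDBDescriptor.getInstance().getConfig().getSchemaEngineMode());
    // "Rocksdb_based" resolves; "rocksdb_based" would throw IllegalArgumentException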
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegion.java
index 8beb55ad4b..a067b17e30 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegion.java
@@ -145,7 +145,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARA
  * </ol>
  */
 @SuppressWarnings("java:S1135") // ignore todos
-public class SchemaRegion {
+public class SchemaRegion implements ISchemaRegion {
 
   private static final Logger logger = LoggerFactory.getLogger(StorageGroupSchemaManager.class);
 
@@ -323,6 +323,7 @@ public class SchemaRegion {
   }
 
   /** function for clearing metadata components of one schema region */
+  @Override
   public synchronized void clear() {
     isClearing = true;
     try {
@@ -408,31 +409,7 @@ public class SchemaRegion {
     clear();
 
     // delete all the schema region files
-    File schemaRegionDir = SystemFileFactory.INSTANCE.getFile(schemaRegionDirPath);
-    File[] sgFiles = schemaRegionDir.listFiles();
-    if (sgFiles == null) {
-      throw new MetadataException(
-          String.format("Can't get files in schema region dir %s", schemaRegionDirPath));
-    }
-    for (File file : sgFiles) {
-      if (file.delete()) {
-        logger.info("delete schema region folder {}", schemaRegionDir.getAbsolutePath());
-      } else {
-        logger.info("delete schema region folder {} failed.", schemaRegionDir.getAbsolutePath());
-        throw new MetadataException(
-            String.format(
-                "Failed to delete schema region folder %s", schemaRegionDir.getAbsolutePath()));
-      }
-    }
-
-    if (schemaRegionDir.delete()) {
-      logger.info("delete schema region folder {}", schemaRegionDir.getAbsolutePath());
-    } else {
-      logger.info("delete schema region folder {} failed.", schemaRegionDir.getAbsolutePath());
-      throw new MetadataException(
-          String.format(
-              "Failed to delete schema region folder %s", schemaRegionDir.getAbsolutePath()));
-    }
+    SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
   }
 
   // endregion
@@ -523,7 +500,7 @@ public class SchemaRegion {
    * @param encoding the encoding function {@code Encoding} of the timeseries
    * @param compressor the compressor function {@code Compressor} of the time series
    */
-  public void createTimeseries(
+  private void createTimeseries(
       PartialPath path,
       TSDataType dataType,
       TSEncoding encoding,
@@ -748,7 +725,6 @@ public class SchemaRegion {
    * <p>(we develop this method as we need to get the node's lock after we get the lock.writeLock())
    *
    * @param path path
-   * @param
    */
   public IMNode getDeviceNodeWithAutoCreate(PartialPath path, boolean autoCreateSchema)
       throws IOException, MetadataException {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionUtils.java
new file mode 100644
index 0000000000..40f2cc10c1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+
+import org.slf4j.Logger;
+
+import java.io.File;
+
+public class SchemaRegionUtils {
+
+  public static void deleteSchemaRegionFolder(String schemaRegionDirPath, Logger logger)
+      throws MetadataException {
+    File schemaRegionDir = SystemFileFactory.INSTANCE.getFile(schemaRegionDirPath);
+    File[] sgFiles = schemaRegionDir.listFiles();
+    if (sgFiles == null) {
+      throw new MetadataException(
+          String.format("Can't get files in schema region dir %s", schemaRegionDirPath));
+    }
+    for (File file : sgFiles) {
+      if (file.delete()) {
+        logger.info("delete schema region folder {}", schemaRegionDir.getAbsolutePath());
+      } else {
+        logger.info("delete schema region folder {} failed.", schemaRegionDir.getAbsolutePath());
+        throw new MetadataException(
+            String.format(
+                "Failed to delete schema region folder %s", schemaRegionDir.getAbsolutePath()));
+      }
+    }
+
+    if (schemaRegionDir.delete()) {
+      logger.info("delete schema region folder {}", schemaRegionDir.getAbsolutePath());
+    } else {
+      logger.info("delete schema region folder {} failed.", schemaRegionDir.getAbsolutePath());
+      throw new MetadataException(
+          String.format(
+              "Failed to delete schema region folder %s", schemaRegionDir.getAbsolutePath()));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/CheckKeyResult.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/CheckKeyResult.java
index 403b714d35..739e7a7a69 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/CheckKeyResult.java
@@ -16,24 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.service;
 
-import org.apache.iotdb.db.utils.MemUtils;
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
 
-public class IoTDBShutdownHook extends Thread {
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils.NODE_TYPE_ARRAY;
 
-  private static final Logger logger = LoggerFactory.getLogger(IoTDBShutdownHook.class);
+public class CheckKeyResult {
 
-  @Override
-  public void run() {
-    if (logger.isInfoEnabled()) {
-      logger.info(
-          "IoTDB exits. Jvm memory usage: {}",
-          MemUtils.bytesCntToStr(
-              Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()));
-    }
+  private byte[] value;
+  private RMNodeType nodeType;
+
+  public boolean existAnyKey() {
+    return nodeType != null;
+  }
+
+  public byte[] getValue() {
+    return value;
+  }
+
+  public void setValue(byte[] value) {
+    this.value = value;
+  }
+
+  public void setExistType(char type) {
+    nodeType = NODE_TYPE_ARRAY[type];
+  }
+
+  public boolean getResult(RMNodeType type) {
+    return type == nodeType;
   }
 }
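
A sketch of the intended call pattern; RMNodeType is added by this commit but not shown in full here, so the STORAGE_GROUP constant below is an assumption:

    CheckKeyResult result = new CheckKeyResult();
    result.setExistType(RSchemaConstants.NODE_TYPE_SG); // record which key type matched
    if (result.existAnyKey() && result.getResult(RMNodeType.STORAGE_GROUP)) { // assumed constant
      byte[] storedValue = result.getValue();
    }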
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaConstants.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaConstants.java
new file mode 100644
index 0000000000..e97031d529
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaConstants.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+public class RSchemaConstants {
+
+  public static final char ZERO = '0';
+  public static final char ROOT_CHAR = 'r';
+  public static final String ROOT = "r";
+  public static final String ROOT_STRING = "root";
+
+  public static final String PATH_SEPARATOR = ".";
+  public static final String ESCAPE_PATH_SEPARATOR = "[.]";
+
+  public static final String TABLE_NAME_TAGS = "tags";
+
+  // Node type
+  public static final char NODE_TYPE_ROOT = '\u0000';
+  public static final char NODE_TYPE_INTERNAL = '\u0001';
+  public static final char NODE_TYPE_SG = '\u0002';
+  public static final char NODE_TYPE_ENTITY = '\u0004';
+  public static final char NODE_TYPE_MEASUREMENT = '\u0008';
+  public static final char NODE_TYPE_ALIAS = '\u0010';
+
+  public static final int MAX_NODE_TYPE_NUM = NODE_TYPE_ALIAS + 1;
+
+  public static final Character[] ALL_NODE_TYPE_ARRAY =
+      new Character[] {
+        NODE_TYPE_ROOT,
+        NODE_TYPE_INTERNAL,
+        NODE_TYPE_SG,
+        NODE_TYPE_ENTITY,
+        NODE_TYPE_MEASUREMENT,
+        NODE_TYPE_ALIAS
+      };
+
+  public static final byte DATA_VERSION = 0x00;
+
+  public static final byte DEFAULT_FLAG = 0x00;
+
+  public static final byte FLAG_SET_TTL = 0x01;
+  public static final byte FLAG_HAS_SCHEMA = 0x01 << 1;
+  public static final byte FLAG_HAS_ALIAS = 0x01 << 2;
+  public static final byte FLAG_HAS_TAGS = 0x01 << 3;
+  public static final byte FLAG_HAS_ATTRIBUTES = 0x01 << 4;
+  public static final byte FLAG_IS_ALIGNED = 0x01 << 5;
+
+  public static final byte DATA_BLOCK_TYPE_TTL = 0x01;
+  public static final byte DATA_BLOCK_TYPE_SCHEMA = 0x01 << 1;
+  public static final byte DATA_BLOCK_TYPE_ALIAS = 0x01 << 2;
+  public static final byte DATA_BLOCK_TYPE_TAGS = 0x01 << 3;
+  public static final byte DATA_BLOCK_TYPE_ATTRIBUTES = 0x01 << 4;
+  // alias's origin key
+  public static final byte DATA_BLOCK_TYPE_ORIGIN_KEY = 0x01 << 5;
+
+  public static final byte[] DEFAULT_NODE_VALUE = new byte[] {DATA_VERSION, DEFAULT_FLAG};
+  public static final byte[] DEFAULT_ALIGNED_ENTITY_VALUE =
+      new byte[] {DATA_VERSION, FLAG_IS_ALIGNED};
+}
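
DEFAULT_NODE_VALUE pins down the value layout: byte 0 carries DATA_VERSION, byte 1 the flag byte, and each FLAG_* constant is a single-bit mask OR-ed into it. A minimal illustration of how such masks combine and test (not code from the commit):

    byte flag = DEFAULT_FLAG;
    flag |= FLAG_HAS_ALIAS | FLAG_HAS_TAGS;            // alias and tag blocks follow
    boolean aligned = (flag & FLAG_IS_ALIGNED) != 0;   // false for this flag byte
    byte[] nodeValue = new byte[] {DATA_VERSION, flag};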
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaLogger.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaLogger.java
new file mode 100644
index 0000000000..346a1514dc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaLogger.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.rocksdb.DBOptions;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.Logger;
+import org.rocksdb.Options;
+
+public class RSchemaLogger extends Logger {
+  private final org.slf4j.Logger logger;
+
+  public RSchemaLogger(Options options, org.slf4j.Logger logger) {
+    super(options);
+    this.logger = logger;
+  }
+
+  public RSchemaLogger(DBOptions options, org.slf4j.Logger logger) {
+    super(options);
+    this.logger = logger;
+  }
+
+  @Override
+  protected void log(InfoLogLevel infoLogLevel, String logMsg) {
+    switch (infoLogLevel) {
+      case DEBUG_LEVEL:
+        logger.debug(logMsg);
+        break;
+      case NUM_INFO_LOG_LEVELS:
+      case INFO_LEVEL:
+        logger.info(logMsg);
+        break;
+      case WARN_LEVEL:
+        logger.warn(logMsg);
+        break;
+      case ERROR_LEVEL:
+      case FATAL_LEVEL:
+      case HEADER_LEVEL:
+        logger.error(logMsg);
+        break;
+      default:
+        break;
+    }
+  }
+}
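
The adapter routes RocksDB's native log output into the server's slf4j logger. A hedged usage sketch, assuming the standard org.rocksdb.Options#setLogger API:

    Options options = new Options().setCreateIfMissing(true);
    options.setLogger(new RSchemaLogger(options, LoggerFactory.getLogger(RSchemaRegion.class)));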
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandler.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandler.java
new file mode 100644
index 0000000000..86b503bdaa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandler.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.primitives.Bytes;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Filter;
+import org.rocksdb.Holder;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.LRUCache;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Statistics;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.rocksdb.util.SizeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ALL_NODE_TYPE_ARRAY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_ORIGIN_KEY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_VERSION;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_FLAG;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ROOT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ROOT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.TABLE_NAME_TAGS;
+
+public class RSchemaReadWriteHandler {
+
+  private static final Logger logger = LoggerFactory.getLogger(RSchemaReadWriteHandler.class);
+
+  protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  private static final String ROCKSDB_FOLDER = "rocksdb-schema";
+
+  private static final String[] INNER_TABLES =
+      new String[] {new String(RocksDB.DEFAULT_COLUMN_FAMILY), TABLE_NAME_TAGS};
+
+  public static final String ROCKSDB_PATH = config.getSystemDir() + File.separator + ROCKSDB_FOLDER;
+
+  private static final long BLOCK_CACHE = 20L * 1024 * 1024 * 1024;
+  private static final long BLOCK_CACHE_COMPRESSED = 10L * 1024 * 1024 * 1024;
+
+  private RocksDB rocksDB;
+
+  ConcurrentMap<String, ColumnFamilyHandle> columnFamilyHandleMap = new ConcurrentHashMap<>();
+  List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
+  List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+  static {
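+    // load the RocksDB JNI native library once per JVM before any other RocksDB call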
+    RocksDB.loadLibrary();
+  }
+
+  public RSchemaReadWriteHandler(String path) throws RocksDBException {
+    initReadWriteHandler(path);
+  }
+
+  public RSchemaReadWriteHandler() throws RocksDBException {
+    initReadWriteHandler(ROCKSDB_PATH);
+  }
+
+  private void initReadWriteHandler(String path) throws RocksDBException {
+    try (Options options = new Options()) {
+      org.rocksdb.Logger rocksDBLogger = new RSchemaLogger(options, logger);
+      rocksDBLogger.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
+
+      options
+          .setCreateIfMissing(true)
+          .setAllowMmapReads(true)
+          .setWriteBufferSize(64 * SizeUnit.KB)
+          .setMaxWriteBufferNumber(6)
+          .setMaxBackgroundJobs(10)
+          .setStatistics(new Statistics())
+          .setLogger(rocksDBLogger);
+
+      final Filter bloomFilter = new BloomFilter(64);
+
+      final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
+      Cache cache = new LRUCache(BLOCK_CACHE, 6);
+      tableOptions
+          .setBlockCache(cache)
+          .setFilterPolicy(bloomFilter)
+          .setBlockSizeDeviation(5)
+          .setBlockRestartInterval(10)
+          .setCacheIndexAndFilterBlocks(true)
+          .setBlockCacheCompressed(new LRUCache(BLOCK_CACHE_COMPRESSED, 6));
+
+      options.setTableFormatConfig(tableOptions);
+
+      try (DBOptions dbOptions = new DBOptions(options)) {
+
+        initColumnFamilyDescriptors(options, path);
+
+        rocksDB = RocksDB.open(dbOptions, path, columnFamilyDescriptors, columnFamilyHandles);
+
+        initInnerColumnFamilies();
+
+        initRootKey();
+      }
+    }
+  }
+
+  private void initColumnFamilyDescriptors(Options options, String path) throws RocksDBException {
+    List<byte[]> cfs = RocksDB.listColumnFamilies(options, path);
+    if (cfs.isEmpty()) {
+      cfs = new ArrayList<>();
+      cfs.add(RocksDB.DEFAULT_COLUMN_FAMILY);
+    }
+
+    for (byte[] tableBytes : cfs) {
+      columnFamilyDescriptors.add(
+          new ColumnFamilyDescriptor(tableBytes, new ColumnFamilyOptions()));
+    }
+  }
+
+  private void initInnerColumnFamilies() throws RocksDBException {
+    for (String tableName : INNER_TABLES) {
+      boolean tableCreated = false;
+      for (ColumnFamilyHandle cfh : columnFamilyHandles) {
+        if (tableName.equals(new String(cfh.getName()))) {
+          tableCreated = true;
+          break;
+        }
+      }
+      if (!tableCreated) {
+        createTable(tableName);
+      }
+    }
+    for (ColumnFamilyHandle handle : columnFamilyHandles) {
+      columnFamilyHandleMap.put(new String(handle.getName()), handle);
+    }
+  }
+
+  private void initRootKey() throws RocksDBException {
+    byte[] rootKey = RSchemaUtils.toRocksDBKey(ROOT, NODE_TYPE_ROOT);
+    if (!keyExist(rootKey)) {
+      rocksDB.put(rootKey, new byte[] {DATA_VERSION, DEFAULT_FLAG});
+    }
+  }
+
+  private void createTable(String tableName) throws RocksDBException {
+    ColumnFamilyHandle columnFamilyHandle =
+        rocksDB.createColumnFamily(
+            new ColumnFamilyDescriptor(tableName.getBytes(), new ColumnFamilyOptions()));
+    columnFamilyDescriptors.add(
+        new ColumnFamilyDescriptor(tableName.getBytes(), new ColumnFamilyOptions()));
+    columnFamilyHandles.add(columnFamilyHandle);
+  }
+
+  public ColumnFamilyHandle getColumnFamilyHandleByName(String columnFamilyName) {
+    return columnFamilyHandleMap.get(columnFamilyName);
+  }
+
+  public void updateNode(byte[] key, byte[] value) throws RocksDBException {
+    rocksDB.put(key, value);
+  }
+
+  public void createNode(String levelKey, RMNodeType type, byte[] value) throws RocksDBException {
+    byte[] nodeKey = RSchemaUtils.toRocksDBKey(levelKey, type.getValue());
+    rocksDB.put(nodeKey, value);
+  }
+
+  public void createNode(byte[] nodeKey, byte[] value) throws RocksDBException {
+    rocksDB.put(nodeKey, value);
+  }
+
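+  // the delete of the internal key and the put of the entity key are committed in one
+  // WriteBatch, so readers never observe the path with neither key present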
+  public void convertToEntityNode(String levelPath, byte[] value) throws RocksDBException {
+    try (WriteBatch batch = new WriteBatch()) {
+      byte[] internalKey = RSchemaUtils.toInternalNodeKey(levelPath);
+      byte[] entityKey = RSchemaUtils.toEntityNodeKey(levelPath);
+      batch.delete(internalKey);
+      batch.put(entityKey, value);
+      executeBatch(batch);
+    }
+  }
+
+  public IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException {
+    String[] nodes = fullPath.getNodes();
+    String key = RSchemaUtils.getLevelPath(nodes, nodes.length - 1);
+    try {
+      byte[] value = rocksDB.get(key.getBytes());
+      if (value == null) {
+        logger.warn("path does not exist: {}", key);
+        throw new MetadataException("key does not exist: " + key);
+      }
+      return new MeasurementMNode(null, fullPath.getFullPath(), null, null);
+    } catch (RocksDBException e) {
+      throw new MetadataException(e);
+    }
+  }
+
+  public boolean keyExistByType(String levelKey, RMNodeType type) throws RocksDBException {
+    return keyExistByType(levelKey, type, new Holder<>());
+  }
+
+  public boolean keyExistByType(String levelKey, RMNodeType type, Holder<byte[]> holder)
+      throws RocksDBException {
+    byte[] key = RSchemaUtils.toRocksDBKey(levelKey, type.getValue());
+    return keyExist(key, holder);
+  }
+
+  public CheckKeyResult keyExistByAllTypes(String levelKey) throws RocksDBException {
+    RMNodeType[] types =
+        new RMNodeType[] {
+          RMNodeType.ALISA,
+          RMNodeType.ENTITY,
+          RMNodeType.INTERNAL,
+          RMNodeType.MEASUREMENT,
+          RMNodeType.STORAGE_GROUP
+        };
+    return keyExistByTypes(levelKey, types);
+  }
+
+  public CheckKeyResult keyExistByTypes(String levelKey, RMNodeType... types)
+      throws RocksDBException {
+    CheckKeyResult result = new CheckKeyResult();
+    try {
+      Arrays.stream(types)
+          .forEach(
+              x -> {
+                byte[] key = Bytes.concat(new byte[] {(byte) x.getValue()}, levelKey.getBytes());
+                try {
+                  Holder<byte[]> holder = new Holder<>();
+                  boolean keyExisted = keyExist(key, holder);
+                  if (keyExisted) {
+                    result.setExistType(x.getValue());
+                    result.setValue(holder.getValue());
+                  }
+                } catch (RocksDBException e) {
+                  throw new RuntimeException(e);
+                }
+              });
+    } catch (Exception e) {
+      if (e.getCause() instanceof RocksDBException) {
+        throw (RocksDBException) e.getCause();
+      }
+      throw e;
+    }
+    return result;
+  }
+
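+  // keyMayExist is a memtable/bloom-filter fast path: a negative answer is definitive,
+  // while a positive answer may be a false positive and is confirmed with a real get()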
+  public boolean keyExist(byte[] key, Holder<byte[]> holder) throws RocksDBException {
+    boolean exist = false;
+    if (rocksDB.keyMayExist(key, holder)) {
+      if (holder.getValue() == null) {
+        byte[] value = rocksDB.get(key);
+        if (value != null) {
+          exist = true;
+          holder.setValue(value);
+        }
+      } else {
+        exist = true;
+      }
+    }
+    return exist;
+  }
+
+  public boolean keyExist(byte[] key) throws RocksDBException {
+    return keyExist(key, new Holder<>());
+  }
+
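+  // level-by-level breadth-first scan: `op` is applied to every seed key and returns
+  // true when that key is not a leaf, in which case its children seed the next level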
+  public void scanAllKeysRecursively(Set<String> seeds, int level, Function<String, Boolean> op) {
+    if (seeds == null || seeds.isEmpty()) {
+      return;
+    }
+    Set<String> children = ConcurrentHashMap.newKeySet();
+    seeds
+        .parallelStream()
+        .forEach(
+            x -> {
+              if (Boolean.TRUE.equals(op.apply(x))) {
+                // x is not leaf node
+                String childrenPrefix = RSchemaUtils.getNextLevelOfPath(x, level);
+                children.addAll(getAllByPrefix(childrenPrefix));
+              }
+            });
+    if (!children.isEmpty()) {
+      scanAllKeysRecursively(children, level + 1, op);
+    }
+  }
+
+  public Set<String> getAllByPrefix(String prefix) {
+    Set<String> result = new HashSet<>();
+    byte[] prefixKey = prefix.getBytes();
+    try (RocksIterator iterator = rocksDB.newIterator()) {
+      for (iterator.seek(prefixKey); iterator.isValid(); iterator.next()) {
+        String key = new String(iterator.key());
+        if (!key.startsWith(prefix)) {
+          break;
+        }
+        result.add(key);
+      }
+      return result;
+    }
+  }
+
+  public byte[] get(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
+    if (columnFamilyHandle == null) {
+      return rocksDB.get(key);
+    }
+    return rocksDB.get(columnFamilyHandle, key);
+  }
+
+  public RocksIterator iterator(ColumnFamilyHandle columnFamilyHandle) {
+    if (columnFamilyHandle == null) {
+      return rocksDB.newIterator();
+    }
+    return rocksDB.newIterator(columnFamilyHandle);
+  }
+
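+  // returns true as soon as any key of any node type remains under the sibling prefix;
+  // used to decide whether an emptied parent node can be removed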
+  public boolean existAnySiblings(String siblingPrefix) {
+    for (char type : ALL_NODE_TYPE_ARRAY) {
+      byte[] key = RSchemaUtils.toRocksDBKey(siblingPrefix, type);
+      try (RocksIterator iterator = rocksDB.newIterator()) {
+        for (iterator.seek(key); iterator.isValid(); iterator.next()) {
+          if (!RSchemaUtils.prefixMatch(iterator.key(), key)) {
+            break;
+          } else {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  public void getKeyByPrefix(String innerName, Function<String, Boolean> function) {
+    try (RocksIterator iterator = rocksDB.newIterator()) {
+      for (iterator.seek(innerName.getBytes()); iterator.isValid(); iterator.next()) {
+        String keyStr = new String(iterator.key());
+        if (!keyStr.startsWith(innerName)) {
+          break;
+        }
+        function.apply(keyStr);
+      }
+    }
+  }
+
+  public Map<byte[], byte[]> getKeyValueByPrefix(String innerName) {
+    try (RocksIterator iterator = rocksDB.newIterator()) {
+      Map<byte[], byte[]> result = new HashMap<>();
+      for (iterator.seek(innerName.getBytes()); iterator.isValid(); iterator.next()) {
+        String keyStr = new String(iterator.key());
+        if (!keyStr.startsWith(innerName)) {
+          break;
+        }
+        result.put(iterator.key(), iterator.value());
+      }
+      return result;
+    }
+  }
+
+  public String findBelongToSpecifiedNodeType(String[] nodes, char nodeType) {
+    String innerPathName;
+    for (int level = nodes.length; level > 0; level--) {
+      String[] copy = Arrays.copyOf(nodes, level);
+      innerPathName = RSchemaUtils.convertPartialPathToInnerByNodes(copy, level, nodeType);
+      try {
+        // keyMayExist alone can return false positives, so confirm with a real lookup
+        if (keyExist(innerPathName.getBytes())) {
+          return innerPathName;
+        }
+      } catch (RocksDBException e) {
+        logger.error("failed to check key existence for {}", innerPathName, e);
+      }
+    }
+    return null;
+  }
+
+  public void executeBatch(WriteBatch batch) throws RocksDBException {
+    // WriteOptions holds a native handle; close it once the batch has been written
+    try (WriteOptions writeOptions = new WriteOptions()) {
+      rocksDB.write(writeOptions, batch);
+    }
+  }
+
+  public void deleteNode(String[] nodes, RMNodeType type) throws RocksDBException {
+    byte[] key =
+        RSchemaUtils.toRocksDBKey(
+            RSchemaUtils.getLevelPath(nodes, nodes.length - 1), type.getValue());
+    rocksDB.delete(key);
+  }
+
+  public void deleteByKey(byte[] key) throws RocksDBException {
+    rocksDB.delete(key);
+  }
+
+  public void deleteNodeByPrefix(byte[] startKey, byte[] endKey) throws RocksDBException {
+    rocksDB.deleteRange(startKey, endKey);
+  }
+
+  public void deleteNodeByPrefix(ColumnFamilyHandle handle, byte[] startKey, byte[] endKey)
+      throws RocksDBException {
+    rocksDB.deleteRange(handle, startKey, endKey);
+  }
+
+  @TestOnly
+  public void scanAllKeys(String filePath) throws IOException {
+    try (RocksIterator iterator = rocksDB.newIterator()) {
+      logger.info("\n-----------------scan rocksdb start----------------------");
+      iterator.seekToFirst();
+      File outputFile = new File(filePath);
+      if (outputFile.exists()) {
+        boolean deleted = outputFile.delete();
+        logger.info("delete output file: {}", deleted);
+      }
+      try (BufferedOutputStream outputStream =
+          new BufferedOutputStream(new FileOutputStream(outputFile))) {
+        while (iterator.isValid()) {
+          byte[] key = iterator.key();
+          key[0] = (byte) (key[0] + '0');
+          outputStream.write(key);
+          outputStream.write(" -> ".getBytes());
+
+          byte[] value = iterator.value();
+          ByteBuffer byteBuffer = ByteBuffer.wrap(value);
+          // skip the version flag and node type flag
+          ReadWriteIOUtils.readBytes(byteBuffer, 2);
+          while (byteBuffer.hasRemaining()) {
+            byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
+            switch (blockType) {
+              case RSchemaConstants.DATA_BLOCK_TYPE_TTL:
+                long l = ReadWriteIOUtils.readLong(byteBuffer);
+                outputStream.write(String.valueOf(l).getBytes());
+                outputStream.write(" ".getBytes());
+                break;
+              case DATA_BLOCK_TYPE_SCHEMA:
+                MeasurementSchema schema = MeasurementSchema.deserializeFrom(byteBuffer);
+                outputStream.write(schema.toString().getBytes());
+                outputStream.write(" ".getBytes());
+                break;
+              case RSchemaConstants.DATA_BLOCK_TYPE_ALIAS:
+                String str = ReadWriteIOUtils.readString(byteBuffer);
+                outputStream.write(Objects.requireNonNull(str).getBytes());
+                outputStream.write(" ".getBytes());
+                break;
+              case DATA_BLOCK_TYPE_ORIGIN_KEY:
+                byte[] originKey = RSchemaUtils.readOriginKey(byteBuffer);
+                outputStream.write(originKey);
+                outputStream.write(" ".getBytes());
+                break;
+              case RSchemaConstants.DATA_BLOCK_TYPE_TAGS:
+              case RSchemaConstants.DATA_BLOCK_TYPE_ATTRIBUTES:
+                Map<String, String> map = ReadWriteIOUtils.readMap(byteBuffer);
+                for (Map.Entry<String, String> entry : Objects.requireNonNull(map).entrySet()) {
+                  outputStream.write(
+                      ("<" + entry.getKey() + "," + entry.getValue() + ">").getBytes());
+                }
+                outputStream.write(" ".getBytes());
+                break;
+              default:
+                break;
+            }
+          }
+          outputStream.write("\n".getBytes());
+          iterator.next();
+        }
+      }
+      logger.info("\n-----------------scan rocksdb end----------------------");
+    }
+  }
+
+  public void close() throws RocksDBException {
+    rocksDB.syncWal();
+    rocksDB.closeE();
+  }
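+
+  // Usage sketch (for illustration only; the scratch path is an assumption and the
+  // RocksDBException handling is elided; RSchemaRegion is the real caller):
+  //   RSchemaReadWriteHandler handler = new RSchemaReadWriteHandler("/tmp/rocksdb-schema-demo");
+  //   byte[] rootKey = RSchemaUtils.toRocksDBKey(ROOT, NODE_TYPE_ROOT);
+  //   handler.keyExist(rootKey); // true: initRootKey() seeds the root key on open
+  //   handler.close();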
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
new file mode 100644
index 0000000000..0eb798fd61
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -0,0 +1,1962 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
+import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MNodeTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.metadata.LocalSchemaProcessor.StorageGroupFilter;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.idtable.IDTable;
+import org.apache.iotdb.db.metadata.idtable.IDTableManager;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaRegionUtils;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.REntityMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeValueType;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMeasurementMNode;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
+import com.google.common.collect.MapMaker;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.rocksdb.Holder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ALL_NODE_TYPE_ARRAY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_NODE_VALUE;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ZERO;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+public class RSchemaRegion implements ISchemaRegion {
+  private static final Logger logger = LoggerFactory.getLogger(RSchemaRegion.class);
+
+  protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+  // TODO: make it configurable
+  public static final int MAX_PATH_DEPTH = 10;
+
+  private static final long MAX_LOCK_WAIT_TIME = 50;
+
+  private final RSchemaReadWriteHandler readWriteHandler;
+
+  private final ReadWriteLock deleteUpdateLock = new ReentrantReadWriteLock();
+
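+  // per-path lock striping: weak values let locks for idle paths be garbage-collected,
+  // while any thread currently using a lock keeps its instance strongly reachable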
+  private final Map<String, ReentrantLock> locksPool =
+      new MapMaker().weakValues().initialCapacity(10000).makeMap();
+
+  private String schemaRegionDirPath;
+  private String storageGroupFullPath;
+  private ConsensusGroupId schemaRegionId;
+  private IStorageGroupMNode storageGroupMNode;
+  private int storageGroupPathLevel;
+
+  public RSchemaRegion() throws MetadataException {
+    try {
+      readWriteHandler = new RSchemaReadWriteHandler();
+    } catch (RocksDBException e) {
+      logger.error("create RSchemaReadWriteHandler fail", e);
+      throw new MetadataException(e);
+    }
+  }
+
+  public RSchemaRegion(
+      PartialPath storageGroup,
+      ConsensusGroupId schemaRegionId,
+      IStorageGroupMNode storageGroupMNode)
+      throws MetadataException {
+    this.schemaRegionId = schemaRegionId;
+    storageGroupFullPath = storageGroup.getFullPath();
+    init(storageGroupMNode);
+    try {
+      readWriteHandler = new RSchemaReadWriteHandler(schemaRegionDirPath);
+    } catch (RocksDBException e) {
+      logger.error("create RSchemaReadWriteHandler fail", e);
+      throw new MetadataException(e);
+    }
+  }
+
+  @Override
+  public void init(IStorageGroupMNode storageGroupMNode) throws MetadataException {
+    this.storageGroupMNode = storageGroupMNode;
+    schemaRegionDirPath =
+        config.getSchemaDir()
+            + File.separator
+            + storageGroupFullPath
+            + File.separator
+            + schemaRegionId.getId();
+    File schemaRegionFolder = SystemFileFactory.INSTANCE.getFile(schemaRegionDirPath);
+    if (!schemaRegionFolder.exists()) {
+      if (schemaRegionFolder.mkdirs()) {
+        logger.info("create schema region folder {}", schemaRegionDirPath);
+      } else {
+        if (!schemaRegionFolder.exists()) {
+          logger.error("create schema region folder {} failed.", schemaRegionDirPath);
+          throw new SchemaDirCreationFailureException(schemaRegionDirPath);
+        }
+      }
+    }
+    storageGroupPathLevel = RSchemaUtils.getLevelByPartialPath(storageGroupFullPath);
+  }
+
+  @Override
+  public void forceMlog() {
+    // do nothing
+  }
+
+  @Override
+  public void operation(PhysicalPlan plan) throws IOException, MetadataException {
+    switch (plan.getOperatorType()) {
+      case CREATE_TIMESERIES:
+        CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
+        createTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
+        break;
+      case CREATE_ALIGNED_TIMESERIES:
+        CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
+            (CreateAlignedTimeSeriesPlan) plan;
+        createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+        break;
+      case DELETE_TIMESERIES:
+        DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) plan;
+        // each DeleteTimeSeriesPlan carries exactly one path
+        deleteTimeseries(deleteTimeSeriesPlan.getPaths().get(0));
+        break;
+      case CHANGE_ALIAS:
+        ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
+        changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
+        break;
+      case AUTO_CREATE_DEVICE_MNODE:
+        AutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan = (AutoCreateDeviceMNodePlan) plan;
+        autoCreateDeviceMNode(autoCreateDeviceMNodePlan);
+        break;
+      case CHANGE_TAG_OFFSET:
+      case SET_TEMPLATE:
+      case ACTIVATE_TEMPLATE:
+      case UNSET_TEMPLATE:
+        logger.error("unsupported operation {}", plan);
+        break;
+      default:
+        logger.error("Unrecognizable command {}", plan.getOperatorType());
+    }
+  }
+
+  @Override
+  public void deleteSchemaRegion() throws MetadataException {
+    clear();
+    SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
+  }
+
+  @Override
+  public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
+    boolean locked = false;
+    try {
+      locked = deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+      if (locked) {
+        createTimeseries(
+            plan.getPath(),
+            new MeasurementSchema(
+                plan.getPath().getMeasurement(),
+                plan.getDataType(),
+                plan.getEncoding(),
+                plan.getCompressor(),
+                plan.getProps()),
+            plan.getAlias(),
+            plan.getTags(),
+            plan.getAttributes());
+        // update id table if id table log file is disabled
+        if (config.isEnableIDTable() && !config.isEnableIDTableLogFile()) {
+          IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPath().getDevicePath());
+          idTable.createTimeseries(plan);
+        }
+      } else {
+        throw new AcquireLockTimeoutException(
+            "Acquire lock timeout when creating timeseries: " + plan.getPath().getFullPath());
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      // only release the lock if it was actually acquired; unlocking a lock that is
+      // not held would throw IllegalMonitorStateException and mask the real error
+      if (locked) {
+        deleteUpdateLock.readLock().unlock();
+      }
+    }
+  }
+
+  @TestOnly
+  protected void createTimeseries(
+      PartialPath path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props)
+      throws MetadataException {
+    createTimeseries(path, dataType, encoding, compressor, props, null);
+  }
+
+  @TestOnly
+  protected void createTimeseries(
+      PartialPath path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props,
+      String alias)
+      throws MetadataException {
+    createTimeseries(
+        path,
+        new MeasurementSchema(path.getMeasurement(), dataType, encoding, compressor, props),
+        alias,
+        null,
+        null);
+  }
+
+  protected void createTimeseries(
+      PartialPath path,
+      IMeasurementSchema schema,
+      String alias,
+      Map<String, String> tags,
+      Map<String, String> attributes)
+      throws MetadataException {
+    // regular check
+    if (path.getNodes().length > RSchemaRegion.MAX_PATH_DEPTH) {
+      throw new IllegalPathException(
+          String.format(
+              "path is too long, provide: %d, max: %d",
+              path.getNodeLength(), RSchemaRegion.MAX_PATH_DEPTH));
+    }
+    MetaFormatUtils.checkTimeseries(path);
+    MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), schema.getProps());
+
+    // sg check and create
+    String[] nodes = path.getNodes();
+    SchemaUtils.checkDataTypeWithEncoding(schema.getType(), schema.getEncodingType());
+
+    try {
+      createTimeSeriesRecursively(
+          nodes, nodes.length, storageGroupPathLevel, schema, alias, tags, attributes);
+      // TODO: load tags to memory
+    } catch (RocksDBException | IOException e) {
+      throw new MetadataException(e);
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
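+  // recursive creation: if the key of the current level is missing, missing ancestors
+  // are created first, then the current level is written as a measurement, entity or
+  // internal node depending on its distance from the leaf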
+  private void createTimeSeriesRecursively(
+      String[] nodes,
+      int start,
+      int end,
+      IMeasurementSchema schema,
+      String alias,
+      Map<String, String> tags,
+      Map<String, String> attributes)
+      throws InterruptedException, MetadataException, RocksDBException, IOException {
+    if (start <= end) {
+      // "ROOT" node must exist and don't need to check
+      return;
+    }
+    String levelPath = RSchemaUtils.getLevelPath(nodes, start - 1);
+    Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+    if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+      try {
+        CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath);
+        if (!checkResult.existAnyKey()) {
+          createTimeSeriesRecursively(nodes, start - 1, end, schema, alias, tags, attributes);
+          if (start == nodes.length) {
+            createTimeSeriesNode(nodes, levelPath, schema, alias, tags, attributes);
+          } else if (start == nodes.length - 1) {
+            readWriteHandler.createNode(levelPath, RMNodeType.ENTITY, DEFAULT_NODE_VALUE);
+          } else {
+            readWriteHandler.createNode(levelPath, RMNodeType.INTERNAL, DEFAULT_NODE_VALUE);
+          }
+        } else {
+          if (start == nodes.length) {
+            throw new PathAlreadyExistException(RSchemaUtils.getPathByLevelPath(levelPath));
+          }
+
+          if (start == nodes.length - 1) {
+            if (checkResult.getResult(RMNodeType.INTERNAL)) {
+              // convert the parent node to entity if it is internal node
+              readWriteHandler.convertToEntityNode(levelPath, DEFAULT_NODE_VALUE);
+            } else if (checkResult.getResult(RMNodeType.ENTITY)) {
+              if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
+                throw new AlignedTimeseriesException(
+                    "Timeseries under this entity is aligned, please use createAlignedTimeseries"
+                        + " or change entity.",
+                    RSchemaUtils.getPathByLevelPath(levelPath));
+              }
+            } else {
+              throw new MNodeTypeMismatchException(
+                  RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.ENTITY_MNODE_TYPE);
+            }
+          }
+
+          if (checkResult.getResult(RMNodeType.MEASUREMENT)
+              || checkResult.getResult(RMNodeType.ALISA)) {
+            throw new MNodeTypeMismatchException(
+                RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.INTERNAL_MNODE_TYPE);
+          }
+        }
+      } finally {
+        lock.unlock();
+      }
+    } else {
+      // tryLock failed, so this thread does not hold the lock and must not release it
+      throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+    }
+  }
+
+  private void createTimeSeriesNode(
+      String[] nodes,
+      String levelPath,
+      IMeasurementSchema schema,
+      String alias,
+      Map<String, String> tags,
+      Map<String, String> attributes)
+      throws IOException, RocksDBException, MetadataException, InterruptedException {
+    // create time-series node
+    try (WriteBatch batch = new WriteBatch()) {
+      byte[] value = RSchemaUtils.buildMeasurementNodeValue(schema, alias, tags, attributes);
+      byte[] measurementKey = RSchemaUtils.toMeasurementNodeKey(levelPath);
+      batch.put(measurementKey, value);
+
+      // measurement with tags will save in a separate table at the same time
+      if (tags != null && !tags.isEmpty()) {
+        batch.put(
+            readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS),
+            measurementKey,
+            DEFAULT_NODE_VALUE);
+      }
+
+      if (StringUtils.isNotEmpty(alias)) {
+        String[] aliasNodes = Arrays.copyOf(nodes, nodes.length);
+        aliasNodes[nodes.length - 1] = alias;
+        String aliasLevelPath = RSchemaUtils.getLevelPath(aliasNodes, aliasNodes.length - 1);
+        byte[] aliasNodeKey = RSchemaUtils.toAliasNodeKey(aliasLevelPath);
+        Lock lock = locksPool.computeIfAbsent(aliasLevelPath, x -> new ReentrantLock());
+        if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+          try {
+            if (!readWriteHandler.keyExistByAllTypes(aliasLevelPath).existAnyKey()) {
+              batch.put(aliasNodeKey, RSchemaUtils.buildAliasNodeValue(measurementKey));
+              readWriteHandler.executeBatch(batch);
+            } else {
+              throw new AliasAlreadyExistException(
+                  RSchemaUtils.getPathByLevelPath(levelPath), alias);
+            }
+          } finally {
+            lock.unlock();
+          }
+        } else {
+          // tryLock failed, so this thread does not hold the lock and must not release it
+          throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+        }
+      } else {
+        readWriteHandler.executeBatch(batch);
+      }
+    }
+  }
+
+  private void createAlignedTimeSeries(
+      PartialPath prefixPath,
+      List<String> measurements,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings)
+      throws MetadataException {
+    if (prefixPath.getNodeLength() > MAX_PATH_DEPTH - 1) {
+      throw new IllegalPathException(
+          String.format(
+              "Prefix path is too long, provide: %d, max: %d",
+              prefixPath.getNodeLength(), RSchemaRegion.MAX_PATH_DEPTH - 1));
+    }
+
+    MetaFormatUtils.checkTimeseries(prefixPath);
+
+    for (int i = 0; i < measurements.size(); i++) {
+      SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
+      MetaFormatUtils.checkNodeName(measurements.get(i));
+    }
+
+    try (WriteBatch batch = new WriteBatch()) {
+      createEntityRecursively(
+          prefixPath.getNodes(), prefixPath.getNodeLength(), storageGroupPathLevel + 1, true);
+      String[] locks = new String[measurements.size()];
+      for (int i = 0; i < measurements.size(); i++) {
+        String measurement = measurements.get(i);
+        String levelPath = RSchemaUtils.getMeasurementLevelPath(prefixPath.getNodes(), measurement);
+        locks[i] = levelPath;
+        MeasurementSchema schema =
+            new MeasurementSchema(measurement, dataTypes.get(i), encodings.get(i));
+        byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
+        byte[] value = RSchemaUtils.buildMeasurementNodeValue(schema, null, null, null);
+        batch.put(key, value);
+      }
+
+      for (String lockKey : locks) {
+        Lock lock = locksPool.computeIfAbsent(lockKey, x -> new ReentrantLock());
+        if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+          try {
+            if (readWriteHandler.keyExistByAllTypes(lockKey).existAnyKey()) {
+              throw new PathAlreadyExistException(lockKey);
+            }
+          } finally {
+            lock.unlock();
+          }
+        } else {
+          // tryLock failed, so this thread does not hold the lock and must not release it
+          throw new AcquireLockTimeoutException("acquire lock timeout: " + lockKey);
+        }
+      }
+      readWriteHandler.executeBatch(batch);
+
+      // TODO: update cache if necessary
+    } catch (RocksDBException | IOException e) {
+      throw new MetadataException(e);
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+    PartialPath prefixPath = plan.getPrefixPath();
+    List<String> measurements = plan.getMeasurements();
+    List<TSDataType> dataTypes = plan.getDataTypes();
+    List<TSEncoding> encodings = plan.getEncodings();
+
+    boolean locked = false;
+    try {
+      locked = deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+      if (locked) {
+        createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings);
+        // update the id table only when the id table log file is disabled
+        if (config.isEnableIDTable() && !config.isEnableIDTableLogFile()) {
+          IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPrefixPath());
+          idTable.createAlignedTimeseries(plan);
+        }
+      } else {
+        throw new AcquireLockTimeoutException(
+            "Acquire lock timeout when do createAlignedTimeSeries: " + prefixPath.getFullPath());
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      // only release the lock if it was actually acquired
+      if (locked) {
+        deleteUpdateLock.readLock().unlock();
+      }
+    }
+  }
+
+  private void createEntityRecursively(String[] nodes, int start, int end, boolean aligned)
+      throws RocksDBException, MetadataException, InterruptedException {
+    if (start <= end) {
+      // "ROOT" must exist
+      return;
+    }
+    String levelPath = RSchemaUtils.getLevelPath(nodes, start - 1);
+    Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+    if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+      try {
+        CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath);
+        if (!checkResult.existAnyKey()) {
+          createEntityRecursively(nodes, start - 1, end, aligned);
+          if (start == nodes.length) {
+            byte[] nodeKey = RSchemaUtils.toEntityNodeKey(levelPath);
+            byte[] value = aligned ? DEFAULT_ALIGNED_ENTITY_VALUE : DEFAULT_NODE_VALUE;
+            readWriteHandler.createNode(nodeKey, value);
+          } else {
+            readWriteHandler.createNode(levelPath, RMNodeType.INTERNAL, DEFAULT_NODE_VALUE);
+          }
+        } else {
+          if (start == nodes.length) {
+            // make sure the storage group node and the entity node are different,
+            // e.g., if 'root.a' is a storage group, 'root.a.b' cannot be a timeseries,
+            // because its device 'root.a' would collide with the storage group node
+            if (checkResult.getResult(RMNodeType.STORAGE_GROUP)) {
+              throw new MetadataException(
+                  "Storage group node and entity node must not be the same!");
+            }
+
+            if (!checkResult.getResult(RMNodeType.ENTITY)) {
+              throw new MNodeTypeMismatchException(
+                  RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.ENTITY_MNODE_TYPE);
+            }
+
+            if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) == 0) {
+              throw new MetadataException(
+                  "Timeseries under this entity is not aligned, please use createTimeseries or change entity. (Path: "
+                      + RSchemaUtils.getPathByLevelPath(levelPath)
+                      + ")");
+            }
+          } else if (checkResult.getResult(RMNodeType.MEASUREMENT)
+              || checkResult.getResult(RMNodeType.ALISA)) {
+            throw new MNodeTypeMismatchException(
+                RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.ENTITY_MNODE_TYPE);
+          }
+        }
+      } finally {
+        lock.unlock();
+      }
+    } else {
+      // tryLock failed, so this thread does not hold the lock and must not release it
+      throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+    }
+  }
+
+  @Override
+  public Pair<Integer, Set<String>> deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException {
+    boolean locked = false;
+    try {
+      locked = deleteUpdateLock.writeLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+      if (locked) {
+        Set<String> failedNames = ConcurrentHashMap.newKeySet();
+        Set<IMNode> parentNeedsToCheck = ConcurrentHashMap.newKeySet();
+
+        AtomicInteger atomicInteger = new AtomicInteger(0);
+        traverseOutcomeBasins(
+            pathPattern.getNodes(),
+            MAX_PATH_DEPTH,
+            (key, value) -> {
+              String path = null;
+              RMeasurementMNode deletedNode;
+              try {
+                path = RSchemaUtils.getPathByInnerName(new String(key));
+                String[] nodes = MetaUtils.splitPathToDetachedPath(path);
+                deletedNode = new RMeasurementMNode(path, value, readWriteHandler);
+                atomicInteger.incrementAndGet();
+                try (WriteBatch batch = new WriteBatch()) {
+                  // delete the last node of path
+                  batch.delete(key);
+                  if (deletedNode.getAlias() != null) {
+                    String[] aliasNodes = Arrays.copyOf(nodes, nodes.length);
+                    aliasNodes[nodes.length - 1] = deletedNode.getAlias();
+                    String aliasLevelPath =
+                        RSchemaUtils.getLevelPath(aliasNodes, aliasNodes.length - 1);
+                    batch.delete(RSchemaUtils.toAliasNodeKey(aliasLevelPath));
+                  }
+                  if (deletedNode.getTags() != null && !deletedNode.getTags().isEmpty()) {
+                    batch.delete(
+                        readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS), key);
+                    // TODO: tags invert index update
+                  }
+                  readWriteHandler.executeBatch(batch);
+                  if (!deletedNode.getParent().isStorageGroup()) {
+                    parentNeedsToCheck.add(deletedNode.getParent());
+                  }
+                }
+              } catch (Exception e) {
+                logger.error("delete timeseries [{}] fail", path, e);
+                failedNames.add(path);
+                return false;
+              }
+              return true;
+            },
+            new Character[] {NODE_TYPE_MEASUREMENT});
+
+        // after the timeseries are deleted, remove ancestor nodes left without children
+        while (true) {
+          if (parentNeedsToCheck.isEmpty()) {
+            break;
+          }
+          Set<IMNode> tempSet = ConcurrentHashMap.newKeySet();
+
+          parentNeedsToCheck
+              .parallelStream()
+              .forEach(
+                  currentNode -> {
+                    if (!currentNode.isStorageGroup()) {
+                      PartialPath parentPath = currentNode.getPartialPath();
+                      int level = parentPath.getNodeLength();
+                      int end = parentPath.getNodeLength() - 1;
+                      if (!readWriteHandler.existAnySiblings(
+                          RSchemaUtils.getLevelPathPrefix(parentPath.getNodes(), end, level))) {
+                        try {
+                          readWriteHandler.deleteNode(
+                              parentPath.getNodes(), RSchemaUtils.typeOfMNode(currentNode));
+                          IMNode parentNode = currentNode.getParent();
+                          if (!parentNode.isStorageGroup()) {
+                            tempSet.add(currentNode.getParent());
+                          }
+                        } catch (Exception e) {
+                          logger.warn("delete {} fail.", parentPath.getFullPath(), e);
+                        }
+                      }
+                    }
+                  });
+          parentNeedsToCheck.clear();
+          parentNeedsToCheck.addAll(tempSet);
+        }
+        return new Pair<>(atomicInteger.get(), failedNames);
+      } else {
+        throw new AcquireLockTimeoutException(
+            "acquire lock timeout when delete timeseries: " + pathPattern);
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+      return new Pair<>(0, null);
+    } finally {
+      // only release the lock if it was actually acquired
+      if (locked) {
+        deleteUpdateLock.writeLock().unlock();
+      }
+    }
+  }
+
+  private void traverseOutcomeBasins(
+      String[] nodes,
+      int maxLevel,
+      BiFunction<byte[], byte[], Boolean> function,
+      Character[] nodeTypeArray)
+      throws IllegalPathException {
+    List<String[]> allNodesArray = RSchemaUtils.replaceMultiWildcardToSingle(nodes, maxLevel);
+    allNodesArray.parallelStream().forEach(x -> traverseByPatternPath(x, function, nodeTypeArray));
+  }
+
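+  // scan strategy: split the pattern into alternating concrete and wildcard runs; each
+  // concrete run becomes a RocksDB prefix seek plus a suffix match, and every hit seeds
+  // the next run, until the final run applies `function` to the matched entries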
+  private void traverseByPatternPath(
+      String[] nodes, BiFunction<byte[], byte[], Boolean> function, Character[] nodeTypeArray) {
+
+    int startIndex = 0;
+    List<String[]> scanKeys = new ArrayList<>();
+
+    int indexOfPrefix = indexOfFirstWildcard(nodes, startIndex);
+    if (indexOfPrefix >= nodes.length) {
+      Arrays.stream(nodeTypeArray)
+          .parallel()
+          .forEach(
+              x -> {
+                String levelPrefix =
+                    RSchemaUtils.convertPartialPathToInnerByNodes(nodes, nodes.length - 1, x);
+                try {
+                  Holder<byte[]> holder = new Holder<>();
+                  readWriteHandler.keyExist(levelPrefix.getBytes(), holder);
+                  if (holder.getValue() != null) {
+                    function.apply(levelPrefix.getBytes(), holder.getValue());
+                  }
+                } catch (RocksDBException e) {
+                  logger.error(e.getMessage());
+                }
+              });
+      return;
+    }
+
+    startIndex = indexOfPrefix;
+    String[] seedPath = ArrayUtils.subarray(nodes, 0, indexOfPrefix);
+    scanKeys.add(seedPath);
+
+    while (!scanKeys.isEmpty()) {
+      int firstNonWildcardIndex = indexOfFirstNonWildcard(nodes, startIndex);
+      int nextFirstWildcardIndex = indexOfFirstWildcard(nodes, firstNonWildcardIndex);
+      startIndex = nextFirstWildcardIndex;
+      int level = nextFirstWildcardIndex - 1;
+
+      boolean lastIteration = nextFirstWildcardIndex >= nodes.length;
+      Character[] nodeType;
+      if (!lastIteration) {
+        nodeType = ALL_NODE_TYPE_ARRAY;
+      } else {
+        nodeType = nodeTypeArray;
+      }
+
+      Queue<String[]> tempNodes = new ConcurrentLinkedQueue<>();
+      byte[] suffixToMatch =
+          RSchemaUtils.getSuffixOfLevelPath(
+              ArrayUtils.subarray(nodes, firstNonWildcardIndex, nextFirstWildcardIndex), level);
+
+      scanKeys
+          .parallelStream()
+          .forEach(
+              prefixNodes -> {
+                String levelPrefix =
+                    RSchemaUtils.getLevelPathPrefix(prefixNodes, prefixNodes.length - 1, level);
+                Arrays.stream(nodeType)
+                    .parallel()
+                    .forEach(
+                        x -> {
+                          byte[] startKey = RSchemaUtils.toRocksDBKey(levelPrefix, x);
+                          // RocksIterator wraps a native handle, so close it with
+                          // try-with-resources instead of leaking it per node type
+                          try (RocksIterator iterator = readWriteHandler.iterator(null)) {
+                            iterator.seek(startKey);
+                            while (iterator.isValid()) {
+                              if (!RSchemaUtils.prefixMatch(iterator.key(), startKey)) {
+                                break;
+                              }
+                              if (RSchemaUtils.suffixMatch(iterator.key(), suffixToMatch)) {
+                                if (lastIteration) {
+                                  function.apply(iterator.key(), iterator.value());
+                                } else {
+                                  tempNodes.add(RSchemaUtils.toMetaNodes(iterator.key()));
+                                }
+                              }
+                              iterator.next();
+                            }
+                          }
+                        });
+              });
+      scanKeys.clear();
+      scanKeys.addAll(tempNodes);
+      tempNodes.clear();
+    }
+  }
+
+  private int indexOfFirstWildcard(String[] nodes, int start) {
+    int index = start;
+    for (; index < nodes.length; index++) {
+      if (ONE_LEVEL_PATH_WILDCARD.equals(nodes[index])
+          || MULTI_LEVEL_PATH_WILDCARD.equals(nodes[index])) {
+        break;
+      }
+    }
+    return index;
+  }
+
+  private int indexOfFirstNonWildcard(String[] nodes, int start) {
+    int index = start;
+    for (; index < nodes.length; index++) {
+      if (!ONE_LEVEL_PATH_WILDCARD.equals(nodes[index])
+          && !MULTI_LEVEL_PATH_WILDCARD.equals(nodes[index])) {
+        break;
+      }
+    }
+    return index;
+  }
+
+  protected Pair<Integer, Set<String>> deleteTimeseries(PartialPath pathPattern)
+      throws MetadataException {
+    return deleteTimeseries(pathPattern, false);
+  }
+
+  private IMNode getDeviceNodeWithAutoCreate(PartialPath devicePath, boolean autoCreateSchema)
+      throws MetadataException {
+    IMNode node = null;
+    try {
+      node = getDeviceNode(devicePath);
+      return node;
+    } catch (PathNotExistException e) {
+      if (!config.isAutoCreateSchemaEnabled()) {
+        throw new PathNotExistException(devicePath.getFullPath());
+      }
+      try {
+        createEntityRecursively(
+            devicePath.getNodes(), devicePath.getNodeLength(), storageGroupPathLevel, false);
+        node = getDeviceNode(devicePath);
+      } catch (RocksDBException ex) {
+        throw new MetadataException(ex);
+      } catch (InterruptedException ex) {
+        logger.warn("Acquire lock interrupted", ex);
+        Thread.currentThread().interrupt();
+      }
+    }
+    return node;
+  }
+
+  @Override
+  public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isPathExist(PartialPath path) throws MetadataException {
+    if (PATH_ROOT.equals(path.getFullPath())) {
+      return true;
+    }
+
+    String innerPathName = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+    try {
+      CheckKeyResult checkKeyResult =
+          readWriteHandler.keyExistByTypes(innerPathName, RMNodeType.values());
+      if (checkKeyResult.existAnyKey()) {
+        return true;
+      }
+    } catch (RocksDBException e) {
+      throw new MetadataException(e);
+    }
+    return false;
+  }
+
+  @Override
+  public int getAllTimeseriesCount(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException {
+    return getCountByNodeType(new Character[] {NODE_TYPE_MEASUREMENT}, pathPattern.getNodes());
+  }
+
+  @TestOnly
+  public int getAllTimeseriesCount(PartialPath pathPattern) throws MetadataException {
+    return getAllTimeseriesCount(pathPattern, false);
+  }
+
+  @Override
+  public int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException {
+    return getCountByNodeType(new Character[] {NODE_TYPE_ENTITY}, pathPattern.getNodes());
+  }
+
+  private int getCountByNodeType(Character[] nodetype, String[] nodes) throws IllegalPathException {
+    AtomicInteger atomicInteger = new AtomicInteger(0);
+    BiFunction<byte[], byte[], Boolean> function =
+        (a, b) -> {
+          atomicInteger.incrementAndGet();
+          return true;
+        };
+    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, nodetype);
+    return atomicInteger.get();
+  }
+
+  @Override
+  public int getNodesCountInGivenLevel(PartialPath pathPattern, int level, boolean isPrefixMatch)
+      throws MetadataException {
+    // TODO: support wildcard
+    if (pathPattern.getFullPath().contains(ONE_LEVEL_PATH_WILDCARD)) {
+      throw new MetadataException(
+          "Wildcards are not currently supported for this operation"
+              + " [COUNT NODES pathPattern].");
+    }
+    String innerNameByLevel =
+        RSchemaUtils.getLevelPath(pathPattern.getNodes(), pathPattern.getNodeLength() - 1, level);
+    AtomicInteger atomicInteger = new AtomicInteger(0);
+    Function<String, Boolean> function =
+        s -> {
+          atomicInteger.incrementAndGet();
+          return true;
+        };
+    Arrays.stream(ALL_NODE_TYPE_ARRAY)
+        .parallel()
+        .forEach(
+            x -> {
+              String getKeyByInnerNameLevel =
+                  x + innerNameByLevel + RSchemaConstants.PATH_SEPARATOR + level;
+              readWriteHandler.getKeyByPrefix(getKeyByInnerNameLevel, function);
+            });
+
+    return atomicInteger.get();
+  }
+
+  @Override
+  public Map<PartialPath, Integer> getMeasurementCountGroupByLevel(
+      PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<PartialPath> getNodesListInGivenLevel(
+      PartialPath pathPattern, int nodeLevel, boolean isPrefixMatch, StorageGroupFilter filter)
+      throws MetadataException {
+    return getNodesListInGivenLevel(pathPattern, nodeLevel);
+  }
+
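+  // For shallow levels a single prefix scan on "root.<level>" is enough; for deeper levels,
+  // scan the parent level first, then expand each parent prefix one level down in parallel.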
+  public List<PartialPath> getNodesListInGivenLevel(PartialPath pathPattern, int nodeLevel)
+      throws MetadataException {
+    // TODO: wildcards in pathPattern are ignored; all node levels are resolved from "root.*"
+    List<PartialPath> results = new ArrayList<>();
+    if (nodeLevel == 0) {
+      results.add(new PartialPath(RSchemaConstants.ROOT));
+      return results;
+    }
+    // TODO: level one usually contains only a few nodes, so it is queried serially
+    Set<String> paths;
+    StringBuilder builder = new StringBuilder();
+    if (nodeLevel <= 5) {
+      char level = (char) (ZERO + nodeLevel);
+      String prefix =
+          builder.append(RSchemaConstants.ROOT).append(PATH_SEPARATOR).append(level).toString();
+      paths = readWriteHandler.getAllByPrefix(prefix);
+    } else {
+      paths = ConcurrentHashMap.newKeySet();
+      char upperLevel = (char) (ZERO + nodeLevel - 1);
+      String prefix =
+          builder
+              .append(RSchemaConstants.ROOT)
+              .append(PATH_SEPARATOR)
+              .append(upperLevel)
+              .toString();
+      Set<String> parentPaths = readWriteHandler.getAllByPrefix(prefix);
+      parentPaths
+          .parallelStream()
+          .forEach(
+              x -> {
+                String targetPrefix = RSchemaUtils.getNextLevelOfPath(x, upperLevel);
+                paths.addAll(readWriteHandler.getAllByPrefix(targetPrefix));
+              });
+    }
+    return RSchemaUtils.convertToPartialPath(paths, nodeLevel);
+  }
+
+  @Override
+  public Set<String> getChildNodePathInNextLevel(PartialPath pathPattern) throws MetadataException {
+    // TODO: support wildcards
+    if (pathPattern.getFullPath().contains(ONE_LEVEL_PATH_WILDCARD)) {
+      throw new MetadataException(
+          "Wildcards are not currently supported for this operation"
+              + " [SHOW CHILD PATHS pathPattern].");
+    }
+    Set<String> result = Collections.synchronizedSet(new HashSet<>());
+    String innerNameByLevel =
+        RSchemaUtils.getLevelPath(
+                pathPattern.getNodes(),
+                pathPattern.getNodeLength() - 1,
+                pathPattern.getNodeLength())
+            + RSchemaConstants.PATH_SEPARATOR
+            + pathPattern.getNodeLength();
+    Function<String, Boolean> function =
+        s -> {
+          result.add(RSchemaUtils.getPathByInnerName(s));
+          return true;
+        };
+
+    Arrays.stream(ALL_NODE_TYPE_ARRAY)
+        .parallel()
+        .forEach(x -> readWriteHandler.getKeyByPrefix(x + innerNameByLevel, function));
+    return result;
+  }
+
+  @Override
+  public Set<String> getChildNodeNameInNextLevel(PartialPath pathPattern) throws MetadataException {
+    Set<String> childPath = getChildNodePathInNextLevel(pathPattern);
+    Set<String> childName = new HashSet<>();
+    for (String str : childPath) {
+      childName.add(str.substring(str.lastIndexOf(RSchemaConstants.PATH_SEPARATOR) + 1));
+    }
+    return childName;
+  }
+
+  @Override
+  public Set<PartialPath> getBelongedDevices(PartialPath timeseries) throws MetadataException {
+    Set<PartialPath> result = Collections.synchronizedSet(new HashSet<>());
+    BiFunction<byte[], byte[], Boolean> function =
+        (a, b) -> {
+          String path = new String(a);
+          PartialPath partialPath;
+          try {
+            partialPath =
+                new PartialPath(path.substring(0, path.lastIndexOf(IoTDBConstant.PATH_SEPARATOR)));
+          } catch (IllegalPathException e) {
+            return false;
+          }
+          result.add(partialPath);
+          return true;
+        };
+    traverseOutcomeBasins(
+        timeseries.getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_ENTITY});
+    return result;
+  }
+
+  @Override
+  public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException {
+    Set<PartialPath> allPath = new HashSet<>();
+    getMatchedPathByNodeType(pathPattern.getNodes(), new Character[] {NODE_TYPE_ENTITY}, allPath);
+    return allPath;
+  }
+
+  private void getMatchedPathByNodeType(
+      String[] nodes, Character[] nodeType, Collection<PartialPath> collection)
+      throws IllegalPathException {
+    List<String> allResult = Collections.synchronizedList(new ArrayList<>());
+    BiFunction<byte[], byte[], Boolean> function =
+        (a, b) -> {
+          allResult.add(RSchemaUtils.getPathByInnerName(new String(a)));
+          return true;
+        };
+    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, nodeType);
+
+    for (String path : allResult) {
+      collection.add(new PartialPath(path));
+    }
+  }
+
+  @Override
+  public Pair<List<ShowDevicesResult>, Integer> getMatchedDevices(ShowDevicesPlan plan)
+      throws MetadataException {
+    List<ShowDevicesResult> res = Collections.synchronizedList(new ArrayList<>());
+    BiFunction<byte[], byte[], Boolean> function =
+        (a, b) -> {
+          String fullPath = RSchemaUtils.getPathByInnerName(new String(a));
+          res.add(new ShowDevicesResult(fullPath, RSchemaUtils.isAligned(b), storageGroupFullPath));
+          return true;
+        };
+    traverseOutcomeBasins(
+        plan.getPath().getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_ENTITY});
+
+    // TODO: page the query and record the offset
+    return new Pair<>(res, 1);
+  }
+
+  @Override
+  public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException {
+    List<MeasurementPath> allResult = Collections.synchronizedList(new ArrayList<>());
+    BiFunction<byte[], byte[], Boolean> function =
+        (a, b) -> {
+          allResult.add(
+              new RMeasurementMNode(
+                      RSchemaUtils.getPathByInnerName(new String(a)), b, readWriteHandler)
+                  .getMeasurementPath());
+          return true;
+        };
+    traverseOutcomeBasins(
+        pathPattern.getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
+
+    return allResult;
+  }
+
+  @Override
+  public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
+      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+      throws MetadataException {
+    // TODO: page the query
+    return new Pair<>(getMeasurementPaths(pathPattern, false), offset + limit);
+  }
+
+  @Override
+  public Pair<List<ShowTimeSeriesResult>, Integer> showTimeseries(
+      ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
+    if (plan.getKey() != null && plan.getValue() != null) {
+      return showTimeseriesWithIndex(plan, context);
+    } else {
+      return showTimeseriesWithoutIndex(plan, context);
+    }
+  }
+
+  private Pair<List<ShowTimeSeriesResult>, Integer> showTimeseriesWithIndex(
+      ShowTimeSeriesPlan plan, QueryContext context) {
+    // temporarily unsupported
+    throw new UnsupportedOperationException("temporarily unsupported: showTimeseriesWithIndex");
+  }
+
+  private Pair<List<ShowTimeSeriesResult>, Integer> showTimeseriesWithoutIndex(
+      ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
+
+    List<ShowTimeSeriesResult> res = new LinkedList<>();
+    Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> measurementPathsAndTags =
+        getMatchedMeasurementPathWithTags(plan.getPath().getNodes());
+    for (Entry<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> entry :
+        measurementPathsAndTags.entrySet()) {
+      MeasurementPath measurementPath = entry.getKey();
+      res.add(
+          new ShowTimeSeriesResult(
+              measurementPath.getFullPath(),
+              measurementPath.getMeasurementAlias(),
+              storageGroupFullPath,
+              measurementPath.getMeasurementSchema().getType(),
+              measurementPath.getMeasurementSchema().getEncodingType(),
+              measurementPath.getMeasurementSchema().getCompressor(),
+              0,
+              entry.getValue().left,
+              entry.getValue().right));
+    }
+    // TODO: page the query and record the offset
+    return new Pair<>(res, 1);
+  }
+
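+  // Decodes tags and attributes straight from the serialized measurement value; blocks that
+  // are absent (or not maps) default to empty maps so callers never see null.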
+  private Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>>
+      getMatchedMeasurementPathWithTags(String[] nodes) throws IllegalPathException {
+    Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> allResult =
+        new ConcurrentHashMap<>();
+    BiFunction<byte[], byte[], Boolean> function =
+        (a, b) -> {
+          MeasurementPath measurementPath =
+              new RMeasurementMNode(
+                      RSchemaUtils.getPathByInnerName(new String(a)), b, readWriteHandler)
+                  .getMeasurementPath();
+          Object tag = RSchemaUtils.parseNodeValue(b, RMNodeValueType.TAGS);
+          if (!(tag instanceof Map)) {
+            tag = Collections.emptyMap();
+          }
+          Object attributes = RSchemaUtils.parseNodeValue(b, RMNodeValueType.ATTRIBUTES);
+          if (!(attributes instanceof Map)) {
+            attributes = Collections.emptyMap();
+          }
+          @SuppressWarnings("unchecked")
+          Pair<Map<String, String>, Map<String, String>> tagsAndAttributes =
+              new Pair<>((Map<String, String>) tag, (Map<String, String>) attributes);
+          allResult.put(measurementPath, tagsAndAttributes);
+          return true;
+        };
+    traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_MEASUREMENT});
+
+    return allResult;
+  }
+
+  @Override
+  public List<MeasurementPath> getAllMeasurementByDevicePath(PartialPath devicePath)
+      throws PathNotExistException {
+    List<MeasurementPath> result = new ArrayList<>();
+    String nextLevelPathName =
+        RSchemaUtils.convertPartialPathToInner(
+            devicePath.getFullPath(), devicePath.getNodeLength() + 1, NODE_TYPE_MEASUREMENT);
+    Map<byte[], byte[]> allMeasurementPath =
+        readWriteHandler.getKeyValueByPrefix(nextLevelPathName);
+    for (Map.Entry<byte[], byte[]> entry : allMeasurementPath.entrySet()) {
+      PartialPath pathName;
+      try {
+        pathName = new PartialPath(new String(entry.getKey()));
+      } catch (IllegalPathException e) {
+        throw new PathNotExistException(e.getMessage());
+      }
+      MeasurementSchema measurementSchema =
+          (MeasurementSchema) RSchemaUtils.parseNodeValue(entry.getValue(), RMNodeValueType.SCHEMA);
+      result.add(new MeasurementPath(pathName, measurementSchema));
+    }
+    return result;
+  }
+
+  @Override
+  public IMNode getDeviceNode(PartialPath path) throws MetadataException {
+    String[] nodes = path.getNodes();
+    String levelPath = RSchemaUtils.getLevelPath(nodes, nodes.length - 1);
+    Holder<byte[]> holder = new Holder<>();
+    try {
+      if (readWriteHandler.keyExistByType(levelPath, RMNodeType.ENTITY, holder)) {
+        return new REntityMNode(path.getFullPath(), holder.getValue(), readWriteHandler);
+      } else {
+        throw new PathNotExistException(path.getFullPath());
+      }
+    } catch (RocksDBException e) {
+      throw new MetadataException(e);
+    }
+  }
+
+  @Override
+  public IMeasurementMNode[] getMeasurementMNodes(PartialPath deviceId, String[] measurements)
+      throws MetadataException {
+    IMeasurementMNode[] mNodes = new IMeasurementMNode[measurements.length];
+    for (int i = 0; i < mNodes.length; i++) {
+      try {
+        mNodes[i] = getMeasurementMNode(deviceId.concatNode(measurements[i]));
+      } catch (PathNotExistException | MNodeTypeMismatchException ignored) {
+        logger.warn("MeasurementMNode {} does not exist in {}", measurements[i], deviceId);
+      }
+      if (mNodes[i] == null && !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+        throw new MetadataException(measurements[i] + " does not exist in " + deviceId);
+      }
+    }
+    return mNodes;
+  }
+
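+  // Resolves a measurement either by its own key or, failing that, by its alias key: the
+  // alias value embeds the origin measurement key, which is then looked up directly.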
+  @Override
+  public IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException {
+    String[] nodes = fullPath.getNodes();
+    String key = RSchemaUtils.getLevelPath(nodes, nodes.length - 1);
+    IMeasurementMNode node = null;
+    try {
+      Holder<byte[]> holder = new Holder<>();
+      if (readWriteHandler.keyExistByType(key, RMNodeType.MEASUREMENT, holder)) {
+        node = new RMeasurementMNode(fullPath.getFullPath(), holder.getValue(), readWriteHandler);
+      } else if (readWriteHandler.keyExistByType(key, RMNodeType.ALISA, holder)) {
+        byte[] aliasValue = holder.getValue();
+        if (aliasValue != null) {
+          ByteBuffer byteBuffer = ByteBuffer.wrap(aliasValue);
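+          // skip the 3-byte alias-value header (data version, flag, block type)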
+          ReadWriteIOUtils.readBytes(byteBuffer, 3);
+          byte[] oriKey = RSchemaUtils.readOriginKey(byteBuffer);
+          node =
+              new RMeasurementMNode(
+                  fullPath.getFullPath(), readWriteHandler.get(null, oriKey), readWriteHandler);
+        }
+      }
+      return node;
+    } catch (RocksDBException e) {
+      throw new MetadataException(e);
+    }
+  }
+
+  @Override
+  public void changeAlias(PartialPath path, String alias) throws MetadataException, IOException {
+    upsertTagsAndAttributes(alias, null, null, path);
+  }
+
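+  // Under the region's read lock plus a per-path lock: an alias change atomically writes the
+  // new alias key and deletes the old one in a single WriteBatch, then tags and attributes
+  // are merged into the node value and rewritten.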
+  @Override
+  public void upsertTagsAndAttributes(
+      String alias,
+      Map<String, String> tagsMap,
+      Map<String, String> attributesMap,
+      PartialPath path)
+      throws MetadataException, IOException {
+    boolean readLockHeld = false;
+    try {
+      readLockHeld =
+          deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+      if (readLockHeld) {
+        String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+        byte[] originKey = RSchemaUtils.toMeasurementNodeKey(levelPath);
+        try {
+          Lock rawKeyLock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+          if (rawKeyLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+            try {
+              boolean hasUpdate = false;
+              String[] nodes = path.getNodes();
+              RMeasurementMNode mNode = (RMeasurementMNode) getMeasurementMNode(path);
+              // upsert alias
+              if (StringUtils.isNotEmpty(alias)
+                  && (StringUtils.isEmpty(mNode.getAlias()) || !mNode.getAlias().equals(alias))) {
+                String oldAliasStr = mNode.getAlias();
+                mNode.setAlias(alias);
+                hasUpdate = true;
+                String[] newAlias = Arrays.copyOf(nodes, nodes.length);
+                newAlias[nodes.length - 1] = alias;
+                String newAliasLevel = RSchemaUtils.getLevelPath(newAlias, newAlias.length - 1);
+                byte[] newAliasKey = RSchemaUtils.toAliasNodeKey(newAliasLevel);
+                Lock newAliasLock =
+                    locksPool.computeIfAbsent(newAliasLevel, x -> new ReentrantLock());
+                Lock oldAliasLock = null;
+                boolean newAliasLockHeld = false;
+                boolean oldAliasLockHeld = false;
+                try (WriteBatch batch = new WriteBatch()) {
+                  newAliasLockHeld =
+                      newAliasLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+                  if (newAliasLockHeld) {
+                    if (readWriteHandler.keyExistByAllTypes(newAliasLevel).existAnyKey()) {
+                      throw new PathAlreadyExistException(
+                          "Alias node already exists: " + newAliasLevel);
+                    }
+                    batch.put(newAliasKey, RSchemaUtils.buildAliasNodeValue(originKey));
+                    if (StringUtils.isNotEmpty(oldAliasStr)) {
+                      String[] oldAliasNodes = Arrays.copyOf(nodes, nodes.length);
+                      oldAliasNodes[nodes.length - 1] = oldAliasStr;
+                      String oldAliasLevel =
+                          RSchemaUtils.getLevelPath(oldAliasNodes, oldAliasNodes.length - 1);
+                      byte[] oldAliasKey = RSchemaUtils.toAliasNodeKey(oldAliasLevel);
+                      oldAliasLock =
+                          locksPool.computeIfAbsent(oldAliasLevel, x -> new ReentrantLock());
+                      oldAliasLockHeld =
+                          oldAliasLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+                      if (oldAliasLockHeld) {
+                        if (!readWriteHandler.keyExist(oldAliasKey)) {
+                          logger.error(
+                              "origin node [{}] has an alias but the alias node [{}] doesn't exist",
+                              levelPath,
+                              oldAliasLevel);
+                        }
+                        batch.delete(oldAliasKey);
+                      } else {
+                        throw new AcquireLockTimeoutException(
+                            "acquire lock timeout: " + oldAliasLevel);
+                      }
+                    }
+                  } else {
+                    throw new AcquireLockTimeoutException("acquire lock timeout: " + newAliasLevel);
+                  }
+                  // TODO: need application lock
+                  readWriteHandler.executeBatch(batch);
+                } finally {
+                  // only unlock locks this thread actually acquired; unlocking an unheld
+                  // ReentrantLock throws IllegalMonitorStateException
+                  if (newAliasLockHeld) {
+                    newAliasLock.unlock();
+                  }
+                  if (oldAliasLock != null && oldAliasLockHeld) {
+                    oldAliasLock.unlock();
+                  }
+                }
+              }
+
+              try (WriteBatch batch = new WriteBatch()) {
+                if (tagsMap != null && !tagsMap.isEmpty()) {
+                  if (mNode.getTags() == null) {
+                    mNode.setTags(tagsMap);
+                  } else {
+                    mNode.getTags().putAll(tagsMap);
+                  }
+                  batch.put(
+                      readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS),
+                      originKey,
+                      DEFAULT_NODE_VALUE);
+                  hasUpdate = true;
+                }
+                if (attributesMap != null && !attributesMap.isEmpty()) {
+                  if (mNode.getAttributes() == null) {
+                    mNode.setAttributes(attributesMap);
+                  } else {
+                    mNode.getAttributes().putAll(attributesMap);
+                  }
+                  hasUpdate = true;
+                }
+                if (hasUpdate) {
+                  batch.put(originKey, mNode.getRocksDBValue());
+                  readWriteHandler.executeBatch(batch);
+                }
+              }
+
+            } finally {
+              rawKeyLock.unlock();
+            }
+          } else {
+            throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+          }
+        } catch (RocksDBException e) {
+          throw new MetadataException(e);
+        }
+      } else {
+        throw new AcquireLockTimeoutException(
+            "acquire lock timeout when do upsertTagsAndAttributes: " + path.getFullPath());
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      if (readLockHeld) {
+        deleteUpdateLock.readLock().unlock();
+      }
+    }
+  }
+
+  @Override
+  public void addAttributes(Map<String, String> attributesMap, PartialPath path)
+      throws MetadataException, IOException {
+    if (attributesMap == null || attributesMap.isEmpty()) {
+      return;
+    }
+
+    boolean readLockHeld = false;
+    try {
+      readLockHeld =
+          deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+      if (readLockHeld) {
+        String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+        byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
+        Holder<byte[]> holder = new Holder<>();
+        try {
+          Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+          if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+            try {
+              if (!readWriteHandler.keyExist(key, holder)) {
+                throw new PathNotExistException(path.getFullPath());
+              }
+
+              byte[] originValue = holder.getValue();
+              RMeasurementMNode mNode =
+                  new RMeasurementMNode(path.getFullPath(), originValue, readWriteHandler);
+              if (mNode.getAttributes() != null) {
+                for (Map.Entry<String, String> entry : attributesMap.entrySet()) {
+                  if (mNode.getAttributes().containsKey(entry.getKey())) {
+                    throw new MetadataException(
+                        String.format(
+                            "TimeSeries [%s] already has the attribute [%s].",
+                            path, entry.getKey()));
+                  }
+                }
+                attributesMap.putAll(mNode.getAttributes());
+              }
+              mNode.setAttributes(attributesMap);
+              readWriteHandler.updateNode(key, mNode.getRocksDBValue());
+            } finally {
+              lock.unlock();
+            }
+          } else {
+            throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+          }
+        } catch (RocksDBException e) {
+          throw new MetadataException(e);
+        }
+      } else {
+        throw new AcquireLockTimeoutException(
+            "acquire lock timeout when do addAttributes: " + path.getFullPath());
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      if (readLockHeld) {
+        deleteUpdateLock.readLock().unlock();
+      }
+    }
+  }
+
+  @Override
+  public void addTags(Map<String, String> tagsMap, PartialPath path)
+      throws MetadataException, IOException {
+    if (tagsMap == null || tagsMap.isEmpty()) {
+      return;
+    }
+
+    boolean readLockHeld = false;
+    try {
+      readLockHeld =
+          deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+      if (readLockHeld) {
+        String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+        byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
+        Holder<byte[]> holder = new Holder<>();
+        try {
+          Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+          if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+            try {
+              if (!readWriteHandler.keyExist(key, holder)) {
+                throw new PathNotExistException(path.getFullPath());
+              }
+              byte[] originValue = holder.getValue();
+              RMeasurementMNode mNode =
+                  new RMeasurementMNode(path.getFullPath(), originValue, readWriteHandler);
+              boolean hasTags = false;
+              if (mNode.getTags() != null && mNode.getTags().size() > 0) {
+                hasTags = true;
+                for (Map.Entry<String, String> entry : tagsMap.entrySet()) {
+                  if (mNode.getTags().containsKey(entry.getKey())) {
+                    throw new MetadataException(
+                        String.format(
+                            "TimeSeries [%s] already has the tag [%s].", path, entry.getKey()));
+                  }
+                }
+                // merge existing tags after the check, outside the loop, to avoid modifying
+                // tagsMap while iterating over its entry set
+                tagsMap.putAll(mNode.getTags());
+              }
+              mNode.setTags(tagsMap);
+              try (WriteBatch batch = new WriteBatch()) {
+                if (!hasTags) {
+                  batch.put(
+                      readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS),
+                      key,
+                      DEFAULT_NODE_VALUE);
+                }
+                batch.put(key, mNode.getRocksDBValue());
+                // TODO: need application lock
+                readWriteHandler.executeBatch(batch);
+              }
+            } finally {
+              lock.unlock();
+            }
+          } else {
+            throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+          }
+        } catch (RocksDBException e) {
+          throw new MetadataException(e);
+        }
+      } else {
+        throw new AcquireLockTimeoutException(
+            "acquire lock timeout when do addTags: " + path.getFullPath());
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      if (readLockHeld) {
+        deleteUpdateLock.readLock().unlock();
+      }
+    }
+  }
+
+  @Override
+  public void dropTagsOrAttributes(Set<String> keySet, PartialPath path)
+      throws MetadataException, IOException {
+    if (keySet == null || keySet.isEmpty()) {
+      return;
+    }
+    boolean readLockHeld = false;
+    try {
+      readLockHeld =
+          deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+      if (readLockHeld) {
+        String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+        byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
+        Holder<byte[]> holder = new Holder<>();
+        try {
+          Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+          if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+            try {
+              if (!readWriteHandler.keyExist(key, holder)) {
+                throw new PathNotExistException(path.getFullPath());
+              }
+
+              byte[] originValue = holder.getValue();
+              RMeasurementMNode mNode =
+                  new RMeasurementMNode(path.getFullPath(), originValue, readWriteHandler);
+              int tagLen = mNode.getTags() == null ? 0 : mNode.getTags().size();
+
+              boolean didAnyUpdate = false;
+              for (String toDelete : keySet) {
+                boolean removedCurrentKey = false;
+                if (mNode.getTags() != null && mNode.getTags().containsKey(toDelete)) {
+                  mNode.getTags().remove(toDelete);
+                  removedCurrentKey = true;
+                }
+                if (mNode.getAttributes() != null && mNode.getAttributes().containsKey(toDelete)) {
+                  mNode.getAttributes().remove(toDelete);
+                  removedCurrentKey = true;
+                }
+                if (!removedCurrentKey) {
+                  logger.warn("TimeSeries [{}] does not have tag/attribute [{}]", path, toDelete);
+                }
+                // accumulate across keys so one unmatched key does not discard earlier removals
+                didAnyUpdate = didAnyUpdate || removedCurrentKey;
+              }
+              if (didAnyUpdate) {
+                try (WriteBatch batch = new WriteBatch()) {
+                  if (tagLen > 0 && mNode.getTags().isEmpty()) {
+                    batch.delete(
+                        readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS), key);
+                  }
+                  batch.put(key, mNode.getRocksDBValue());
+                  readWriteHandler.executeBatch(batch);
+                }
+              }
+            } finally {
+              lock.unlock();
+            }
+          } else {
+            throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+          }
+        } catch (RocksDBException e) {
+          throw new MetadataException(e);
+        }
+      } else {
+        throw new AcquireLockTimeoutException(
+            "acquire lock timeout when do dropTagsOrAttributes: " + path.getFullPath());
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      if (readLockHeld) {
+        deleteUpdateLock.readLock().unlock();
+      }
+    }
+  }
+
+  @Override
+  public void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath path)
+      throws MetadataException, IOException {
+    if (alterMap == null || alterMap.isEmpty()) {
+      return;
+    }
+
+    boolean readLockHeld = false;
+    try {
+      readLockHeld =
+          deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+      if (readLockHeld) {
+        String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+        byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
+        Holder<byte[]> holder = new Holder<>();
+        try {
+          Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+          if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+            try {
+              if (!readWriteHandler.keyExist(key, holder)) {
+                throw new PathNotExistException(path.getFullPath());
+              }
+              byte[] originValue = holder.getValue();
+              RMeasurementMNode mNode =
+                  new RMeasurementMNode(path.getFullPath(), originValue, readWriteHandler);
+              boolean didAnyUpdate = false;
+              for (Map.Entry<String, String> entry : alterMap.entrySet()) {
+                boolean updatedCurrentKey = false;
+                if (mNode.getTags() != null && mNode.getTags().containsKey(entry.getKey())) {
+                  mNode.getTags().put(entry.getKey(), entry.getValue());
+                  updatedCurrentKey = true;
+                }
+                if (mNode.getAttributes() != null
+                    && mNode.getAttributes().containsKey(entry.getKey())) {
+                  mNode.getAttributes().put(entry.getKey(), entry.getValue());
+                  updatedCurrentKey = true;
+                }
+                if (!updatedCurrentKey) {
+                  throw new MetadataException(
+                      String.format(
+                          "TimeSeries [%s] does not have tag/attribute [%s].",
+                          path, entry.getKey()),
+                      true);
+                }
+                didAnyUpdate = true;
+              }
+              if (didAnyUpdate) {
+                readWriteHandler.updateNode(key, mNode.getRocksDBValue());
+              }
+            } finally {
+              lock.unlock();
+            }
+          } else {
+            throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+          }
+        } catch (RocksDBException e) {
+          throw new MetadataException(e);
+        }
+      } else {
+        throw new AcquireLockTimeoutException(
+            "acquire lock timeout when do setTagsOrAttributesValue: " + path.getFullPath());
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      if (readLockHeld) {
+        deleteUpdateLock.readLock().unlock();
+      }
+    }
+  }
+
+  @Override
+  public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath path)
+      throws MetadataException, IOException {
+    if (StringUtils.isEmpty(oldKey) || StringUtils.isEmpty(newKey)) {
+      return;
+    }
+
+    boolean readLockHeld = false;
+    try {
+      readLockHeld =
+          deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+      if (readLockHeld) {
+        String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+        byte[] nodeKey = RSchemaUtils.toMeasurementNodeKey(levelPath);
+        Holder<byte[]> holder = new Holder<>();
+        try {
+          Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+          if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+            try {
+              if (!readWriteHandler.keyExist(nodeKey, holder)) {
+                throw new PathNotExistException(path.getFullPath());
+              }
+              byte[] originValue = holder.getValue();
+              RMeasurementMNode mNode =
+                  new RMeasurementMNode(path.getFullPath(), originValue, readWriteHandler);
+              boolean didAnyUpdate = false;
+              if (mNode.getTags() != null && mNode.getTags().containsKey(oldKey)) {
+                String value = mNode.getTags().get(oldKey);
+                mNode.getTags().remove(oldKey);
+                mNode.getTags().put(newKey, value);
+                didAnyUpdate = true;
+              }
+
+              if (mNode.getAttributes() != null && mNode.getAttributes().containsKey(oldKey)) {
+                String value = mNode.getAttributes().get(oldKey);
+                mNode.getAttributes().remove(oldKey);
+                mNode.getAttributes().put(newKey, value);
+                didAnyUpdate = true;
+              }
+
+              if (didAnyUpdate) {
+                readWriteHandler.updateNode(nodeKey, mNode.getRocksDBValue());
+              } else {
+                throw new MetadataException(
+                    String.format(
+                        "TimeSeries [%s] does not have tag/attribute [%s].", path, oldKey),
+                    true);
+              }
+            } finally {
+              lock.unlock();
+            }
+          } else {
+            throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+          }
+        } catch (RocksDBException e) {
+          throw new MetadataException(e);
+        }
+      } else {
+        throw new AcquireLockTimeoutException(
+            "acquire lock timeout when do renameTagOrAttributeKey: " + path.getFullPath());
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      if (readLockHeld) {
+        deleteUpdateLock.readLock().unlock();
+      }
+    }
+  }
+
+  @Override
+  public void collectMeasurementSchema(
+      PartialPath prefixPath, List<IMeasurementSchema> measurementSchemas) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void collectTimeseriesSchema(
+      PartialPath prefixPath, Collection<TimeseriesSchema> timeseriesSchemas) {
+    throw new UnsupportedOperationException();
+  }
+
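+  // Insert-path schema resolution: fetch (or auto-create) the device node, verify the
+  // aligned flag matches the plan, create any missing measurements with inferred types and
+  // default encodings, then type-check each column, marking failed columns when partial
+  // insert is enabled.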
+  @Override
+  public IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan)
+      throws MetadataException, IOException {
+    // devicePath is a logical path, the parent of the measurements, whether in a template or not
+    PartialPath devicePath = plan.getDevicePath();
+    String[] measurementList = plan.getMeasurements();
+    IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
+
+    IMNode deviceMNode = getDeviceNodeWithAutoCreate(devicePath, plan.isAligned());
+
+    if (deviceMNode == null) {
+      throw new MetadataException(
+          String.format("Failed to create deviceMNode,device path:[%s]", plan.getDevicePath()));
+    }
+    // check insert non-aligned InsertPlan for aligned timeseries
+    if (deviceMNode.isEntity()) {
+      if (plan.isAligned() && !deviceMNode.getAsEntityMNode().isAligned()) {
+        throw new MetadataException(
+            String.format(
+                "Timeseries under path [%s] is not aligned , please set"
+                    + " InsertPlan.isAligned() = false",
+                plan.getDevicePath()));
+      }
+
+      if (!plan.isAligned() && deviceMNode.getAsEntityMNode().isAligned()) {
+        throw new MetadataException(
+            String.format(
+                "Timeseries under path [%s] is aligned , please set"
+                    + " InsertPlan.isAligned() = true",
+                plan.getDevicePath()));
+      }
+    }
+
+    // get node for each measurement
+    Map<Integer, IMeasurementMNode> nodeMap = new HashMap<>();
+    Map<Integer, PartialPath> missingNodeIndex = new HashMap<>();
+    for (int i = 0; i < measurementList.length; i++) {
+      PartialPath path = new PartialPath(devicePath.getFullPath(), measurementList[i]);
+      IMeasurementMNode node = getMeasurementMNode(path);
+      if (node == null) {
+        if (!config.isAutoCreateSchemaEnabled()) {
+          throw new PathNotExistException(path.getFullPath());
+        }
+        missingNodeIndex.put(i, path);
+      } else {
+        nodeMap.put(i, node);
+      }
+    }
+
+    // create missing nodes
+    if (!missingNodeIndex.isEmpty()) {
+      if (!(plan instanceof InsertRowPlan) && !(plan instanceof InsertTabletPlan)) {
+        throw new MetadataException(
+            String.format(
+                "Only support insertRow and insertTablet, plan is [%s]", plan.getOperatorType()));
+      }
+
+      if (plan.isAligned()) {
+        List<String> measurements = new ArrayList<>();
+        List<TSDataType> dataTypes = new ArrayList<>();
+        List<TSEncoding> encodings = new ArrayList<>();
+        for (Integer index : missingNodeIndex.keySet()) {
+          measurements.add(measurementList[index]);
+          TSDataType type = plan.getDataTypes()[index];
+          dataTypes.add(type);
+          encodings.add(getDefaultEncoding(type));
+        }
+        createAlignedTimeSeries(devicePath, measurements, dataTypes, encodings);
+      } else {
+        for (Map.Entry<Integer, PartialPath> entry : missingNodeIndex.entrySet()) {
+          IMeasurementSchema schema =
+              new MeasurementSchema(
+                  entry.getValue().getMeasurement(), plan.getDataTypes()[entry.getKey()]);
+          createTimeseries(entry.getValue(), schema, null, null, null);
+        }
+      }
+
+      // get the latest node
+      for (Entry<Integer, PartialPath> entry : missingNodeIndex.entrySet()) {
+        nodeMap.put(entry.getKey(), getMeasurementMNode(entry.getValue()));
+      }
+    }
+
+    // check datatype
+    for (int i = 0; i < measurementList.length; i++) {
+      try {
+        // check type is match
+        if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
+          try {
+            checkDataTypeMatch(plan, i, nodeMap.get(i).getSchema().getType());
+          } catch (DataTypeMismatchException mismatchException) {
+            logger.warn(
+                "DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
+                measurementList[i],
+                plan.getDataTypes()[i],
+                nodeMap.get(i).getSchema().getType());
+            if (!config.isEnablePartialInsert()) {
+              throw mismatchException;
+            } else {
+              // mark failed measurement
+              plan.markFailedMeasurementInsertion(i, mismatchException);
+              continue;
+            }
+          }
+          measurementMNodes[i] = nodeMap.get(i);
+          // set measurementName instead of alias
+          measurementList[i] = nodeMap.get(i).getName();
+        }
+      } catch (MetadataException e) {
+        if (IoTDB.isClusterMode()) {
+          logger.debug(
+              "meet error when check {}.{}, message: {}",
+              devicePath,
+              measurementList[i],
+              e.getMessage());
+        } else {
+          logger.warn(
+              "meet error when check {}.{}, message: {}",
+              devicePath,
+              measurementList[i],
+              e.getMessage());
+        }
+        if (config.isEnablePartialInsert()) {
+          // mark failed measurement
+          plan.markFailedMeasurementInsertion(i, e);
+        } else {
+          throw e;
+        }
+      }
+    }
+    return deviceMNode;
+  }
+
+  private void checkDataTypeMatch(InsertPlan plan, int loc, TSDataType dataType)
+      throws MetadataException {
+    TSDataType insertDataType;
+    if (plan instanceof InsertRowPlan) {
+      if (!((InsertRowPlan) plan).isNeedInferType()) {
+        // only when InsertRowPlan's values is object[], we should check type
+        insertDataType = getTypeInLoc(plan, loc);
+      } else {
+        insertDataType = dataType;
+      }
+    } else {
+      insertDataType = getTypeInLoc(plan, loc);
+    }
+    if (dataType != insertDataType) {
+      String measurement = plan.getMeasurements()[loc];
+      logger.warn(
+          "DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
+          measurement,
+          insertDataType,
+          dataType);
+      throw new DataTypeMismatchException(measurement, insertDataType, dataType);
+    }
+  }
+
+  /**
+   * Get the data type at position {@code loc} of the plan; only InsertRowPlan and
+   * InsertTabletPlan are supported.
+   */
+  private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws MetadataException {
+    TSDataType dataType;
+    if (plan instanceof InsertRowPlan) {
+      InsertRowPlan tPlan = (InsertRowPlan) plan;
+      dataType =
+          TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
+    } else if (plan instanceof InsertTabletPlan) {
+      dataType = plan.getDataTypes()[loc];
+    } else {
+      throw new MetadataException(
+          String.format(
+              "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
+    }
+    return dataType;
+  }
+
+  @Override
+  public void clear() {
+    try {
+      readWriteHandler.close();
+    } catch (RocksDBException e) {
+      logger.error("Failed to close readWriteHandler,try again.", e);
+      try {
+        Thread.sleep(5);
+        readWriteHandler.close();
+      } catch (RocksDBException e1) {
+        logger.error(String.format("This schemaRegion [%s] closed failed.", this), e);
+      } catch (InterruptedException e1) {
+        logger.warn("Close RocksdDB instance interrupted", e1);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  @Override
+  public Set<String> getPathsSetTemplate(String templateName) throws MetadataException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<String> getPathsUsingTemplate(String templateName) throws MetadataException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isTemplateAppendable(Template template, List<String> measurements) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public IMeasurementMNode getMeasurementMNodeForTrigger(PartialPath fullPath)
+      throws MetadataException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void releaseMeasurementMNodeAfterDropTrigger(IMeasurementMNode measurementMNode)
+      throws MetadataException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("storage group:[%s]", storageGroupFullPath);
+  }
+
+  @TestOnly
+  public void printScanAllKeys() throws IOException {
+    readWriteHandler.scanAllKeys(schemaRegionDirPath + File.separator + "scanAllKeys.txt");
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaUtils.java
new file mode 100644
index 0000000000..1ad26f3aa8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaUtils.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.REntityMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeValueType;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMeasurementMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RStorageGroupMNode;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_ALIAS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_ATTRIBUTES;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_ORIGIN_KEY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_TAGS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_TTL;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_VERSION;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_FLAG;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ESCAPE_PATH_SEPARATOR;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_HAS_ALIAS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_HAS_ATTRIBUTES;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_HAS_SCHEMA;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_HAS_TAGS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_SET_TTL;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ALIAS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_INTERNAL;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_SG;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.PATH_SEPARATOR;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ROOT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ROOT_CHAR;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ROOT_STRING;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ZERO;
+
+public class RSchemaUtils {
+  public static final RMNodeType[] NODE_TYPE_ARRAY = new RMNodeType[NODE_TYPE_ALIAS + 1];
+
+  static {
+    NODE_TYPE_ARRAY[NODE_TYPE_INTERNAL] = RMNodeType.INTERNAL;
+    NODE_TYPE_ARRAY[NODE_TYPE_SG] = RMNodeType.STORAGE_GROUP;
+    NODE_TYPE_ARRAY[NODE_TYPE_ENTITY] = RMNodeType.ENTITY;
+    NODE_TYPE_ARRAY[NODE_TYPE_MEASUREMENT] = RMNodeType.MEASUREMENT;
+    NODE_TYPE_ARRAY[NODE_TYPE_ALIAS] = RMNodeType.ALISA;
+  }
+
+  protected static byte[] toInternalNodeKey(String levelPath) {
+    return toRocksDBKey(levelPath, NODE_TYPE_INTERNAL);
+  }
+
+  protected static byte[] toStorageNodeKey(String levelPath) {
+    return toRocksDBKey(levelPath, NODE_TYPE_SG);
+  }
+
+  protected static byte[] toEntityNodeKey(String levelPath) {
+    return toRocksDBKey(levelPath, NODE_TYPE_ENTITY);
+  }
+
+  protected static byte[] toMeasurementNodeKey(String levelPath) {
+    return toRocksDBKey(levelPath, NODE_TYPE_MEASUREMENT);
+  }
+
+  protected static byte[] toAliasNodeKey(String levelPath) {
+    return toRocksDBKey(levelPath, NODE_TYPE_ALIAS);
+  }
+
+  protected static byte[] toRocksDBKey(String levelPath, char type) {
+    return (type + levelPath).getBytes();
+  }
+
+  public static String getLevelPathPrefix(String[] nodes, int end, int level) {
+    StringBuilder builder = new StringBuilder();
+    char depth = (char) (ZERO + level);
+    builder.append(ROOT).append(PATH_SEPARATOR).append(depth);
+    for (int i = 1; i <= end; i++) {
+      builder.append(nodes[i]).append(PATH_SEPARATOR).append(depth);
+    }
+    return builder.toString();
+  }
+
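+  // Builds the level-encoded inner key: each separator is followed by the level character,
+  // e.g. nodes [root, d1, s1] at level 2 become ROOT + ".2d1" + ".2s1" (assuming ZERO is '0').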
+  public static String getLevelPath(String[] nodes, int end) {
+    return getLevelPath(nodes, end, end);
+  }
+
+  public static String getLevelPath(String[] nodes, int end, int level) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(ROOT);
+    char depth = (char) (ZERO + level);
+    for (int i = 1; i <= end; i++) {
+      builder.append(PATH_SEPARATOR).append(depth).append(nodes[i]);
+    }
+    return builder.toString();
+  }
+
+  public static String getMeasurementLevelPath(String[] prefixPath, String measurement) {
+    String[] nodes = ArrayUtils.add(prefixPath, measurement);
+    return getLevelPath(nodes, nodes.length - 1);
+  }
+
+  public static List<PartialPath> convertToPartialPath(Collection<String> paths, int level) {
+    return paths
+        .parallelStream()
+        .map(x -> getPartialPathFromInnerPath(x, level))
+        .collect(Collectors.toList());
+  }
+
+  public static String getNextLevelOfPath(String innerPath, int currentLevel) {
+    char levelChar = (char) (ZERO + currentLevel);
+    String old = PATH_SEPARATOR + levelChar;
+    String target = PATH_SEPARATOR + (char) (levelChar + 1);
+    return innerPath.replace(old, target);
+  }
+
+  public static String getNextLevelOfPath(String innerPath, char currentLevel) {
+    String old = PATH_SEPARATOR + currentLevel;
+    String target = PATH_SEPARATOR + (char) (currentLevel + 1);
+    return innerPath.replace(old, target);
+  }
+
+  public static PartialPath getPartialPathFromInnerPath(String path, int level) {
+    char charLevel = (char) (ZERO + level);
+    return getPartialPathFromInnerPath(path, charLevel);
+  }
+
+  public static PartialPath getPartialPathFromInnerPath(String path, char level) {
+    String pathWithoutLevel = path.replace(PATH_SEPARATOR + level, PATH_SEPARATOR);
+    String[] nodes = pathWithoutLevel.split(ESCAPE_PATH_SEPARATOR);
+    nodes[0] = PATH_ROOT;
+    return new PartialPath(nodes);
+  }
+
+  public static RMNodeType typeOfMNode(IMNode mNode) {
+    // order sensitive
+    if (mNode instanceof REntityMNode) {
+      return RMNodeType.ENTITY;
+    }
+
+    if (mNode instanceof RStorageGroupMNode) {
+      return RMNodeType.STORAGE_GROUP;
+    }
+
+    if (mNode instanceof RMeasurementMNode) {
+      return RMNodeType.MEASUREMENT;
+    }
+
+    return RMNodeType.INTERNAL;
+  }
+
+  public static byte[] buildMeasurementNodeValue(
+      IMeasurementSchema schema,
+      String alias,
+      Map<String, String> tags,
+      Map<String, String> attributes)
+      throws IOException {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    ReadWriteIOUtils.write(DATA_VERSION, outputStream);
+
+    byte flag = DEFAULT_FLAG;
+    if (alias != null) {
+      flag = (byte) (flag | FLAG_HAS_ALIAS);
+    }
+
+    if (tags != null && tags.size() > 0) {
+      flag = (byte) (flag | FLAG_HAS_TAGS);
+    }
+
+    if (attributes != null && attributes.size() > 0) {
+      flag = (byte) (flag | FLAG_HAS_ATTRIBUTES);
+    }
+
+    if (schema != null) {
+      flag = (byte) (flag | FLAG_HAS_SCHEMA);
+    }
+
+    ReadWriteIOUtils.write(flag, outputStream);
+
+    if (schema != null) {
+      ReadWriteIOUtils.write(DATA_BLOCK_TYPE_SCHEMA, outputStream);
+      schema.serializeTo(outputStream);
+    }
+
+    if (alias != null) {
+      ReadWriteIOUtils.write(DATA_BLOCK_TYPE_ALIAS, outputStream);
+      ReadWriteIOUtils.write(alias, outputStream);
+    }
+
+    if (tags != null && tags.size() > 0) {
+      ReadWriteIOUtils.write(DATA_BLOCK_TYPE_TAGS, outputStream);
+      ReadWriteIOUtils.write(tags, outputStream);
+    }
+
+    if (attributes != null && attributes.size() > 0) {
+      ReadWriteIOUtils.write(DATA_BLOCK_TYPE_ATTRIBUTES, outputStream);
+      ReadWriteIOUtils.write(attributes, outputStream);
+    }
+    return outputStream.toByteArray();
+  }
+
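+  // Alias node value layout: [DATA_VERSION][DEFAULT_FLAG][DATA_BLOCK_TYPE_ORIGIN_KEY]
+  // followed by a 4-byte length and the origin measurement key itself.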
+  public static byte[] buildAliasNodeValue(byte[] originKey) {
+    byte[] prefix = new byte[] {DATA_VERSION, DEFAULT_FLAG, DATA_BLOCK_TYPE_ORIGIN_KEY};
+    byte[] len = BytesUtils.intToBytes(originKey.length);
+    return BytesUtils.concatByteArray(BytesUtils.concatByteArray(prefix, len), originKey);
+  }
+
+  public static byte[] readOriginKey(ByteBuffer buffer) {
+    int len = ReadWriteIOUtils.readInt(buffer);
+    return ReadWriteIOUtils.readBytes(buffer, len);
+  }
+
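+  // Returns the offset of the payload right after the matching block-type byte, or -1 when
+  // the flag byte marks the block as absent or it is never encountered while skipping blocks.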
+  public static int indexOfDataBlockType(byte[] data, RMNodeValueType valueType) {
+    if (valueType.getFlag() != null && (data[1] & valueType.getFlag()) == 0) {
+      return -1;
+    }
+
+    int index = -1;
+    boolean typeExist = false;
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+    // skip the data version and filter byte
+    ReadWriteIOUtils.readBytes(byteBuffer, 2);
+
+    while (byteBuffer.hasRemaining()) {
+      byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
+      index = byteBuffer.position();
+      switch (blockType) {
+        case DATA_BLOCK_TYPE_TTL:
+          ReadWriteIOUtils.readLong(byteBuffer);
+          break;
+        case DATA_BLOCK_TYPE_ALIAS:
+          ReadWriteIOUtils.readString(byteBuffer);
+          break;
+        case DATA_BLOCK_TYPE_ORIGIN_KEY:
+          readOriginKey(byteBuffer);
+          break;
+        case DATA_BLOCK_TYPE_SCHEMA:
+          MeasurementSchema.deserializeFrom(byteBuffer);
+          break;
+        case DATA_BLOCK_TYPE_TAGS:
+        case DATA_BLOCK_TYPE_ATTRIBUTES:
+          ReadWriteIOUtils.readMap(byteBuffer);
+          break;
+        default:
+          break;
+      }
+      // got the data we need, no need to read further
+      if (valueType.getType() == blockType) {
+        typeExist = true;
+        break;
+      }
+    }
+    return typeExist ? index : -1;
+  }
+
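+  // Updates TTL in place when a TTL block already exists; otherwise appends a new
+  // [DATA_BLOCK_TYPE_TTL][8-byte long] block and sets FLAG_SET_TTL in the flag byte.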
+  public static byte[] updateTTL(byte[] origin, long ttl) {
+    int index = indexOfDataBlockType(origin, RMNodeValueType.TTL);
+    if (index < 1) {
+      byte[] ttlBlock = new byte[Long.BYTES + 1];
+      ttlBlock[0] = DATA_BLOCK_TYPE_TTL;
+      BytesUtils.longToBytes(ttl, ttlBlock, 1);
+      origin[1] = (byte) (origin[1] | FLAG_SET_TTL);
+      return BytesUtils.concatByteArray(origin, ttlBlock);
+    } else {
+      BytesUtils.longToBytes(ttl, origin, index);
+      return origin;
+    }
+  }
+
+  private static final char START_FLAG = '\u0019';
+  private static final char SPLIT_FLAG = '.';
+
+  /**
+   * Parse the serialized node value and return the data block of the specified type. If the
+   * requested block is not present, null is returned.
+   *
+   * @param value value written in the default table
+   * @param valueType the type of value to obtain
+   */
+  public static Object parseNodeValue(byte[] value, RMNodeValueType valueType) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(value);
+    // skip the version byte
+    ReadWriteIOUtils.readByte(byteBuffer);
+    // read the flag byte
+    byte filter = ReadWriteIOUtils.readByte(byteBuffer);
+    Object obj = null;
+    if (valueType.getFlag() != null && (filter & valueType.getFlag()) == 0) {
+      return obj;
+    }
+
+    // this means that the following data contains the information we need
+    while (byteBuffer.hasRemaining()) {
+      byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
+      switch (blockType) {
+        case DATA_BLOCK_TYPE_TTL:
+          obj = ReadWriteIOUtils.readLong(byteBuffer);
+          break;
+        case DATA_BLOCK_TYPE_ALIAS:
+          obj = ReadWriteIOUtils.readString(byteBuffer);
+          break;
+        case DATA_BLOCK_TYPE_ORIGIN_KEY:
+          obj = readOriginKey(byteBuffer);
+          break;
+        case DATA_BLOCK_TYPE_SCHEMA:
+          obj = MeasurementSchema.deserializeFrom(byteBuffer);
+          break;
+        case DATA_BLOCK_TYPE_TAGS:
+        case DATA_BLOCK_TYPE_ATTRIBUTES:
+          obj = ReadWriteIOUtils.readMap(byteBuffer);
+          break;
+        default:
+          break;
+      }
+      // got the data we need, no need to read any further
+      if (valueType.getType() == blockType) {
+        break;
+      }
+    }
+    return obj;
+  }
+
+  public static boolean isAligned(byte[] value) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(value);
+    // skip the version byte
+    ReadWriteIOUtils.readByte(byteBuffer);
+    // read the flag byte
+    byte flag = ReadWriteIOUtils.readByte(byteBuffer);
+    return (flag & FLAG_IS_ALIGNED) > 0;
+  }
+
+  /**
+   * get inner name by converting partial path.
+   *
+   * @param partialPath the path needed to be converted.
+   * @param level the level needed to be added.
+   * @param nodeType specified type
+   * @return inner name
+   */
+  public static String convertPartialPathToInner(String partialPath, int level, char nodeType) {
+    StringBuilder stringBuilder = new StringBuilder(nodeType + ROOT);
+    for (int i = partialPath.indexOf(PATH_SEPARATOR); i < partialPath.length() && i >= 0; i++) {
+      char currentChar = partialPath.charAt(i);
+      stringBuilder.append(currentChar);
+      if (currentChar == SPLIT_FLAG) {
+        stringBuilder.append(level);
+      }
+    }
+    return stringBuilder.toString();
+  }
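A worked example of the encoding (the concrete root character comes from RSchemaConstants; 'r' is only an assumption for illustration):

    // convertPartialPathToInner("root.sg.d1.s1", 3, nodeType)
    //   -> <nodeType> + 'r' + ".3sg.3d1.3s1"
    // the key's total depth is repeated after every separator, so keys of
    // different tree depths never collide and scans can be level-aware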
+
+  public static String convertPartialPathToInnerByNodes(String[] nodes, int level, char nodeType) {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(nodeType).append(ROOT);
+    for (int i = 0; i < nodes.length; i++) {
+      if (i == 0 && nodes[i].equals(ROOT_STRING)) {
+        continue;
+      }
+      stringBuilder.append(SPLIT_FLAG).append(level).append(nodes[i]);
+    }
+    return stringBuilder.toString();
+  }
+
+  public static int getLevelByPartialPath(String partialPath) {
+    int levelCount = 0;
+    for (char c : partialPath.toCharArray()) {
+      if (SPLIT_FLAG == c) {
+        levelCount++;
+      }
+    }
+    return levelCount;
+  }
+
+  public static byte[] getSuffixOfLevelPath(String[] nodes, int level) {
+    StringBuilder stringBuilder = new StringBuilder();
+    for (String str : nodes) {
+      stringBuilder.append(PATH_SEPARATOR).append(level).append(str);
+    }
+    return stringBuilder.toString().getBytes();
+  }
+
+  public static boolean suffixMatch(byte[] key, byte[] suffix) {
+    if (key.length < suffix.length) {
+      return false;
+    }
+
+    for (int i = key.length - 1, j = suffix.length - 1; i >= 0 && j >= 0; i--, j--) {
+      if ((key[i] ^ suffix[j]) != 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static boolean prefixMatch(byte[] key, byte[] prefix) {
+    if (key.length < prefix.length) {
+      return false;
+    }
+
+    for (int i = 0, j = 0; i < key.length && j < prefix.length; i++, j++) {
+      if ((key[i] ^ prefix[j]) != 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static String[] toMetaNodes(byte[] rocksdbKey) {
+    String rawKey =
+        new String(
+            Objects.requireNonNull(BytesUtils.subBytes(rocksdbKey, 1, rocksdbKey.length - 1)));
+    String[] nodes = rawKey.split(ESCAPE_PATH_SEPARATOR);
+    nodes[0] = ROOT_STRING;
+    for (int i = 1; i < nodes.length; i++) {
+      nodes[i] = nodes[i].substring(1);
+    }
+    return nodes;
+  }
+
+  public static String getPathByInnerName(String innerName) {
+    char[] keyConvertToCharArray = innerName.toCharArray();
+    StringBuilder stringBuilder = new StringBuilder();
+    char lastChar = START_FLAG;
+    boolean replaceFlag = true;
+    for (char c : keyConvertToCharArray) {
+      if (SPLIT_FLAG == lastChar || START_FLAG == lastChar) {
+        lastChar = c;
+        continue;
+      }
+      if (ROOT_CHAR == c && replaceFlag) {
+        lastChar = c;
+        stringBuilder.append(ROOT_STRING);
+        replaceFlag = false;
+        continue;
+      }
+      stringBuilder.append(c);
+      lastChar = c;
+    }
+    return stringBuilder.toString();
+  }
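This is the inverse of convertPartialPathToInner: the leading type character is dropped, the single level tag after each separator is skipped, and the root character is expanded back to "root". Continuing the earlier example:

    // getPathByInnerName(<nodeType> + ROOT_CHAR + ".3sg.3d1.3s1")
    //   -> "root.sg.d1.s1"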
+
+  public static String getPathByLevelPath(String innerName) {
+    char[] keyConvertToCharArray = innerName.toCharArray();
+    StringBuilder stringBuilder = new StringBuilder();
+    char lastChar = START_FLAG;
+    boolean replaceFlag = true;
+    for (char c : keyConvertToCharArray) {
+      if (SPLIT_FLAG == lastChar) {
+        lastChar = c;
+        continue;
+      }
+      if (ROOT_CHAR == c && replaceFlag) {
+        lastChar = c;
+        stringBuilder.append(ROOT_STRING);
+        replaceFlag = false;
+        continue;
+      }
+      stringBuilder.append(c);
+      lastChar = c;
+    }
+    return stringBuilder.toString();
+  }
+
+  public static String[] newStringArray(String[] oldArray) throws IllegalPathException {
+    StringBuilder stringBuilder = new StringBuilder();
+    for (String str : oldArray) {
+      stringBuilder.append(PATH_SEPARATOR).append(str);
+    }
+    return MetaUtils.splitPathToDetachedPath(stringBuilder.substring(1));
+  }
+
+  public static String replaceWildcard(int num) {
+    StringBuilder stringBuilder = new StringBuilder();
+    for (int i = 0; i < num; i++) {
+      stringBuilder.append(RSchemaConstants.PATH_SEPARATOR).append(ONE_LEVEL_PATH_WILDCARD);
+    }
+    return stringBuilder.substring(1);
+  }
+
+  public static List<int[]> getAllCompoundMode(int sum, int n) {
+    if (n <= 2) {
+      List<int[]> result = new ArrayList<>();
+      for (int i = 1; i < sum; i++) {
+        result.add(new int[] {i, sum - i});
+      }
+      return result;
+    }
+    List<int[]> allResult = new ArrayList<>();
+    for (int i = 1; i <= sum - n + 1; i++) {
+      List<int[]> temp = getAllCompoundMode(sum - i, n - 1);
+      for (int[] value : temp) {
+        int[] result = new int[value.length + 1];
+        result[0] = i;
+        System.arraycopy(value, 0, result, 1, value.length);
+        allResult.add(result);
+      }
+    }
+    return allResult;
+  }
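getAllCompoundMode enumerates every ordered way of writing sum as n positive integers, which the caller below uses to distribute levels across several "**" wildcards. For example:

    // getAllCompoundMode(4, 3) -> [1, 1, 2], [1, 2, 1], [2, 1, 1]
    // getAllCompoundMode(4, 2) -> [1, 3], [2, 2], [3, 1]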
+
+  // eg. root.a.*.**.b.**.c
+  public static List<String[]> replaceMultiWildcardToSingle(String[] nodes, int maxLevel)
+      throws IllegalPathException {
+    List<String[]> allNodesArray = new ArrayList<>();
+    List<Integer> multiWildcardPosition = new ArrayList<>();
+    for (int i = 0; i < nodes.length; i++) {
+      if (MULTI_LEVEL_PATH_WILDCARD.equals(nodes[i])) {
+        multiWildcardPosition.add(i);
+      }
+    }
+    if (multiWildcardPosition.isEmpty()) {
+      allNodesArray.add(nodes);
+    } else if (multiWildcardPosition.size() == 1) {
+      for (int i = 1; i <= maxLevel - nodes.length + 2; i++) {
+        String[] clone = nodes.clone();
+        clone[multiWildcardPosition.get(0)] = replaceWildcard(i);
+        allNodesArray.add(RSchemaUtils.newStringArray(clone));
+      }
+    } else {
+      for (int sum = multiWildcardPosition.size();
+          sum <= maxLevel - (nodes.length - multiWildcardPosition.size() - 1);
+          sum++) {
+        List<int[]> result = getAllCompoundMode(sum, multiWildcardPosition.size());
+        for (int[] value : result) {
+          String[] clone = nodes.clone();
+          for (int i = 0; i < value.length; i++) {
+            clone[multiWildcardPosition.get(i)] = replaceWildcard(value[i]);
+          }
+          allNodesArray.add(RSchemaUtils.newStringArray(clone));
+        }
+      }
+    }
+    return allNodesArray;
+  }
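Putting the helpers together, each "**" is expanded into every feasible run of single-level wildcards up to the region's maximum level; a worked example derived from the branches above:

    // nodes = {"root", "a", "**", "b"}, maxLevel = 5
    // replaceMultiWildcardToSingle(nodes, maxLevel) yields
    //   root.a.*.b
    //   root.a.*.*.b
    //   root.a.*.*.*.b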
+
+  public static String concatNodesName(String[] nodes, int startIdx, int endIdx) {
+    StringBuilder stringBuilder = new StringBuilder();
+    for (int i = startIdx; i <= endIdx; i++) {
+      stringBuilder.append(PATH_SEPARATOR).append(nodes[i]);
+    }
+    return stringBuilder.substring(1);
+  }
+
+  public static boolean startWith(byte[] a, byte[] b) {
+    if (a.length < b.length) {
+      return false;
+    }
+
+    for (int i = 0; i < b.length; i++) {
+      if (a[i] != b[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/REntityMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/REntityMNode.java
new file mode 100644
index 0000000000..97768c9e88
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/REntityMNode.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class REntityMNode extends RInternalMNode implements IEntityMNode {
+
+  private volatile boolean isAligned = false;
+
+  /**
+   * Constructor of REntityMNode.
+   *
+   * @param fullPath full path from root to this node
+   */
+  public REntityMNode(String fullPath, RSchemaReadWriteHandler readWriteHandler) {
+    super(fullPath, readWriteHandler);
+  }
+
+  @Override
+  void updateChildNode(String childName, int childNameMaxLevel) throws MetadataException {
+    String innerName =
+        RSchemaUtils.convertPartialPathToInner(
+            childName, childNameMaxLevel, RMNodeType.ENTITY.getValue());
+    try {
+      readWriteHandler.updateNode(innerName.getBytes(), new byte[] {});
+    } catch (RocksDBException e) {
+      throw new MetadataException(e);
+    }
+  }
+
+  public REntityMNode(String fullPath, byte[] value, RSchemaReadWriteHandler readWriteHandler) {
+    super(fullPath, readWriteHandler);
+    deserialize(value);
+  }
+
+  @Override
+  public boolean addAlias(String alias, IMeasurementMNode child) {
+    // In rocksdb-based mode, there is no need to update in memory
+    return true;
+  }
+
+  @Override
+  public void deleteAliasChild(String alias) {
+    // Don't do any update in the MNode-related classes; changes here are not persisted in memory
+    // or on disk
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<String, IMeasurementMNode> getAliasChildren() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setAliasChildren(Map<String, IMeasurementMNode> aliasChildren) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isAligned() {
+    return isAligned;
+  }
+
+  @Override
+  public void setAligned(boolean isAligned) {
+    this.isAligned = isAligned;
+  }
+
+  private void deserialize(byte[] value) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(value);
+    // read the version byte and the flag byte; only the flag is needed here
+    byte flag = ReadWriteIOUtils.readBytes(byteBuffer, 2)[1];
+    isAligned = (RSchemaConstants.FLAG_IS_ALIGNED & flag) > 0;
+  }
+
+  @Override
+  public ILastCacheContainer getLastCacheContainer(String measurementId) {
+    return null;
+  }
+
+  @Override
+  public Map<String, ILastCacheContainer> getTemplateLastCaches() {
+    return null;
+  }
+
+  @Override
+  public boolean isEntity() {
+    return true;
+  }
+
+  @Override
+  public void serializeTo(MLogWriter logWriter) throws IOException {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RInternalMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RInternalMNode.java
new file mode 100644
index 0000000000..b9113f6737
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RInternalMNode.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.container.IMNodeContainer;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
+import org.apache.iotdb.db.metadata.template.Template;
+
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+
+public class RInternalMNode extends RMNode {
+
+  // schema template
+  protected Template schemaTemplate = null;
+
+  private volatile boolean useTemplate = false;
+
+  /**
+   * Constructor of RInternalMNode.
+   *
+   * @param fullPath full path from root to this node
+   */
+  public RInternalMNode(String fullPath, RSchemaReadWriteHandler readWriteHandler) {
+    super(fullPath, readWriteHandler);
+  }
+
+  @Override
+  void updateChildNode(String childName, int childNameMaxLevel) throws MetadataException {
+    String innerName =
+        RSchemaUtils.convertPartialPathToInner(
+            childName, childNameMaxLevel, RMNodeType.INTERNAL.getValue());
+    try {
+      readWriteHandler.updateNode(innerName.getBytes(), new byte[] {});
+    } catch (RocksDBException e) {
+      throw new MetadataException(e);
+    }
+  }
+
+  /** check whether the MNode has a child with the name */
+  @Override
+  public boolean hasChild(String name) {
+    String childPathName = fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+    IMNode node = getNodeBySpecifiedPath(childPathName);
+    return node != null;
+  }
+
+  /** get the child with the name */
+  @Override
+  public IMNode getChild(String name) {
+    String childPathName = fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+    return getNodeBySpecifiedPath(childPathName);
+  }
+
+  /**
+   * add a child to the current mnode
+   *
+   * @param name child's name
+   * @param child child's node
+   * @return the child node that was added
+   */
+  @Override
+  public IMNode addChild(String name, IMNode child) {
+    child.setParent(this);
+    String childName = fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+    int childNameMaxLevel = RSchemaUtils.getLevelByPartialPath(childName);
+    try {
+      if (child instanceof RMNode) {
+        ((RMNode) child).updateChildNode(childName, childNameMaxLevel);
+      } else {
+        logger.error("Rocksdb-based is currently used, but the node type received is not RMNode!");
+      }
+    } catch (MetadataException e) {
+      logger.error(e.getMessage());
+    }
+    return child;
+  }
+
+  /**
+   * Add a child to the current mnode.
+   *
+   * <p>This method does not take the child's name as an input and also makes this mnode the
+   * child's parent, which reduces the chance of user mistakes and makes the API more convenient.
+   * The return value makes it easy to chain calls when constructing a series of nodes.
+   *
+   * @param child child's node
+   * @return the child node that was added
+   */
+  @Override
+  public IMNode addChild(IMNode child) {
+    addChild(child.getName(), child);
+    return child;
+  }
+
+  /** delete a child */
+  @Override
+  public IMNode deleteChild(String name) {
+    String childPathName = fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+    int nodeNameMaxLevel = RSchemaUtils.getLevelByPartialPath(childPathName);
+    for (RMNodeType type : RMNodeType.values()) {
+      byte[] childInnerName =
+          RSchemaUtils.convertPartialPathToInner(childPathName, nodeNameMaxLevel, type.getValue())
+              .getBytes();
+      try {
+        if (readWriteHandler.keyExist(childInnerName)) {
+          readWriteHandler.deleteByKey(childInnerName);
+          return null;
+        }
+      } catch (RocksDBException e) {
+        logger.error(e.getMessage());
+      }
+    }
+    // The return value of this method is used to estimate memory usage.
+    // In RocksDB-based mode, mNodes are not held in memory for long periods,
+    // so the return value here is meaningless.
+    return null;
+  }
+
+  /**
+   * replace a child of this mnode
+   *
+   * @param oldChildName measurement name
+   * @param newChildNode new child node
+   */
+  @Override
+  public void replaceChild(String oldChildName, IMNode newChildNode) {
+    if (!oldChildName.equals(newChildNode.getName())) {
+      throw new RuntimeException("New child's name must be the same as old child's name!");
+    }
+    deleteChild(oldChildName);
+    addChild(newChildNode);
+  }
+
+  @Override
+  public IMNodeContainer getChildren() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * get the upper template of this node; note that we return the nearest template along the path
+   * from this node to the root
+   *
+   * @return upper template
+   */
+  @Override
+  public Template getUpperTemplate() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Template getSchemaTemplate() {
+    return schemaTemplate;
+  }
+
+  @Override
+  public void setSchemaTemplate(Template schemaTemplate) {
+    this.schemaTemplate = schemaTemplate;
+  }
+
+  @Override
+  public boolean isUseTemplate() {
+    return useTemplate;
+  }
+
+  @Override
+  public void setUseTemplate(boolean useTemplate) {
+    this.useTemplate = useTemplate;
+  }
+
+  @Override
+  public void serializeTo(MLogWriter logWriter) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNode.java
new file mode 100644
index 0000000000..ee9d9983c3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNode.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.mnode.container.IMNodeContainer;
+import org.apache.iotdb.db.metadata.mtree.store.disk.cache.CacheEntry;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
+
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public abstract class RMNode implements IMNode {
+  /** full path from root to this node; for an InternalMNode it is set only once, on first use */
+  protected String fullPath;
+
+  protected RSchemaReadWriteHandler readWriteHandler;
+
+  protected IMNode parent;
+
+  protected String name;
+
+  protected static final Logger logger = LoggerFactory.getLogger(RMNode.class);
+
+  /** Constructor of RMNode. */
+  public RMNode(String fullPath, RSchemaReadWriteHandler readWriteHandler) {
+    this.fullPath = fullPath.intern();
+    this.name = fullPath.substring(fullPath.lastIndexOf(RSchemaConstants.PATH_SEPARATOR) + 1);
+    this.readWriteHandler = readWriteHandler;
+  }
+
+  abstract void updateChildNode(String childName, int childNameMaxLevel) throws MetadataException;
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public IMNode getParent() {
+    if (parent != null) {
+      return parent;
+    }
+    String parentName =
+        fullPath.substring(0, fullPath.lastIndexOf(RSchemaConstants.PATH_SEPARATOR));
+    parent = getNodeBySpecifiedPath(parentName);
+    return parent;
+  }
+
+  protected IMNode getNodeBySpecifiedPath(String keyName) {
+    byte[] value = null;
+    IMNode node;
+    int nodeNameMaxLevel = RSchemaUtils.getLevelByPartialPath(keyName);
+    for (RMNodeType type : RMNodeType.values()) {
+      String innerName =
+          RSchemaUtils.convertPartialPathToInner(keyName, nodeNameMaxLevel, type.getValue());
+      try {
+        value = readWriteHandler.get(null, innerName.getBytes());
+      } catch (RocksDBException e) {
+        logger.error("Failed to get node by path.", e);
+      }
+      if (value != null) {
+        switch (type.getValue()) {
+          case RSchemaConstants.NODE_TYPE_SG:
+            node = new RStorageGroupMNode(keyName, value, readWriteHandler);
+            return node;
+          case RSchemaConstants.NODE_TYPE_INTERNAL:
+            node = new RInternalMNode(keyName, readWriteHandler);
+            return node;
+          case RSchemaConstants.NODE_TYPE_ENTITY:
+            node = new REntityMNode(keyName, value, readWriteHandler);
+            return node;
+          case RSchemaConstants.NODE_TYPE_MEASUREMENT:
+            node = new RMeasurementMNode(keyName, value, readWriteHandler);
+            return node;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void setParent(IMNode parent) {
+    this.parent = parent;
+  }
+
+  /**
+   * get partial path of this node
+   *
+   * @return partial path
+   */
+  @Override
+  public PartialPath getPartialPath() {
+    try {
+      return new PartialPath(fullPath);
+    } catch (IllegalPathException ignored) {
+      return null;
+    }
+  }
+
+  /** get full path */
+  @Override
+  public String getFullPath() {
+    return fullPath;
+  }
+
+  @Override
+  public void setFullPath(String fullPath) {
+    this.fullPath = fullPath;
+  }
+
+  @Override
+  public boolean isUseTemplate() {
+    return false;
+  }
+
+  @Override
+  public boolean isStorageGroup() {
+    return false;
+  }
+
+  @Override
+  public boolean isEntity() {
+    return false;
+  }
+
+  @Override
+  public boolean isMeasurement() {
+    return false;
+  }
+
+  @Override
+  public IStorageGroupMNode getAsStorageGroupMNode() {
+    if (isStorageGroup()) {
+      return (IStorageGroupMNode) this;
+    } else {
+      throw new UnsupportedOperationException("Wrong MNode Type");
+    }
+  }
+
+  @Override
+  public IEntityMNode getAsEntityMNode() {
+    if (isEntity()) {
+      return (IEntityMNode) this;
+    } else {
+      throw new UnsupportedOperationException("Wrong MNode Type");
+    }
+  }
+
+  @Override
+  public IMeasurementMNode getAsMeasurementMNode() {
+    if (isMeasurement()) {
+      return (IMeasurementMNode) this;
+    } else {
+      throw new UnsupportedOperationException("Wrong MNode Type");
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    RMNode mNode = (RMNode) o;
+    return Objects.equals(fullPath, mNode.getFullPath());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(fullPath);
+  }
+
+  @Override
+  public String toString() {
+    return this.getName();
+  }
+
+  @Override
+  public void moveDataToNewMNode(IMNode newMNode) {
+    throw new UnsupportedOperationException("Temporarily unsupported");
+  }
+
+  @Override
+  public CacheEntry getCacheEntry() {
+    throw new UnsupportedOperationException("Temporarily unsupported");
+  }
+
+  @Override
+  public void setCacheEntry(CacheEntry cacheEntry) {
+    throw new UnsupportedOperationException("Temporarily unsupported");
+  }
+
+  @Override
+  public void setChildren(IMNodeContainer children) {
+    throw new UnsupportedOperationException("Temporarily unsupported");
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeType.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeType.java
new file mode 100644
index 0000000000..9eb1f8ad2c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ALIAS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_INTERNAL;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_SG;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils.NODE_TYPE_ARRAY;
+
+public enum RMNodeType {
+  INTERNAL(NODE_TYPE_INTERNAL),
+  STORAGE_GROUP(NODE_TYPE_SG),
+  ENTITY(NODE_TYPE_ENTITY),
+  MEASUREMENT(NODE_TYPE_MEASUREMENT),
+  ALIAS(NODE_TYPE_ALIAS);
+
+  private final char value;
+
+  RMNodeType(char value) {
+    this.value = value;
+  }
+
+  public RMNodeType of(char type) {
+    return NODE_TYPE_ARRAY[type];
+  }
+
+  public char getValue() {
+    return value;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeValueType.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeValueType.java
new file mode 100644
index 0000000000..e333f5e03a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeValueType.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+
+public enum RMNodeValueType {
+  TTL(RSchemaConstants.FLAG_SET_TTL, RSchemaConstants.DATA_BLOCK_TYPE_TTL),
+  SCHEMA(RSchemaConstants.FLAG_HAS_SCHEMA, RSchemaConstants.DATA_BLOCK_TYPE_SCHEMA),
+  ALIAS(RSchemaConstants.FLAG_HAS_ALIAS, RSchemaConstants.DATA_BLOCK_TYPE_ALIAS),
+  TAGS(RSchemaConstants.FLAG_HAS_TAGS, RSchemaConstants.DATA_BLOCK_TYPE_TAGS),
+  ATTRIBUTES(RSchemaConstants.FLAG_HAS_ATTRIBUTES, RSchemaConstants.DATA_BLOCK_TYPE_ATTRIBUTES),
+  ORIGIN_KEY(null, RSchemaConstants.DATA_BLOCK_TYPE_ORIGIN_KEY);
+
+  private final byte type;
+  private final Byte flag;
+
+  RMNodeValueType(Byte flag, byte type) {
+    this.type = type;
+    this.flag = flag;
+  }
+
+  public byte getType() {
+    return type;
+  }
+
+  public Byte getFlag() {
+    return flag;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMeasurementMNode.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMeasurementMNode.java
index 46341679cc..37a05773c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMeasurementMNode.java
@@ -16,74 +16,86 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.metadata.mnode;
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
 
 import org.apache.iotdb.db.engine.trigger.executor.TriggerExecutor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
-import org.apache.iotdb.db.metadata.lastCache.container.LastCacheContainer;
 import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.container.IMNodeContainer;
-import org.apache.iotdb.db.metadata.mnode.container.MNodeContainers;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
 import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.rocksdb.RocksDBException;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
 
-public class MeasurementMNode extends MNode implements IMeasurementMNode {
-
-  private static final Logger logger = LoggerFactory.getLogger(MeasurementMNode.class);
+public class RMeasurementMNode extends RMNode implements IMeasurementMNode {
 
-  /** alias name of this measurement */
   protected String alias;
-  /** tag/attribute's start offset in tag file */
-  private long offset = -1;
-  /** measurement's Schema for one timeseries represented by current leaf node */
+
   private IMeasurementSchema schema;
-  /** last value cache */
-  private volatile ILastCacheContainer lastCacheContainer = null;
-  /** registered trigger */
-  private TriggerExecutor triggerExecutor = null;
+
+  private Map<String, String> tags;
+
+  private Map<String, String> attributes;
 
   /**
-   * MeasurementMNode factory method. The type of returned MeasurementMNode is according to the
-   * schema type. The default type is UnaryMeasurementMNode, which means if schema == null, an
-   * UnaryMeasurementMNode will return.
+   * Constructor of RMeasurementMNode.
+   *
+   * @param fullPath full path from root to this node
    */
-  public static IMeasurementMNode getMeasurementMNode(
-      IEntityMNode parent, String measurementName, IMeasurementSchema schema, String alias) {
-    return new MeasurementMNode(parent, measurementName, schema, alias);
+  public RMeasurementMNode(String fullPath, RSchemaReadWriteHandler readWriteHandler) {
+    super(fullPath, readWriteHandler);
   }
 
-  /** @param alias alias of measurementName */
-  MeasurementMNode(IMNode parent, String name, IMeasurementSchema schema, String alias) {
-    super(parent, name);
-    this.schema = schema;
-    this.alias = alias;
+  @Override
+  void updateChildNode(String childName, int childNameMaxLevel) throws MetadataException {
+    String innerName =
+        RSchemaUtils.convertPartialPathToInner(
+            childName, childNameMaxLevel, RMNodeType.MEASUREMENT.getValue());
+    // todo all existing attributes of the measurementNode need to be written
+    try {
+      readWriteHandler.updateNode(innerName.getBytes(), new byte[] {});
+    } catch (RocksDBException e) {
+      throw new MetadataException(e);
+    }
+  }
+
+  public RMeasurementMNode(
+      String fullPath, byte[] value, RSchemaReadWriteHandler readWriteHandler) {
+    super(fullPath, readWriteHandler);
+    deserialize(value);
   }
 
   @Override
   public IEntityMNode getParent() {
-    if (parent == null) {
+    if (super.getParent() == null) {
       return null;
     }
     return parent.getAsEntityMNode();
   }
 
-  /**
-   * get MeasurementPath of this node
-   *
-   * @return MeasurementPath
-   */
   @Override
   public MeasurementPath getMeasurementPath() {
     MeasurementPath result = new MeasurementPath(super.getPartialPath(), schema);
     result.setUnderAlignedEntity(getParent().isAligned());
+    if (alias != null && !alias.isEmpty()) {
+      result.setMeasurementAlias(alias);
+    }
     return result;
   }
 
@@ -92,25 +104,20 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
     return schema;
   }
 
-  /**
-   * get data type
-   *
-   * @param measurementId if it's a vector schema, we need sensor name of it
-   * @return measurement data type
-   */
   @Override
   public TSDataType getDataType(String measurementId) {
     return schema.getType();
   }
 
+  // the following operations are unsupported in the RocksDB-based mode
   @Override
   public long getOffset() {
-    return offset;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public void setOffset(long offset) {
-    this.offset = offset;
+    throw new UnsupportedOperationException();
   }
 
   @Override
@@ -125,51 +132,53 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
 
   @Override
   public TriggerExecutor getTriggerExecutor() {
-    return triggerExecutor;
+    return null;
   }
 
   @Override
   public void setTriggerExecutor(TriggerExecutor triggerExecutor) {
-    this.triggerExecutor = triggerExecutor;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public ILastCacheContainer getLastCacheContainer() {
-    if (lastCacheContainer == null) {
-      synchronized (this) {
-        if (lastCacheContainer == null) {
-          lastCacheContainer = new LastCacheContainer();
-        }
-      }
-    }
-    return lastCacheContainer;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public void setLastCacheContainer(ILastCacheContainer lastCacheContainer) {
-    this.lastCacheContainer = lastCacheContainer;
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public void serializeTo(MLogWriter logWriter) throws IOException {
-    logWriter.serializeMeasurementMNode(this);
-  }
-
-  /** deserialize MeasurementMNode from MeasurementNodePlan */
-  public static IMeasurementMNode deserializeFrom(MeasurementMNodePlan plan) {
-    IMeasurementMNode node =
-        MeasurementMNode.getMeasurementMNode(
-            null, plan.getName(), plan.getSchema(), plan.getAlias());
-    node.setOffset(plan.getOffset());
-    return node;
+    throw new UnsupportedOperationException();
   }
 
-  @Override
-  public String getFullPath() {
-    if (fullPath != null) {
-      return fullPath;
+  private void deserialize(byte[] value) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(value);
+    // skip the version flag and node type flag
+    ReadWriteIOUtils.readBytes(byteBuffer, 2);
+
+    while (byteBuffer.hasRemaining()) {
+      byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
+      switch (blockType) {
+        case RSchemaConstants.DATA_BLOCK_TYPE_ALIAS:
+          alias = ReadWriteIOUtils.readString(byteBuffer);
+          break;
+        case RSchemaConstants.DATA_BLOCK_TYPE_SCHEMA:
+          schema = MeasurementSchema.deserializeFrom(byteBuffer);
+          break;
+        case RSchemaConstants.DATA_BLOCK_TYPE_TAGS:
+          tags = ReadWriteIOUtils.readMap(byteBuffer);
+          break;
+        case RSchemaConstants.DATA_BLOCK_TYPE_ATTRIBUTES:
+          attributes = ReadWriteIOUtils.readMap(byteBuffer);
+          break;
+        default:
+          break;
+      }
     }
-    return concatFullPath();
   }
 
   @Override
@@ -179,8 +188,6 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
 
   @Override
   public IMNode getChild(String name) {
-    MeasurementMNode.logger.warn(
-        "current node {} is a MeasurementMNode, can not get child {}", this.name, name);
     throw new RuntimeException(
         String.format(
             "current node %s is a MeasurementMNode, can not get child %s", super.name, name));
@@ -189,7 +196,7 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
   @Override
   public IMNode addChild(String name, IMNode child) {
     // Do nothing
-    return null;
+    return child;
   }
 
   @Override
@@ -199,6 +206,7 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
 
   @Override
   public IMNode deleteChild(String name) {
+    // Do nothing
     return null;
   }
 
@@ -207,30 +215,23 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
 
   @Override
   public IMNodeContainer getChildren() {
-    return MNodeContainers.emptyMNodeContainer();
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public void setChildren(IMNodeContainer children) {
-    // Do nothing
+  public Template getUpperTemplate() {
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public Template getUpperTemplate() {
-    return parent.getUpperTemplate();
-  }
+  public void setSchemaTemplate(Template schemaTemplate) {}
 
   @Override
   public Template getSchemaTemplate() {
-    MeasurementMNode.logger.warn(
-        "current node {} is a MeasurementMNode, can not get Device Template", name);
     throw new RuntimeException(
         String.format("current node %s is a MeasurementMNode, can not get Device Template", name));
   }
 
-  @Override
-  public void setSchemaTemplate(Template schemaTemplate) {}
-
   @Override
   public void setUseTemplate(boolean useTemplate) {}
 
@@ -238,4 +239,24 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
   public boolean isMeasurement() {
     return true;
   }
+
+  public Map<String, String> getTags() {
+    return tags;
+  }
+
+  public Map<String, String> getAttributes() {
+    return attributes;
+  }
+
+  public void setTags(Map<String, String> tags) {
+    this.tags = tags;
+  }
+
+  public void setAttributes(Map<String, String> attributes) {
+    this.attributes = attributes;
+  }
+
+  public byte[] getRocksDBValue() throws IOException {
+    return RSchemaUtils.buildMeasurementNodeValue(schema, alias, tags, attributes);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RStorageGroupMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RStorageGroupMNode.java
new file mode 100644
index 0000000000..799149c674
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RStorageGroupMNode.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
+
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+
+public class RStorageGroupMNode extends RInternalMNode implements IStorageGroupMNode {
+
+  private long dataTTL;
+
+  /**
+   * Constructor of MNode.
+   *
+   * @param fullPath
+   */
+  public RStorageGroupMNode(
+      String fullPath, long dataTTL, RSchemaReadWriteHandler readWriteHandler) {
+    super(fullPath, readWriteHandler);
+    this.dataTTL = dataTTL;
+  }
+
+  public RStorageGroupMNode(
+      String fullPath, byte[] value, RSchemaReadWriteHandler readWriteHandler) {
+    super(fullPath, readWriteHandler);
+    Object ttl = RSchemaUtils.parseNodeValue(value, RMNodeValueType.TTL);
+    if (ttl == null) {
+      ttl = IoTDBDescriptor.getInstance().getConfig().getDefaultTTL();
+    }
+    this.dataTTL = (long) ttl;
+  }
+
+  @Override
+  void updateChildNode(String childName, int childNameMaxLevel) throws MetadataException {
+    String innerName =
+        RSchemaUtils.convertPartialPathToInner(
+            childName, childNameMaxLevel, RMNodeType.STORAGE_GROUP.getValue());
+    long ttl = getDataTTL();
+    try {
+      readWriteHandler.updateNode(
+          innerName.getBytes(), RSchemaUtils.updateTTL(RSchemaConstants.DEFAULT_NODE_VALUE, ttl));
+    } catch (RocksDBException e) {
+      throw new MetadataException(e);
+    }
+  }
+
+  @Override
+  public boolean isStorageGroup() {
+    return true;
+  }
+
+  @Override
+  public boolean isEntity() {
+    return false;
+  }
+
+  @Override
+  public boolean isMeasurement() {
+    return false;
+  }
+
+  @Override
+  public void serializeTo(MLogWriter logWriter) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getDataTTL() {
+    return dataTTL;
+  }
+
+  @Override
+  public void setDataTTL(long dataTTL) {
+    this.dataTTL = dataTTL;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index 403b714d35..dd7554a3cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.service;
 
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.utils.MemUtils;
 
 import org.slf4j.Logger;
@@ -29,6 +31,10 @@ public class IoTDBShutdownHook extends Thread {
 
   @Override
   public void run() {
+    // close RocksDB if possible to avoid losing data
+    for (ISchemaRegion schemaRegion : SchemaEngine.getInstance().getAllSchemaRegions()) {
+      schemaRegion.clear();
+    }
     if (logger.isInfoEnabled()) {
       logger.info(
           "IoTDB exits. Jvm memory usage: {}",
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBBenchmark.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBBenchmark.java
new file mode 100644
index 0000000000..54441c30cf
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBBenchmark.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+@Ignore
+public class MRocksDBBenchmark {
+  protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private final RSchemaRegion rocksDBManager;
+
+  private static final Logger logger = LoggerFactory.getLogger(MRocksDBBenchmark.class);
+
+  public MRocksDBBenchmark(RSchemaRegion rocksDBManager) {
+    this.rocksDBManager = rocksDBManager;
+  }
+
+  public List<RocksDBBenchmarkTask.BenchmarkResult> benchmarkResults = new ArrayList<>();
+
+  public void testTimeSeriesCreation(List<List<CreateTimeSeriesPlan>> timeSeriesSet)
+      throws IOException {
+    RocksDBBenchmarkTask<List<CreateTimeSeriesPlan>> task =
+        new RocksDBBenchmarkTask<>(timeSeriesSet, RocksDBTestUtils.WRITE_CLIENT_NUM, 100);
+    RocksDBBenchmarkTask.BenchmarkResult result =
+        task.runBatchWork(
+            createTimeSeriesPlans -> {
+              RocksDBBenchmarkTask.TaskResult taskResult = new RocksDBBenchmarkTask.TaskResult();
+              createTimeSeriesPlans.forEach(
+                  ts -> {
+                    try {
+                      rocksDBManager.createTimeseries(
+                          ts.getPath(),
+                          ts.getDataType(),
+                          ts.getEncoding(),
+                          ts.getCompressor(),
+                          ts.getProps(),
+                          ts.getAlias());
+                      taskResult.success++;
+                    } catch (Exception e) {
+                      e.printStackTrace();
+                      taskResult.failure++;
+                    }
+                  });
+              return taskResult;
+            },
+            "CreateTimeSeries");
+    benchmarkResults.add(result);
+  }
+
+  public void testMeasurementNodeQuery(Collection<String> queryTsSet) {
+    RocksDBBenchmarkTask<String> task =
+        new RocksDBBenchmarkTask<>(queryTsSet, RocksDBTestUtils.WRITE_CLIENT_NUM, 10000);
+    RocksDBBenchmarkTask.BenchmarkResult result =
+        task.runWork(
+            s -> {
+              try {
+                IMNode node = rocksDBManager.getMeasurementMNode(new PartialPath(s));
+                if (node != null) {
+                  logger.warn(node.toString());
+                }
+                return true;
+              } catch (Exception e) {
+                return false;
+              }
+            },
+            "MeasurementNodeQuery");
+    benchmarkResults.add(result);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBUnitTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBUnitTest.java
new file mode 100644
index 0000000000..ed8364cb53
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBUnitTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.mnode.InternalMNode;
+import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Ignore
+public class MRocksDBUnitTest {
+
+  private RSchemaRegion rSchemaRegion;
+
+  @Before
+  public void setUp() throws MetadataException {
+
+    PartialPath storageGroup = new PartialPath("root.test.sg");
+    SchemaRegionId schemaRegionId = new SchemaRegionId((int) (Math.random() * 10));
+    MNode root = new InternalMNode(null, IoTDBConstant.PATH_ROOT);
+    IStorageGroupMNode storageGroupMNode = new StorageGroupMNode(root, "test", -1);
+    rSchemaRegion = new RSchemaRegion(storageGroup, schemaRegionId, storageGroupMNode);
+  }
+
+  @Test
+  public void testCreateTimeSeries() throws MetadataException, IOException {
+    PartialPath path = new PartialPath("root.test.sg.dd.m1");
+    rSchemaRegion.createTimeseries(
+        path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
+
+    IMeasurementMNode m1 = rSchemaRegion.getMeasurementMNode(path);
+    Assert.assertNull(m1.getAlias());
+    Assert.assertEquals(m1.getSchema().getCompressor(), CompressionType.UNCOMPRESSED);
+    Assert.assertEquals(m1.getSchema().getEncodingType(), TSEncoding.PLAIN);
+    Assert.assertEquals(m1.getSchema().getType(), TSDataType.TEXT);
+    Assert.assertNull(m1.getSchema().getProps());
+
+    PartialPath path2 = new PartialPath("root.tt.sg.dd.m2");
+    rSchemaRegion.createTimeseries(
+        path2, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.GZIP, null, "ma");
+
+    rSchemaRegion.printScanAllKeys();
+
+    IMeasurementMNode m2 = rSchemaRegion.getMeasurementMNode(path2);
+    Assert.assertEquals(m2.getAlias(), "ma");
+    Assert.assertEquals(m2.getSchema().getCompressor(), CompressionType.GZIP);
+    Assert.assertEquals(m2.getSchema().getEncodingType(), TSEncoding.PLAIN);
+    Assert.assertEquals(m2.getSchema().getType(), TSDataType.DOUBLE);
+    Assert.assertNull(m2.getSchema().getProps());
+  }
+
+  @Test
+  public void testCreateAlignedTimeSeries() throws MetadataException, IOException {
+    PartialPath prefixPath = new PartialPath("root.tt.sg.dd");
+    List<String> measurements = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<TSEncoding> encodings = new ArrayList<>();
+    List<CompressionType> compressions = new ArrayList<>();
+
+    for (int i = 0; i < 6; i++) {
+      measurements.add("mm" + i);
+      dataTypes.add(TSDataType.INT32);
+      encodings.add(TSEncoding.PLAIN);
+      compressions.add(CompressionType.UNCOMPRESSED);
+    }
+
+    rSchemaRegion.createAlignedTimeSeries(
+        new CreateAlignedTimeSeriesPlan(
+            prefixPath, measurements, dataTypes, encodings, compressions, null, null, null));
+
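+    // A plain (non-aligned) measurement under the aligned device must be rejected.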
+    try {
+      PartialPath path = new PartialPath("root.tt.sg.dd.mn");
+      rSchemaRegion.createTimeseries(
+          path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
+      Assert.fail("Expected MetadataException for a non-aligned measurement under an aligned device");
+    } catch (MetadataException e) {
+      // expected
+    }
+    rSchemaRegion.printScanAllKeys();
+  }
+
+  @Test
+  public void testNodeTypeCount() throws MetadataException, IOException {
+    List<PartialPath> storageGroups = new ArrayList<>();
+    storageGroups.add(new PartialPath("root.sg1"));
+    storageGroups.add(new PartialPath("root.inner.sg1"));
+    storageGroups.add(new PartialPath("root.inner.sg2"));
+    storageGroups.add(new PartialPath("root.inner1.inner2.inner3.sg"));
+    storageGroups.add(new PartialPath("root.inner1.inner2.sg"));
+
+    PartialPath path = new PartialPath("root.tt.sg.dd.m1");
+    rSchemaRegion.createTimeseries(
+        path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
+
+    PartialPath path2 = new PartialPath("root.tt.sg.ddd.m2");
+    rSchemaRegion.createTimeseries(
+        path2, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, "ma");
+
+    rSchemaRegion.printScanAllKeys();
+
+    // test all timeseries number
+    Assert.assertEquals(
+        1, rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.tt.sg.dd.m1"), false));
+    Assert.assertEquals(2, rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.**"), false));
+
+    // test device number
+    Assert.assertEquals(
+        0, rSchemaRegion.getDevicesNum(new PartialPath("root.inner1.inner2"), false));
+    Assert.assertEquals(
+        0, rSchemaRegion.getDevicesNum(new PartialPath("root.inner1.inner2.**"), false));
+    Assert.assertEquals(2, rSchemaRegion.getDevicesNum(new PartialPath("root.tt.sg.**"), false));
+    Assert.assertEquals(1, rSchemaRegion.getDevicesNum(new PartialPath("root.tt.sg.dd"), false));
+
+    // todo wildcard
+
+    // test nodes count in given level
+    Assert.assertEquals(
+        2, rSchemaRegion.getNodesCountInGivenLevel(new PartialPath("root.tt.sg"), 3, false));
+  }
+
+  @Test
+  public void testPathPatternMatch() throws MetadataException, IOException {
+    List<PartialPath> timeseries = new ArrayList<>();
+    timeseries.add(new PartialPath("root.sg.d1.m1"));
+    timeseries.add(new PartialPath("root.sg.d1.m2"));
+    timeseries.add(new PartialPath("root.sg.d2.m1"));
+    timeseries.add(new PartialPath("root.sg.d2.m2"));
+    timeseries.add(new PartialPath("root.sg1.d1.m1"));
+    timeseries.add(new PartialPath("root.sg1.d1.m2"));
+    timeseries.add(new PartialPath("root.sg1.d2.m1"));
+    timeseries.add(new PartialPath("root.sg1.d2.m2"));
+
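+    // Create a small tree so pattern traversal has both matching and non-matching leaves.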
+    for (PartialPath path : timeseries) {
+      rSchemaRegion.createTimeseries(
+          path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
+    }
+
+    // TODO: assert the matched set once pattern traversal is exposed on RSchemaRegion.
+  }
+
+  @Test
+  public void testDeleteTimeseries() throws MetadataException, IOException {
+    List<PartialPath> timeseries = new ArrayList<>();
+    timeseries.add(new PartialPath("root.sg.d1.m1"));
+    timeseries.add(new PartialPath("root.sg.d1.m2"));
+    timeseries.add(new PartialPath("root.sg.d2.m1"));
+    timeseries.add(new PartialPath("root.sg.d2.m2"));
+    timeseries.add(new PartialPath("root.sg.d3.m1"));
+    timeseries.add(new PartialPath("root.sg.d3.m2"));
+    timeseries.add(new PartialPath("root.sg1.d1.m1"));
+    timeseries.add(new PartialPath("root.sg1.d1.m2"));
+    timeseries.add(new PartialPath("root.sg1.d2.m1"));
+    timeseries.add(new PartialPath("root.sg1.d2.m2"));
+
+    for (PartialPath path : timeseries) {
+      rSchemaRegion.createTimeseries(
+          path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
+    }
+
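+    // Each wildcard delete below removes the matched leaves and shrinks the total count.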
+    Assert.assertEquals(
+        timeseries.size(), rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.**"), false));
+
+    int count = timeseries.size();
+    rSchemaRegion.deleteTimeseries(new PartialPath("root.sg.d1.*"));
+    Assert.assertEquals(
+        count - 2, rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.**"), false));
+
+    count = count - 2;
+    rSchemaRegion.deleteTimeseries(new PartialPath("root.sg1.**"));
+    Assert.assertEquals(
+        count - 4, rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.**"), false));
+
+    count = count - 4;
+    rSchemaRegion.deleteTimeseries(new PartialPath("root.sg.*.m1"));
+    Assert.assertEquals(
+        count - 2, rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.**"), false));
+
+    rSchemaRegion.printScanAllKeys();
+  }
+
+  @Test
+  public void testUpsert() throws MetadataException, IOException {
+    PartialPath path2 = new PartialPath("root.tt.sg.dd.m2");
+    rSchemaRegion.createTimeseries(
+        path2, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, "ma");
+
+    IMeasurementMNode m1 = rSchemaRegion.getMeasurementMNode(new PartialPath("root.tt.sg.dd.m2"));
+    Assert.assertEquals("ma", m1.getAlias());
+
+    rSchemaRegion.changeAlias(new PartialPath("root.tt.sg.dd.m2"), "test");
+
+    IMeasurementMNode m2 = rSchemaRegion.getMeasurementMNode(new PartialPath("root.tt.sg.dd.m2"));
+    Assert.assertEquals("test", m2.getAlias());
+
+    IMeasurementMNode m3 = rSchemaRegion.getMeasurementMNode(new PartialPath("root.tt.sg.dd.test"));
+    Assert.assertEquals("test", m3.getAlias());
+  }
+
+  @After
+  public void clean() throws MetadataException {
+    rSchemaRegion.deleteSchemaRegion();
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandlerTest.java
new file mode 100644
index 0000000000..b0663e45eb
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandlerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler.ROCKSDB_PATH;
+
+@Ignore
+public class RSchemaReadWriteHandlerTest {
+
+  private RSchemaReadWriteHandler readWriteHandler;
+
+  @Before
+  public void setUp() throws MetadataException, RocksDBException {
+    File file = new File(ROCKSDB_PATH);
+    if (!file.exists()) {
+      file.mkdirs();
+    }
+    readWriteHandler = new RSchemaReadWriteHandler();
+  }
+
+  @Test
+  public void testKeyExistByTypes() throws IllegalPathException, RocksDBException {
+    List<PartialPath> timeseries = new ArrayList<>();
+    timeseries.add(new PartialPath("root.sg.d1.m1"));
+    timeseries.add(new PartialPath("root.sg.d1.m2"));
+    timeseries.add(new PartialPath("root.sg.d2.m1"));
+    timeseries.add(new PartialPath("root.sg.d2.m2"));
+    timeseries.add(new PartialPath("root.sg1.d1.m1"));
+    timeseries.add(new PartialPath("root.sg1.d1.m2"));
+    timeseries.add(new PartialPath("root.sg1.d2.m1"));
+    timeseries.add(new PartialPath("root.sg1.d2.m2"));
+
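+    // Persist each path as a MEASUREMENT node keyed by its level-encoded path.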
+    for (PartialPath path : timeseries) {
+      String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+      readWriteHandler.createNode(levelPath, RMNodeType.MEASUREMENT, path.getFullPath().getBytes());
+    }
+
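+    // Each stored key should be found regardless of node type, with the full path as its value.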
+    for (PartialPath path : timeseries) {
+      String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+      CheckKeyResult result = readWriteHandler.keyExistByAllTypes(levelPath);
+      Assert.assertTrue(result.existAnyKey());
+      Assert.assertNotNull(result.getValue());
+      Assert.assertEquals(path.getFullPath(), new String(result.getValue()));
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegionAdvancedTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegionAdvancedTest.java
new file mode 100644
index 0000000000..f388f8099a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegionAdvancedTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RStorageGroupMNode;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Collections;
+
+@Ignore
+public class RSchemaRegionAdvancedTest {
+
+  private static RSchemaRegion schemaRegion = null;
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    PartialPath storageGroupPath = new PartialPath("root.vehicle.s0");
+    SchemaRegionId schemaRegionId = new SchemaRegionId(1);
+    RSchemaReadWriteHandler readWriteHandler = new RSchemaReadWriteHandler();
+    RStorageGroupMNode storageGroupMNode =
+        new RStorageGroupMNode(storageGroupPath.getFullPath(), -1, readWriteHandler);
+    schemaRegion = new RSchemaRegion(storageGroupPath, schemaRegionId, storageGroupMNode);
+
+    String[] devices = {"root.vehicle.s0.d0", "root.vehicle.s1.d1"};
+    TSDataType[] dataTypes = {
+      TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT,
+      TSDataType.DOUBLE, TSDataType.BOOLEAN, TSDataType.TEXT
+    };
+    TSEncoding[] encodings = {
+      TSEncoding.RLE, TSEncoding.RLE, TSEncoding.RLE,
+      TSEncoding.RLE, TSEncoding.PLAIN, TSEncoding.PLAIN
+    };
+    // Same six measurements (s0..s5) under each device, mirroring the original fixture.
+    for (String device : devices) {
+      for (int i = 0; i < dataTypes.length; i++) {
+        schemaRegion.createTimeseries(
+            new PartialPath(device + ".s" + i),
+            dataTypes[i],
+            encodings[i],
+            TSFileDescriptor.getInstance().getConfig().getCompressor(),
+            Collections.emptyMap());
+      }
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testCache() throws MetadataException {
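+    // Add a few extra series, then verify reads through the cached device node and a negative lookup.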
+    schemaRegion.createTimeseries(
+        new PartialPath("root.vehicle.s2.d2.s0"),
+        TSDataType.DOUBLE,
+        TSEncoding.RLE,
+        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        Collections.emptyMap());
+    schemaRegion.createTimeseries(
+        new PartialPath("root.vehicle.s2.d2.s1"),
+        TSDataType.BOOLEAN,
+        TSEncoding.PLAIN,
+        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        Collections.emptyMap());
+    schemaRegion.createTimeseries(
+        new PartialPath("root.vehicle.s2.d2.s2.g0"),
+        TSDataType.TEXT,
+        TSEncoding.PLAIN,
+        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        Collections.emptyMap());
+    schemaRegion.createTimeseries(
+        new PartialPath("root.vehicle.s2.d2.s3"),
+        TSDataType.TEXT,
+        TSEncoding.PLAIN,
+        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        Collections.emptyMap());
+
+    IMNode node = schemaRegion.getDeviceNode(new PartialPath("root.vehicle.s0.d0"));
+    Assert.assertEquals(
+        TSDataType.INT32, node.getChild("s0").getAsMeasurementMNode().getSchema().getType());
+
+    Assert.assertFalse(schemaRegion.isPathExist(new PartialPath("root.vehicle.d100")));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkEngine.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkEngine.java
new file mode 100644
index 0000000000..55d0fb355b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkEngine.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.logfile.MLogReader;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.utils.FileUtils;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler.ROCKSDB_PATH;
+
+public class RocksDBBenchmarkEngine {
+  private static final Logger logger = LoggerFactory.getLogger(RocksDBBenchmarkEngine.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
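+  // Time series plans read from mlog.bin are grouped into bins of this size.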
+  private static final int BIN_CAPACITY = 100 * 1000;
+
+  private File logFile;
+  public static List<List<CreateTimeSeriesPlan>> timeSeriesSet = new ArrayList<>();
+  public static Set<String> measurementPathSet = new HashSet<>();
+  public static Set<String> innerPathSet = new HashSet<>();
+  public static List<SetStorageGroupPlan> storageGroups = new ArrayList<>();
+
+  public RocksDBBenchmarkEngine() {
+    String schemaDir = config.getSchemaDir();
+    String logFilePath = schemaDir + File.separator + MetadataConstant.METADATA_LOG;
+    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    System.out.println(logFile.getAbsolutePath());
+  }
+
+  public static void main(String[] args) {
+    RocksDBBenchmarkEngine engine = new RocksDBBenchmarkEngine();
+    engine.startTest();
+  }
+
+  @Test
+  public void startTest() {
+    RocksDBTestUtils.printMemInfo("Benchmark rocksdb start");
+    try {
+      prepareBenchmark();
+      RocksDBTestUtils.printBenchmarkBaseline(
+          storageGroups, timeSeriesSet, measurementPathSet, innerPathSet);
+      // RocksDB benchmark
+      RSchemaRegion rocksDBManager = new RSchemaRegion();
+      MRocksDBBenchmark mRocksDBBenchmark = new MRocksDBBenchmark(rocksDBManager);
+      mRocksDBBenchmark.testTimeSeriesCreation(timeSeriesSet);
+      mRocksDBBenchmark.testMeasurementNodeQuery(measurementPathSet);
+      RocksDBTestUtils.printReport(mRocksDBBenchmark.benchmarkResults, "rocksDB");
+      RocksDBTestUtils.printMemInfo("Benchmark finished");
+    } catch (IOException | MetadataException e) {
+      logger.error("Error happened when run benchmark", e);
+    }
+  }
+
+  public void prepareBenchmark() throws IOException {
+    long time = System.currentTimeMillis();
+    if (!logFile.exists()) {
+      throw new FileNotFoundException("an mlog.bin file is required to initialize the benchmark");
+    }
+    try (MLogReader mLogReader =
+        new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG)) {
+      parseForTestSet(mLogReader);
+      System.out.println("spend " + (System.currentTimeMillis() - time) + "ms to prepare dataset");
+    } catch (Exception e) {
+      throw new IOException("Failed to parser mlog.bin for err:" + e);
+    }
+  }
+
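+  // Replays mlog.bin and buckets CREATE_TIMESERIES plans into BIN_CAPACITY-sized batches.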
+  private static void parseForTestSet(MLogReader mLogReader) throws IllegalPathException {
+    List<CreateTimeSeriesPlan> currentList = null;
+    SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan();
+    setStorageGroupPlan.setPath(new PartialPath("root.iotcloud"));
+    storageGroups.add(setStorageGroupPlan);
+    while (mLogReader.hasNext()) {
+      PhysicalPlan plan = null;
+      try {
+        plan = mLogReader.next();
+        if (plan == null) {
+          continue;
+        }
+        switch (plan.getOperatorType()) {
+          case CREATE_TIMESERIES:
+            CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
+            PartialPath path = createTimeSeriesPlan.getPath();
+            if (currentList == null) {
+              currentList = new ArrayList<>(BIN_CAPACITY);
+              timeSeriesSet.add(currentList);
+            }
+            measurementPathSet.add(path.getFullPath());
+
+            innerPathSet.add(path.getDevice());
+            String[] subNodes = ArrayUtils.subarray(path.getNodes(), 0, path.getNodes().length - 2);
+            innerPathSet.add(String.join(".", subNodes));
+
+            currentList.add(createTimeSeriesPlan);
+            if (currentList.size() >= BIN_CAPACITY) {
+              currentList = null;
+            }
+            break;
+          default:
+            break;
+        }
+      } catch (Exception e) {
+        logger.error(
+            "Cannot replay cmd {}:", plan == null ? "" : plan.getOperatorType(), e);
+      }
+    }
+  }
+
+  private void resetEnv() throws IOException {
+    File rocksDbFile = new File(ROCKSDB_PATH);
+    if (rocksDbFile.exists() && rocksDbFile.isDirectory()) {
+      FileUtils.deleteDirectory(rocksDbFile);
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkTask.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkTask.java
new file mode 100644
index 0000000000..f57008975a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkTask.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+public class RocksDBBenchmarkTask<T> {
+  private Collection<T> dataSet;
+  private int workCount;
+  private int timeoutInMin;
+
+  RocksDBBenchmarkTask(Collection<T> dataSet, int workCount, int timeoutInMin) {
+    this.dataSet = dataSet;
+    this.workCount = workCount;
+    this.timeoutInMin = timeoutInMin;
+  }
+
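+  // Fans the data set out to a fixed-size thread pool and aggregates per-task success/failure counts.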
+  public BenchmarkResult runBatchWork(Function<T, TaskResult> work, String name) {
+    ExecutorService executor = Executors.newFixedThreadPool(workCount);
+    AtomicInteger sucCounter = new AtomicInteger(0);
+    AtomicInteger failCounter = new AtomicInteger(0);
+    StopWatch stopWatch = new StopWatch();
+    stopWatch.start();
+    for (T input : dataSet) {
+      executor.submit(
+          () -> {
+            TaskResult result = work.apply(input);
+            sucCounter.addAndGet(result.success);
+            failCounter.addAndGet(result.failure);
+          });
+    }
+    try {
+      executor.shutdown();
+      executor.awaitTermination(timeoutInMin, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    stopWatch.stop();
+    return new BenchmarkResult(name, sucCounter.get(), failCounter.get(), stopWatch.getTime());
+  }
+
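+  // Like runBatchWork, but each task reports a single boolean outcome.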
+  public BenchmarkResult runWork(Function<T, Boolean> work, String name) {
+    ExecutorService executor = Executors.newFixedThreadPool(workCount);
+    AtomicInteger sucCounter = new AtomicInteger(0);
+    AtomicInteger failCounter = new AtomicInteger(0);
+    StopWatch stopWatch = new StopWatch();
+    stopWatch.start();
+    for (T input : dataSet) {
+      executor.submit(
+          () -> {
+            if (work.apply(input)) {
+              sucCounter.incrementAndGet();
+            } else {
+              failCounter.incrementAndGet();
+            }
+          });
+    }
+    try {
+      executor.shutdown();
+      executor.awaitTermination(timeoutInMin, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    stopWatch.stop();
+    return new BenchmarkResult(name, sucCounter.get(), failCounter.get(), stopWatch.getTime());
+  }
+
+  public static class TaskResult {
+    public int success = 0;
+    public int failure = 0;
+  }
+
+  public static class BenchmarkResult {
+    public String name;
+    public long successCount;
+    public long failCount;
+    public long costInMs;
+
+    BenchmarkResult(String name, long successCount, long failCount, long cost) {
+      this.name = name;
+      this.successCount = successCount;
+      this.failCount = failCount;
+      this.costInMs = cost;
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBTestUtils.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBTestUtils.java
new file mode 100644
index 0000000000..85fd8da08f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBTestUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+
+import java.util.Collection;
+import java.util.List;
+
+public class RocksDBTestUtils {
+
+  public static int WRITE_CLIENT_NUM = 200;
+  public static int READ_CLIENT_NUM = 50;
+
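+  // Prints the client-pool configuration and data set sizes that a benchmark run will use.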
+  public static void printBenchmarkBaseline(
+      List<?> storageGroups,
+      List<List<CreateTimeSeriesPlan>> timeSeriesSet,
+      Collection<?> queryTsSet,
+      Collection<?> innerPathSet) {
+    System.out.println(
+        "#################################Benchmark configuration#################################");
+    System.out.println("-----------Configuration---------");
+    System.out.println("Write client num: " + WRITE_CLIENT_NUM);
+    System.out.println("Query client num: " + READ_CLIENT_NUM);
+    System.out.println("-----------Benchmark Data Set Size---------");
+    System.out.println("StorageGroup: " + storageGroups.size());
+    int count = 0;
+    for (List<CreateTimeSeriesPlan> l : timeSeriesSet) {
+      count += l.size();
+    }
+    System.out.println("TimeSeries: " + count);
+    System.out.println("MeasurementNodeQuery: " + queryTsSet.size());
+    System.out.println("ChildrenNodeQuery: " + innerPathSet.size());
+  }
+
+  public static void printMemInfo(String stageInfo) {
+    System.out.printf(
+        "[%s] Memory used: %d%n",
+        stageInfo, Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
+  }
+
+  public static void printReport(
+      List<RocksDBBenchmarkTask.BenchmarkResult> results, String category) {
+    System.out.printf(
+        "%n%n#################################%s benchmark statistics#################################%n",
+        category);
+    System.out.printf("%25s %15s %10s %15s%n", "", "success", "fail", "cost-in-ms");
+    for (RocksDBBenchmarkTask.BenchmarkResult result : results) {
+      System.out.printf(
+          "%25s %15d %10d %15d%n",
+          result.name, result.successCount, result.failCount, result.costInMs);
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 76feee6bcf..ef8ce93535 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -113,14 +113,13 @@ public class ReadWriteIOUtils {
     return readBool(buffer);
   }
 
-  public static int write(Map<String, String> map, DataOutputStream stream) throws IOException {
+  public static int write(Map<String, String> map, OutputStream stream) throws IOException {
     if (map == null) {
       return write(NO_BYTE_TO_READ, stream);
     }
 
     int length = 0;
-    stream.writeInt(map.size());
-    length += 4;
+    length += write(map.size(), stream);
     for (Entry<String, String> entry : map.entrySet()) {
       length += write(entry.getKey(), stream);
       length += write(entry.getValue(), stream);
@@ -128,8 +127,7 @@ public class ReadWriteIOUtils {
     return length;
   }
 
-  public static void write(List<Map<String, String>> maps, DataOutputStream stream)
-      throws IOException {
+  public static void write(List<Map<String, String>> maps, OutputStream stream) throws IOException {
     for (Map<String, String> map : maps) {
       write(map, stream);
     }