You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/24 12:20:57 UTC

[iotdb] branch master updated: Add interface to support loading external properties and to do external series number checking (#7104)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ad3915bd3 Add interface to support loading external properties and to do external series number checking (#7104)
4ad3915bd3 is described below

commit 4ad3915bd38456122b5e4f23280a7e155cb0ffd9
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Aug 24 20:20:51 2022 +0800

    Add interface to support loading external properties and to do external series number checking (#7104)
---
 external-api/pom.xml                               |   60 +
 .../iotdb/external/api/IPropertiesLoader.java      |   37 +
 .../iotdb/external/api/ISeriesNumerLimiter.java    |   48 +
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |    4 +
 pom.xml                                            |    1 +
 server/pom.xml                                     |    5 +
 .../resources/conf/iotdb-datanode.properties       |   25 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   31 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 1388 ++++++++++----------
 .../metadata/SeriesNumberOverflowException.java    |   29 +
 .../db/metadata/schemaregion/SchemaEngine.java     |   30 +-
 .../schemaregion/SchemaRegionMemoryImpl.java       |  104 +-
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |   87 +-
 .../db/protocol/mqtt/PayloadFormatManager.java     |   13 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |    5 +
 .../java/org/apache/iotdb/db/service/NewIoTDB.java |    5 +
 .../org/apache/iotdb/db/utils/JarLoaderUtil.java   |  150 +++
 .../iotdb/tsfile/common/conf/TSFileDescriptor.java |    2 +-
 18 files changed, 1280 insertions(+), 744 deletions(-)

diff --git a/external-api/pom.xml b/external-api/pom.xml
new file mode 100644
index 0000000000..354c820513
--- /dev/null
+++ b/external-api/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>iotdb-parent</artifactId>
+        <groupId>org.apache.iotdb</groupId>
+        <version>0.14.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>external-api</artifactId>
+    <profiles>
+        <profile>
+            <id>get-jar-with-dependencies</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <version>${maven.assembly.version}</version>
+                        <configuration>
+                            <descriptorRefs>
+                                <descriptorRef>jar-with-dependencies</descriptorRef>
+                            </descriptorRefs>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <id>make-assembly</id>
+                                <!-- this is used for inheritance merges -->
+                                <phase>package</phase>
+                                <!-- bind to the packaging phase -->
+                                <goals>
+                                    <goal>single</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
diff --git a/external-api/src/main/java/org/apche/iotdb/external/api/IPropertiesLoader.java b/external-api/src/main/java/org/apche/iotdb/external/api/IPropertiesLoader.java
new file mode 100644
index 0000000000..10c5b21e7e
--- /dev/null
+++ b/external-api/src/main/java/org/apche/iotdb/external/api/IPropertiesLoader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apche.iotdb.external.api;
+
+import java.nio.file.Path;
+import java.util.Properties;
+
+/**
+ * An interface for loading properties from an external properties file in order to override
+ * the default configurations.
+ */
+public interface IPropertiesLoader {
+
+  /**
+   * Loads properties from the specified file.
+   *
+   * @param file the path of the properties file to open
+   * @return a property list holding the values read from the file
+   */
+  Properties loadProperties(Path file);
+}
diff --git a/external-api/src/main/java/org/apche/iotdb/external/api/ISeriesNumerLimiter.java b/external-api/src/main/java/org/apche/iotdb/external/api/ISeriesNumerLimiter.java
new file mode 100644
index 0000000000..7b36b5d8b7
--- /dev/null
+++ b/external-api/src/main/java/org/apche/iotdb/external/api/ISeriesNumerLimiter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apche.iotdb.external.api;
+
+import java.util.Properties;
+
+/** An interface for limiting the number of series; users can implement their own strategy. */
+public interface ISeriesNumerLimiter {
+
+  /**
+   * Performs the necessary initialization.
+   *
+   * @param properties properties containing all the parameters needed for initialization
+   */
+  void init(Properties properties);
+
+  /**
+   * Registers time series that the current createTimeSeries operation is about to add.
+   *
+   * @param number number of time series for the current createTimeSeries operation
+   * @return true if totalTimeSeriesNumber doesn't exceed the limit and the current
+   *     createTimeSeries operation is allowed, otherwise false
+   */
+  boolean addTimeSeries(int number);
+
+  /**
+   * Registers time series removed by the current deleteTimeSeries operation.
+   *
+   * @param number number of time series for the current deleteTimeSeries operation
+   */
+  void deleteTimeSeries(int number);
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 41013228ac..4fa1095cb1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -201,6 +201,10 @@ public class IoTDBConstant {
   public static final String WAL_FOLDER_NAME = "wal";
   public static final String EXT_PIPE_FOLDER_NAME = "extPipe";
 
+  public static final String EXT_PROPERTIES_LOADER_FOLDER_NAME = "loader";
+
+  public static final String EXT_LIMITER = "limiter";
+
   // mqtt
   public static final String ENABLE_MQTT = "enable_mqtt_service";
   public static final String MQTT_HOST_NAME = "mqtt_host";
diff --git a/pom.xml b/pom.xml
index dd24724a43..f43d7aa313 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,7 @@
         <module>schema-engine-rocksdb</module>
         <module>udf-api</module>
         <module>rewrite-tsfile-tool</module>
+        <module>external-api</module>
     </modules>
     <!-- Properties Management -->
     <properties>
diff --git a/server/pom.xml b/server/pom.xml
index 347268084a..9d29b53919 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -75,6 +75,11 @@
             <artifactId>external-pipe-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>external-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 42d67f75f0..07c3b9e661 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -1096,4 +1096,27 @@ trigger_forward_http_pool_size=200
 # Trigger HTTP forward pool max connection for per route
 trigger_forward_http_pool_max_per_route=20
 # Trigger MQTT forward pool size
-trigger_forward_mqtt_pool_size=4
\ No newline at end of file
+trigger_forward_mqtt_pool_size=4
+
+
+####################
+### External Lib Configuration
+####################
+
+# external lib directory for properties loader
+# For Windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
+# absolute. Otherwise, it is relative.
+# external_properties_loader_dir=ext\\loader
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# external_properties_loader_dir=ext/loader
+
+# external lib directory for limiter
+# For Windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
+# absolute. Otherwise, it is relative.
+# external_limiter_dir=ext\\limiter
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# external_limiter_dir=ext/limiter
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 379ba1fe6c..302fcebaf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -62,7 +62,8 @@ public class IoTDBConfig {
 
   /* Names of Watermark methods */
   public static final String WATERMARK_GROUPED_LSB = "GroupBasedLSBMethod";
-  static final String CONFIG_NAME = "iotdb-datanode.properties";
+  public static final String CONFIG_NAME = "iotdb-datanode.properties";
+  public static final String EXTERNAL_CONFIG_NAME = "iotdb-datanode-external.properties";
   private static final Logger logger = LoggerFactory.getLogger(IoTDBConfig.class);
   private static final String MULTI_DIR_STRATEGY_PREFIX =
       "org.apache.iotdb.db.conf.directories.strategy.";
@@ -272,6 +273,16 @@ public class IoTDBConfig {
   private String mqttDir =
       IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.MQTT_FOLDER_NAME;
 
+  /** External lib directory for properties loader, stores user-uploaded JAR files */
+  private String externalPropertiesLoaderDir =
+      IoTDBConstant.EXT_FOLDER_NAME
+          + File.separator
+          + IoTDBConstant.EXT_PROPERTIES_LOADER_FOLDER_NAME;
+
+  /** External lib directory for limiter, stores user-uploaded JAR files */
+  private String externalLimiterDir =
+      IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.EXT_LIMITER;
+
   /** Data directories. It can be settled as dataDirs = {"data1", "data2", "data3"}; */
   private String[] dataDirs = {
     IoTDBConstant.DEFAULT_BASE_DIR + File.separator + IoTDBConstant.DATA_FOLDER_NAME
@@ -1077,6 +1088,8 @@ public class IoTDBConfig {
     temporaryLibDir = addHomeDir(temporaryLibDir);
     triggerDir = addHomeDir(triggerDir);
     mqttDir = addHomeDir(mqttDir);
+    externalPropertiesLoaderDir = addHomeDir(externalPropertiesLoaderDir);
+    externalLimiterDir = addHomeDir(externalLimiterDir);
     for (int i = 0; i < walDirs.length; i++) {
       walDirs[i] = addHomeDir(walDirs[i]);
     }
@@ -1314,6 +1327,22 @@ public class IoTDBConfig {
     this.mqttDir = mqttDir;
   }
 
+  public String getExternalPropertiesLoaderDir() {
+    return externalPropertiesLoaderDir;
+  }
+
+  public void setExternalPropertiesLoaderDir(String externalPropertiesLoaderDir) {
+    this.externalPropertiesLoaderDir = externalPropertiesLoaderDir;
+  }
+
+  public String getExternalLimiterDir() {
+    return externalLimiterDir;
+  }
+
+  public void setExternalLimiterDir(String externalLimiterDir) {
+    this.externalLimiterDir = externalLimiterDir;
+  }
+
   public String getMultiDirStrategyClassName() {
     return multiDirStrategyClassName;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6376b09224..182c126c56 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -58,8 +58,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.MalformedURLException;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Properties;
 
 public class IoTDBDescriptor {
@@ -129,6 +132,52 @@ public class IoTDBDescriptor {
     }
   }
 
+  /**
+   * Resolves the location of the external properties file (IoTDBConfig.EXTERNAL_CONFIG_NAME).
+   * Lookup order: IOTDB_CONF property, then IOTDB_HOME/conf, then the root of the classpath.
+   * @return the resolved path (its existence is not checked here), or null if none was found.
+   */
+  public Path getExternalPropsPath() {
+    // The IoTDBConstant.IOTDB_CONF system property takes precedence.
+    String urlString = System.getProperty(IoTDBConstant.IOTDB_CONF, null);
+    // If it is not set, fall back to the IoTDBConstant.IOTDB_HOME property plus "conf".
+    if (urlString == null) {
+      urlString = System.getProperty(IoTDBConstant.IOTDB_HOME, null);
+      if (urlString != null) {
+        urlString =
+            urlString
+                + File.separatorChar
+                + "conf"
+                + File.separatorChar
+                + IoTDBConfig.EXTERNAL_CONFIG_NAME;
+      } else {
+        // Neither property is set: look for the file at the root of the classpath.
+        URL uri = IoTDBConfig.class.getResource("/" + IoTDBConfig.EXTERNAL_CONFIG_NAME);
+        if (uri != null) {
+          try {
+            return Paths.get(uri.toURI());
+          } catch (URISyntaxException e) {
+            return null;
+          }
+        }
+        logger.warn(
+            "Cannot find IOTDB_HOME or IOTDB_EXTERNAL_CONF environment variable when loading "
+                + "config file {}, use default configuration",
+            IoTDBConfig.EXTERNAL_CONFIG_NAME);
+        // NOTE(review): the warning names IOTDB_EXTERNAL_CONF, but IOTDB_CONF is read above - confirm
+        conf.updatePath(); // paths are updated even though no external file was found
+        return null;
+      }
+    }
+    // A location was given through IOTDB_CONF; unless it already ends with ".properties",
+    // treat it as a directory and append the external config file name.
+    else if (!urlString.endsWith(".properties")) {
+      urlString += (File.separatorChar + IoTDBConfig.EXTERNAL_CONFIG_NAME);
+    }
+
+    return Paths.get(urlString);
+  }
+
   /** load an property file and set TsfileDBConfig variables. */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private void loadProps() {
@@ -144,796 +193,789 @@ public class IoTDBDescriptor {
       Properties properties = new Properties();
       properties.load(inputStream);
 
-      conf.setRpcAddress(properties.getProperty(IoTDBConstant.RPC_ADDRESS, conf.getRpcAddress()));
+      loadProperties(properties);
 
-      // TODO: Use FQDN  to identify our nodes afterwards
-      try {
-        replaceHostnameWithIP();
-      } catch (Exception e) {
-        logger.info(String.format("replace hostname with ip failed, %s", e.getMessage()));
-      }
+    } catch (FileNotFoundException e) {
+      logger.warn("Fail to find config file {}", url, e);
+    } catch (IOException e) {
+      logger.warn("Cannot load config file, use default configuration", e);
+    } catch (Exception e) {
+      logger.warn("Incorrect format in config file, use default configuration", e);
+    } finally {
+      // update all data seriesPath
+      conf.updatePath();
+      commonDescriptor.getConfig().updatePath(System.getProperty(IoTDBConstant.IOTDB_HOME, null));
+      MetricConfigDescriptor.getInstance()
+          .getMetricConfig()
+          .updateRpcInstance(conf.getRpcAddress(), conf.getRpcPort());
+    }
+  }
 
-      conf.setRpcThriftCompressionEnable(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "rpc_thrift_compression_enable",
-                  Boolean.toString(conf.isRpcThriftCompressionEnable()))));
+  public void loadProperties(Properties properties) {
 
-      conf.setRpcAdvancedCompressionEnable(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "rpc_advanced_compression_enable",
-                  Boolean.toString(conf.isRpcAdvancedCompressionEnable()))));
+    conf.setRpcAddress(properties.getProperty(IoTDBConstant.RPC_ADDRESS, conf.getRpcAddress()));
 
-      conf.setConnectionTimeoutInMS(
-          Integer.parseInt(
-              properties.getProperty(
-                  "connection_timeout_ms", String.valueOf(conf.getConnectionTimeoutInMS()))));
+    // TODO: Use FQDN  to identify our nodes afterwards
+    try {
+      replaceHostnameWithIP();
+    } catch (Exception e) {
+      logger.info(String.format("replace hostname with ip failed, %s", e.getMessage()));
+    }
 
-      conf.setSelectorNumOfClientManager(
-          Integer.parseInt(
-              properties.getProperty(
-                  "selector_thread_nums_of_client_manager",
-                  String.valueOf(conf.getSelectorNumOfClientManager()))));
+    conf.setRpcThriftCompressionEnable(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "rpc_thrift_compression_enable",
+                Boolean.toString(conf.isRpcThriftCompressionEnable()))));
 
-      conf.setRpcPort(
-          Integer.parseInt(
-              properties.getProperty(IoTDBConstant.RPC_PORT, Integer.toString(conf.getRpcPort()))));
+    conf.setRpcAdvancedCompressionEnable(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "rpc_advanced_compression_enable",
+                Boolean.toString(conf.isRpcAdvancedCompressionEnable()))));
 
-      conf.setEnableInfluxDBRpcService(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "enable_influxdb_rpc_service",
-                  Boolean.toString(conf.isEnableInfluxDBRpcService()))));
+    conf.setConnectionTimeoutInMS(
+        Integer.parseInt(
+            properties.getProperty(
+                "connection_timeout_ms", String.valueOf(conf.getConnectionTimeoutInMS()))));
 
-      conf.setInfluxDBRpcPort(
-          Integer.parseInt(
-              properties.getProperty(
-                  "influxdb_rpc_port", Integer.toString(conf.getInfluxDBRpcPort()))));
+    conf.setSelectorNumOfClientManager(
+        Integer.parseInt(
+            properties.getProperty(
+                "selector_thread_nums_of_client_manager",
+                String.valueOf(conf.getSelectorNumOfClientManager()))));
 
-      conf.setTimestampPrecision(
-          properties.getProperty("timestamp_precision", conf.getTimestampPrecision()));
+    conf.setRpcPort(
+        Integer.parseInt(
+            properties.getProperty(IoTDBConstant.RPC_PORT, Integer.toString(conf.getRpcPort()))));
 
-      conf.setBufferedArraysMemoryProportion(
-          Double.parseDouble(
-              properties.getProperty(
-                  "buffered_arrays_memory_proportion",
-                  Double.toString(conf.getBufferedArraysMemoryProportion()))));
+    conf.setEnableInfluxDBRpcService(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_influxdb_rpc_service",
+                Boolean.toString(conf.isEnableInfluxDBRpcService()))));
 
-      conf.setFlushProportion(
-          Double.parseDouble(
-              properties.getProperty(
-                  "flush_proportion", Double.toString(conf.getFlushProportion()))));
+    conf.setInfluxDBRpcPort(
+        Integer.parseInt(
+            properties.getProperty(
+                "influxdb_rpc_port", Integer.toString(conf.getInfluxDBRpcPort()))));
 
-      conf.setRejectProportion(
-          Double.parseDouble(
-              properties.getProperty(
-                  "reject_proportion", Double.toString(conf.getRejectProportion()))));
+    conf.setTimestampPrecision(
+        properties.getProperty("timestamp_precision", conf.getTimestampPrecision()));
 
-      conf.setStorageGroupSizeReportThreshold(
-          Long.parseLong(
-              properties.getProperty(
-                  "storage_group_report_threshold",
-                  Long.toString(conf.getStorageGroupSizeReportThreshold()))));
+    conf.setBufferedArraysMemoryProportion(
+        Double.parseDouble(
+            properties.getProperty(
+                "buffered_arrays_memory_proportion",
+                Double.toString(conf.getBufferedArraysMemoryProportion()))));
 
-      conf.setMetaDataCacheEnable(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "meta_data_cache_enable", Boolean.toString(conf.isMetaDataCacheEnable()))));
+    conf.setFlushProportion(
+        Double.parseDouble(
+            properties.getProperty(
+                "flush_proportion", Double.toString(conf.getFlushProportion()))));
 
-      initMemoryAllocate(properties);
+    conf.setRejectProportion(
+        Double.parseDouble(
+            properties.getProperty(
+                "reject_proportion", Double.toString(conf.getRejectProportion()))));
 
-      loadWALProps(properties);
+    conf.setStorageGroupSizeReportThreshold(
+        Long.parseLong(
+            properties.getProperty(
+                "storage_group_report_threshold",
+                Long.toString(conf.getStorageGroupSizeReportThreshold()))));
 
-      String systemDir = properties.getProperty("system_dir");
-      if (systemDir == null) {
-        systemDir = properties.getProperty("base_dir");
-        if (systemDir != null) {
-          systemDir = FilePathUtils.regularizePath(systemDir) + IoTDBConstant.SYSTEM_FOLDER_NAME;
-        } else {
-          systemDir = conf.getSystemDir();
-        }
-      }
-      conf.setSystemDir(systemDir);
+    conf.setMetaDataCacheEnable(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "meta_data_cache_enable", Boolean.toString(conf.isMetaDataCacheEnable()))));
 
-      conf.setSchemaDir(
-          FilePathUtils.regularizePath(conf.getSystemDir()) + IoTDBConstant.SCHEMA_FOLDER_NAME);
+    initMemoryAllocate(properties);
 
-      conf.setQueryDir(
-          FilePathUtils.regularizePath(conf.getSystemDir() + IoTDBConstant.QUERY_FOLDER_NAME));
+    loadWALProps(properties);
 
-      conf.setTracingDir(properties.getProperty("tracing_dir", conf.getTracingDir()));
+    String systemDir = properties.getProperty("system_dir");
+    if (systemDir == null) {
+      systemDir = properties.getProperty("base_dir");
+      if (systemDir != null) {
+        systemDir = FilePathUtils.regularizePath(systemDir) + IoTDBConstant.SYSTEM_FOLDER_NAME;
+      } else {
+        systemDir = conf.getSystemDir();
+      }
+    }
+    conf.setSystemDir(systemDir);
 
-      conf.setDataDirs(properties.getProperty("data_dirs", conf.getDataDirs()[0]).split(","));
+    conf.setSchemaDir(
+        FilePathUtils.regularizePath(conf.getSystemDir()) + IoTDBConstant.SCHEMA_FOLDER_NAME);
 
-      conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
+    conf.setQueryDir(
+        FilePathUtils.regularizePath(conf.getSystemDir() + IoTDBConstant.QUERY_FOLDER_NAME));
 
-      int mlogBufferSize =
-          Integer.parseInt(
-              properties.getProperty(
-                  "mlog_buffer_size", Integer.toString(conf.getMlogBufferSize())));
-      if (mlogBufferSize > 0) {
-        conf.setMlogBufferSize(mlogBufferSize);
-      }
+    conf.setTracingDir(properties.getProperty("tracing_dir", conf.getTracingDir()));
 
-      long forceMlogPeriodInMs =
-          Long.parseLong(
-              properties.getProperty(
-                  "sync_mlog_period_in_ms", Long.toString(conf.getSyncMlogPeriodInMs())));
-      if (forceMlogPeriodInMs > 0) {
-        conf.setSyncMlogPeriodInMs(forceMlogPeriodInMs);
-      }
+    conf.setDataDirs(properties.getProperty("data_dirs", conf.getDataDirs()[0]).split(","));
 
-      conf.setMultiDirStrategyClassName(
-          properties.getProperty("multi_dir_strategy", conf.getMultiDirStrategyClassName()));
+    conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
 
-      conf.setBatchSize(
-          Integer.parseInt(
-              properties.getProperty("batch_size", Integer.toString(conf.getBatchSize()))));
+    int mlogBufferSize =
+        Integer.parseInt(
+            properties.getProperty("mlog_buffer_size", Integer.toString(conf.getMlogBufferSize())));
+    if (mlogBufferSize > 0) {
+      conf.setMlogBufferSize(mlogBufferSize);
+    }
 
-      conf.setEnableMemControl(
-          (Boolean.parseBoolean(
-              properties.getProperty(
-                  "enable_mem_control", Boolean.toString(conf.isEnableMemControl())))));
-      logger.info("IoTDB enable memory control: {}", conf.isEnableMemControl());
+    long forceMlogPeriodInMs =
+        Long.parseLong(
+            properties.getProperty(
+                "sync_mlog_period_in_ms", Long.toString(conf.getSyncMlogPeriodInMs())));
+    if (forceMlogPeriodInMs > 0) {
+      conf.setSyncMlogPeriodInMs(forceMlogPeriodInMs);
+    }
 
-      long seqTsFileSize =
-          Long.parseLong(
-              properties
-                  .getProperty("seq_tsfile_size", Long.toString(conf.getSeqTsFileSize()))
-                  .trim());
-      if (seqTsFileSize >= 0) {
-        conf.setSeqTsFileSize(seqTsFileSize);
-      }
+    conf.setMultiDirStrategyClassName(
+        properties.getProperty("multi_dir_strategy", conf.getMultiDirStrategyClassName()));
 
-      long unSeqTsFileSize =
-          Long.parseLong(
-              properties
-                  .getProperty("unseq_tsfile_size", Long.toString(conf.getUnSeqTsFileSize()))
-                  .trim());
-      if (unSeqTsFileSize >= 0) {
-        conf.setUnSeqTsFileSize(unSeqTsFileSize);
-      }
+    conf.setBatchSize(
+        Integer.parseInt(
+            properties.getProperty("batch_size", Integer.toString(conf.getBatchSize()))));
 
-      long memTableSizeThreshold =
-          Long.parseLong(
-              properties
-                  .getProperty(
-                      "memtable_size_threshold", Long.toString(conf.getMemtableSizeThreshold()))
-                  .trim());
-      if (memTableSizeThreshold > 0) {
-        conf.setMemtableSizeThreshold(memTableSizeThreshold);
-      }
+    conf.setEnableMemControl(
+        (Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_mem_control", Boolean.toString(conf.isEnableMemControl())))));
+    logger.info("IoTDB enable memory control: {}", conf.isEnableMemControl());
 
-      conf.setAvgSeriesPointNumberThreshold(
-          Integer.parseInt(
-              properties.getProperty(
-                  "avg_series_point_number_threshold",
-                  Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
+    long seqTsFileSize =
+        Long.parseLong(
+            properties
+                .getProperty("seq_tsfile_size", Long.toString(conf.getSeqTsFileSize()))
+                .trim());
+    if (seqTsFileSize >= 0) {
+      conf.setSeqTsFileSize(seqTsFileSize);
+    }
 
-      conf.setCheckPeriodWhenInsertBlocked(
-          Integer.parseInt(
-              properties.getProperty(
-                  "check_period_when_insert_blocked",
-                  Integer.toString(conf.getCheckPeriodWhenInsertBlocked()))));
+    long unSeqTsFileSize =
+        Long.parseLong(
+            properties
+                .getProperty("unseq_tsfile_size", Long.toString(conf.getUnSeqTsFileSize()))
+                .trim());
+    if (unSeqTsFileSize >= 0) {
+      conf.setUnSeqTsFileSize(unSeqTsFileSize);
+    }
 
-      conf.setMaxWaitingTimeWhenInsertBlocked(
-          Integer.parseInt(
-              properties.getProperty(
-                  "max_waiting_time_when_insert_blocked",
-                  Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
+    long memTableSizeThreshold =
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "memtable_size_threshold", Long.toString(conf.getMemtableSizeThreshold()))
+                .trim());
+    if (memTableSizeThreshold > 0) {
+      conf.setMemtableSizeThreshold(memTableSizeThreshold);
+    }
 
-      conf.setIoTaskQueueSizeForFlushing(
-          Integer.parseInt(
-              properties.getProperty(
-                  "io_task_queue_size_for_flushing",
-                  Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
+    conf.setAvgSeriesPointNumberThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "avg_series_point_number_threshold",
+                Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
 
-      conf.setCompactionScheduleIntervalInMs(
-          Long.parseLong(
-              properties.getProperty(
-                  "compaction_schedule_interval_in_ms",
-                  Long.toString(conf.getCompactionScheduleIntervalInMs()))));
+    conf.setCheckPeriodWhenInsertBlocked(
+        Integer.parseInt(
+            properties.getProperty(
+                "check_period_when_insert_blocked",
+                Integer.toString(conf.getCheckPeriodWhenInsertBlocked()))));
 
-      conf.setCompactionSubmissionIntervalInMs(
-          Long.parseLong(
-              properties.getProperty(
-                  "compaction_submission_interval_in_ms",
-                  Long.toString(conf.getCompactionSubmissionIntervalInMs()))));
+    conf.setMaxWaitingTimeWhenInsertBlocked(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_waiting_time_when_insert_blocked",
+                Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
 
-      conf.setEnableCrossSpaceCompaction(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "enable_cross_space_compaction",
-                  Boolean.toString(conf.isEnableCrossSpaceCompaction()))));
+    conf.setIoTaskQueueSizeForFlushing(
+        Integer.parseInt(
+            properties.getProperty(
+                "io_task_queue_size_for_flushing",
+                Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
 
-      conf.setEnableSeqSpaceCompaction(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "enable_seq_space_compaction",
-                  Boolean.toString(conf.isEnableSeqSpaceCompaction()))));
+    conf.setCompactionScheduleIntervalInMs(
+        Long.parseLong(
+            properties.getProperty(
+                "compaction_schedule_interval_in_ms",
+                Long.toString(conf.getCompactionScheduleIntervalInMs()))));
 
-      conf.setEnableUnseqSpaceCompaction(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "enable_unseq_space_compaction",
-                  Boolean.toString(conf.isEnableUnseqSpaceCompaction()))));
+    conf.setCompactionSubmissionIntervalInMs(
+        Long.parseLong(
+            properties.getProperty(
+                "compaction_submission_interval_in_ms",
+                Long.toString(conf.getCompactionSubmissionIntervalInMs()))));
 
-      conf.setCrossCompactionSelector(
-          CrossCompactionSelector.getCrossCompactionSelector(
-              properties.getProperty(
-                  "cross_selector", conf.getCrossCompactionSelector().toString())));
+    conf.setEnableCrossSpaceCompaction(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_cross_space_compaction",
+                Boolean.toString(conf.isEnableCrossSpaceCompaction()))));
 
-      conf.setInnerSequenceCompactionSelector(
-          InnerSequenceCompactionSelector.getInnerSequenceCompactionSelector(
-              properties.getProperty(
-                  "inner_seq_selector", conf.getInnerSequenceCompactionSelector().toString())));
+    conf.setEnableSeqSpaceCompaction(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_seq_space_compaction",
+                Boolean.toString(conf.isEnableSeqSpaceCompaction()))));
 
-      conf.setInnerUnsequenceCompactionSelector(
-          InnerUnsequenceCompactionSelector.getInnerUnsequenceCompactionSelector(
-              properties.getProperty(
-                  "inner_unseq_selector", conf.getInnerUnsequenceCompactionSelector().toString())));
+    conf.setEnableUnseqSpaceCompaction(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_unseq_space_compaction",
+                Boolean.toString(conf.isEnableUnseqSpaceCompaction()))));
 
-      conf.setInnerSeqCompactionPerformer(
-          InnerSeqCompactionPerformer.getInnerSeqCompactionPerformer(
-              properties.getProperty(
-                  "inner_seq_performer", conf.getInnerSeqCompactionPerformer().toString())));
+    conf.setCrossCompactionSelector(
+        CrossCompactionSelector.getCrossCompactionSelector(
+            properties.getProperty(
+                "cross_selector", conf.getCrossCompactionSelector().toString())));
 
-      conf.setInnerUnseqCompactionPerformer(
-          InnerUnseqCompactionPerformer.getInnerUnseqCompactionPerformer(
-              properties.getProperty(
-                  "inner_unseq_performer", conf.getInnerUnseqCompactionPerformer().toString())));
+    conf.setInnerSequenceCompactionSelector(
+        InnerSequenceCompactionSelector.getInnerSequenceCompactionSelector(
+            properties.getProperty(
+                "inner_seq_selector", conf.getInnerSequenceCompactionSelector().toString())));
 
-      conf.setCrossCompactionPerformer(
-          CrossCompactionPerformer.getCrossCompactionPerformer(
-              properties.getProperty(
-                  "cross_performer", conf.getCrossCompactionPerformer().toString())));
+    conf.setInnerUnsequenceCompactionSelector(
+        InnerUnsequenceCompactionSelector.getInnerUnsequenceCompactionSelector(
+            properties.getProperty(
+                "inner_unseq_selector", conf.getInnerUnsequenceCompactionSelector().toString())));
 
-      conf.setCompactionPriority(
-          CompactionPriority.valueOf(
-              properties.getProperty(
-                  "compaction_priority", conf.getCompactionPriority().toString())));
+    conf.setInnerSeqCompactionPerformer(
+        InnerSeqCompactionPerformer.getInnerSeqCompactionPerformer(
+            properties.getProperty(
+                "inner_seq_performer", conf.getInnerSeqCompactionPerformer().toString())));
 
-      int subtaskNum =
-          Integer.parseInt(
-              properties.getProperty(
-                  "sub_compaction_thread_num", Integer.toString(conf.getSubCompactionTaskNum())));
-      subtaskNum = subtaskNum <= 0 ? 1 : subtaskNum;
-      conf.setSubCompactionTaskNum(subtaskNum);
+    conf.setInnerUnseqCompactionPerformer(
+        InnerUnseqCompactionPerformer.getInnerUnseqCompactionPerformer(
+            properties.getProperty(
+                "inner_unseq_performer", conf.getInnerUnseqCompactionPerformer().toString())));
 
-      conf.setQueryTimeoutThreshold(
-          Long.parseLong(
-              properties.getProperty(
-                  "query_timeout_threshold", Long.toString(conf.getQueryTimeoutThreshold()))));
+    conf.setCrossCompactionPerformer(
+        CrossCompactionPerformer.getCrossCompactionPerformer(
+            properties.getProperty(
+                "cross_performer", conf.getCrossCompactionPerformer().toString())));
 
-      conf.setSessionTimeoutThreshold(
-          Integer.parseInt(
-              properties.getProperty(
-                  "session_timeout_threshold",
-                  Integer.toString(conf.getSessionTimeoutThreshold()))));
-      conf.setMaxNumberOfSyncFileRetry(
-          Integer.parseInt(
-              properties
-                  .getProperty(
-                      "max_number_of_sync_file_retry",
-                      Integer.toString(conf.getMaxNumberOfSyncFileRetry()))
-                  .trim()));
+    conf.setCompactionPriority(
+        CompactionPriority.valueOf(
+            properties.getProperty(
+                "compaction_priority", conf.getCompactionPriority().toString())));
 
-      conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
+    int subtaskNum =
+        Integer.parseInt(
+            properties.getProperty(
+                "sub_compaction_thread_num", Integer.toString(conf.getSubCompactionTaskNum())));
+    subtaskNum = subtaskNum <= 0 ? 1 : subtaskNum;
+    conf.setSubCompactionTaskNum(subtaskNum);
 
-      conf.setConcurrentFlushThread(
-          Integer.parseInt(
-              properties.getProperty(
-                  "concurrent_flush_thread", Integer.toString(conf.getConcurrentFlushThread()))));
+    conf.setQueryTimeoutThreshold(
+        Long.parseLong(
+            properties.getProperty(
+                "query_timeout_threshold", Long.toString(conf.getQueryTimeoutThreshold()))));
 
-      if (conf.getConcurrentFlushThread() <= 0) {
-        conf.setConcurrentFlushThread(Runtime.getRuntime().availableProcessors());
-      }
+    conf.setSessionTimeoutThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "session_timeout_threshold", Integer.toString(conf.getSessionTimeoutThreshold()))));
+    conf.setMaxNumberOfSyncFileRetry(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "max_number_of_sync_file_retry",
+                    Integer.toString(conf.getMaxNumberOfSyncFileRetry()))
+                .trim()));
 
-      // start: index parameter setting
-      conf.setIndexRootFolder(properties.getProperty("index_root_dir", conf.getIndexRootFolder()));
+    conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
 
-      conf.setEnableIndex(
-          Boolean.parseBoolean(
-              properties.getProperty("enable_index", Boolean.toString(conf.isEnableIndex()))));
+    conf.setConcurrentFlushThread(
+        Integer.parseInt(
+            properties.getProperty(
+                "concurrent_flush_thread", Integer.toString(conf.getConcurrentFlushThread()))));
 
-      conf.setConcurrentIndexBuildThread(
-          Integer.parseInt(
-              properties.getProperty(
-                  "concurrent_index_build_thread",
-                  Integer.toString(conf.getConcurrentIndexBuildThread()))));
-      if (conf.getConcurrentIndexBuildThread() <= 0) {
-        conf.setConcurrentIndexBuildThread(Runtime.getRuntime().availableProcessors());
-      }
+    if (conf.getConcurrentFlushThread() <= 0) {
+      conf.setConcurrentFlushThread(Runtime.getRuntime().availableProcessors());
+    }
 
-      conf.setDefaultIndexWindowRange(
-          Integer.parseInt(
-              properties.getProperty(
-                  "default_index_window_range",
-                  Integer.toString(conf.getDefaultIndexWindowRange()))));
+    // start: index parameter setting
+    conf.setIndexRootFolder(properties.getProperty("index_root_dir", conf.getIndexRootFolder()));
 
-      conf.setConcurrentQueryThread(
-          Integer.parseInt(
-              properties.getProperty(
-                  "concurrent_query_thread", Integer.toString(conf.getConcurrentQueryThread()))));
+    conf.setEnableIndex(
+        Boolean.parseBoolean(
+            properties.getProperty("enable_index", Boolean.toString(conf.isEnableIndex()))));
 
-      if (conf.getConcurrentQueryThread() <= 0) {
-        conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
-      }
+    conf.setConcurrentIndexBuildThread(
+        Integer.parseInt(
+            properties.getProperty(
+                "concurrent_index_build_thread",
+                Integer.toString(conf.getConcurrentIndexBuildThread()))));
+    if (conf.getConcurrentIndexBuildThread() <= 0) {
+      conf.setConcurrentIndexBuildThread(Runtime.getRuntime().availableProcessors());
+    }
 
-      conf.setMaxAllowedConcurrentQueries(
-          Integer.parseInt(
-              properties.getProperty(
-                  "max_allowed_concurrent_queries",
-                  Integer.toString(conf.getMaxAllowedConcurrentQueries()))));
+    conf.setDefaultIndexWindowRange(
+        Integer.parseInt(
+            properties.getProperty(
+                "default_index_window_range",
+                Integer.toString(conf.getDefaultIndexWindowRange()))));
 
-      if (conf.getMaxAllowedConcurrentQueries() <= 0) {
-        conf.setMaxAllowedConcurrentQueries(1000);
-      }
+    conf.setConcurrentQueryThread(
+        Integer.parseInt(
+            properties.getProperty(
+                "concurrent_query_thread", Integer.toString(conf.getConcurrentQueryThread()))));
 
-      conf.setConcurrentSubRawQueryThread(
-          Integer.parseInt(
-              properties.getProperty(
-                  "concurrent_sub_rawQuery_thread",
-                  Integer.toString(conf.getConcurrentSubRawQueryThread()))));
+    if (conf.getConcurrentQueryThread() <= 0) {
+      conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
+    }
 
-      if (conf.getConcurrentSubRawQueryThread() <= 0) {
-        conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors());
-      }
+    conf.setMaxAllowedConcurrentQueries(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_allowed_concurrent_queries",
+                Integer.toString(conf.getMaxAllowedConcurrentQueries()))));
 
-      conf.setRawQueryBlockingQueueCapacity(
-          Integer.parseInt(
-              properties.getProperty(
-                  "raw_query_blocking_queue_capacity",
-                  Integer.toString(conf.getRawQueryBlockingQueueCapacity()))));
+    if (conf.getMaxAllowedConcurrentQueries() <= 0) {
+      conf.setMaxAllowedConcurrentQueries(1000);
+    }
 
-      conf.setSchemaRegionDeviceNodeCacheSize(
-          Integer.parseInt(
-              properties
-                  .getProperty(
-                      "schema_region_device_node_cache_size",
-                      Integer.toString(conf.getSchemaRegionDeviceNodeCacheSize()))
-                  .trim()));
+    conf.setConcurrentSubRawQueryThread(
+        Integer.parseInt(
+            properties.getProperty(
+                "concurrent_sub_rawQuery_thread",
+                Integer.toString(conf.getConcurrentSubRawQueryThread()))));
 
-      conf.setmRemoteSchemaCacheSize(
-          Integer.parseInt(
-              properties
-                  .getProperty(
-                      "remote_schema_cache_size",
-                      Integer.toString(conf.getmRemoteSchemaCacheSize()))
-                  .trim()));
+    if (conf.getConcurrentSubRawQueryThread() <= 0) {
+      conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors());
+    }
 
-      conf.setLanguageVersion(
-          properties.getProperty("language_version", conf.getLanguageVersion()).trim());
+    conf.setRawQueryBlockingQueueCapacity(
+        Integer.parseInt(
+            properties.getProperty(
+                "raw_query_blocking_queue_capacity",
+                Integer.toString(conf.getRawQueryBlockingQueueCapacity()))));
 
-      if (properties.containsKey("chunk_buffer_pool_enable")) {
-        conf.setChunkBufferPoolEnable(
-            Boolean.parseBoolean(properties.getProperty("chunk_buffer_pool_enable")));
-      }
+    conf.setSchemaRegionDeviceNodeCacheSize(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "schema_region_device_node_cache_size",
+                    Integer.toString(conf.getSchemaRegionDeviceNodeCacheSize()))
+                .trim()));
 
-      conf.setEnableExternalSort(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "enable_external_sort", Boolean.toString(conf.isEnableExternalSort()))));
-      conf.setExternalSortThreshold(
-          Integer.parseInt(
-              properties.getProperty(
-                  "external_sort_threshold", Integer.toString(conf.getExternalSortThreshold()))));
-      conf.setUpgradeThreadNum(
-          Integer.parseInt(
-              properties.getProperty(
-                  "upgrade_thread_num", Integer.toString(conf.getUpgradeThreadNum()))));
-      conf.setCrossCompactionMemoryBudget(
-          Long.parseLong(
-              properties.getProperty(
-                  "cross_compaction_memory_budget",
-                  Long.toString(conf.getCrossCompactionMemoryBudget()))));
-      conf.setCrossCompactionFileSelectionTimeBudget(
-          Long.parseLong(
-              properties.getProperty(
-                  "cross_compaction_file_selection_time_budget",
-                  Long.toString(conf.getCrossCompactionFileSelectionTimeBudget()))));
-      conf.setMergeIntervalSec(
-          Long.parseLong(
-              properties.getProperty(
-                  "merge_interval_sec", Long.toString(conf.getMergeIntervalSec()))));
-      conf.setConcurrentCompactionThread(
-          Integer.parseInt(
-              properties.getProperty(
-                  "concurrent_compaction_thread",
-                  Integer.toString(conf.getConcurrentCompactionThread()))));
-      conf.setTargetCompactionFileSize(
-          Long.parseLong(
-              properties.getProperty(
-                  "target_compaction_file_size",
-                  Long.toString(conf.getTargetCompactionFileSize()))));
-      conf.setTargetChunkSize(
-          Long.parseLong(
-              properties.getProperty(
-                  "target_chunk_size", Long.toString(conf.getTargetChunkSize()))));
-      conf.setTargetChunkPointNum(
-          Long.parseLong(
-              properties.getProperty(
-                  "target_chunk_point_num", Long.toString(conf.getTargetChunkPointNum()))));
-      conf.setChunkPointNumLowerBoundInCompaction(
-          Long.parseLong(
-              properties.getProperty(
-                  "chunk_size_lower_bound_in_compaction",
-                  Long.toString(conf.getChunkPointNumLowerBoundInCompaction()))));
-      conf.setChunkSizeLowerBoundInCompaction(
-          Long.parseLong(
-              properties.getProperty(
-                  "chunk_size_lower_bound_in_compaction",
-                  Long.toString(conf.getChunkSizeLowerBoundInCompaction()))));
-      conf.setMaxInnerCompactionCandidateFileNum(
-          Integer.parseInt(
-              properties.getProperty(
-                  "max_inner_compaction_candidate_file_num",
-                  Integer.toString(conf.getMaxInnerCompactionCandidateFileNum()))));
-      conf.setMaxCrossCompactionCandidateFileNum(
-          Integer.parseInt(
-              properties.getProperty(
-                  "max_cross_compaction_candidate_file_num",
-                  Integer.toString(conf.getMaxCrossCompactionCandidateFileNum()))));
+    conf.setmRemoteSchemaCacheSize(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "remote_schema_cache_size", Integer.toString(conf.getmRemoteSchemaCacheSize()))
+                .trim()));
 
-      conf.setCompactionWriteThroughputMbPerSec(
-          Integer.parseInt(
-              properties.getProperty(
-                  "compaction_write_throughput_mb_per_sec",
-                  Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+    conf.setLanguageVersion(
+        properties.getProperty("language_version", conf.getLanguageVersion()).trim());
 
-      conf.setEnablePartialInsert(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "enable_partial_insert", String.valueOf(conf.isEnablePartialInsert()))));
+    if (properties.containsKey("chunk_buffer_pool_enable")) {
+      conf.setChunkBufferPoolEnable(
+          Boolean.parseBoolean(properties.getProperty("chunk_buffer_pool_enable")));
+    }
 
-      int rpcSelectorThreadNum =
-          Integer.parseInt(
-              properties.getProperty(
-                  "rpc_selector_thread_num",
-                  Integer.toString(conf.getRpcSelectorThreadNum()).trim()));
-      if (rpcSelectorThreadNum <= 0) {
-        rpcSelectorThreadNum = 1;
-      }
+    conf.setEnableExternalSort(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_external_sort", Boolean.toString(conf.isEnableExternalSort()))));
+    conf.setExternalSortThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "external_sort_threshold", Integer.toString(conf.getExternalSortThreshold()))));
+    conf.setUpgradeThreadNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "upgrade_thread_num", Integer.toString(conf.getUpgradeThreadNum()))));
+    conf.setCrossCompactionMemoryBudget(
+        Long.parseLong(
+            properties.getProperty(
+                "cross_compaction_memory_budget",
+                Long.toString(conf.getCrossCompactionMemoryBudget()))));
+    conf.setCrossCompactionFileSelectionTimeBudget(
+        Long.parseLong(
+            properties.getProperty(
+                "cross_compaction_file_selection_time_budget",
+                Long.toString(conf.getCrossCompactionFileSelectionTimeBudget()))));
+    conf.setMergeIntervalSec(
+        Long.parseLong(
+            properties.getProperty(
+                "merge_interval_sec", Long.toString(conf.getMergeIntervalSec()))));
+    conf.setConcurrentCompactionThread(
+        Integer.parseInt(
+            properties.getProperty(
+                "concurrent_compaction_thread",
+                Integer.toString(conf.getConcurrentCompactionThread()))));
+    conf.setTargetCompactionFileSize(
+        Long.parseLong(
+            properties.getProperty(
+                "target_compaction_file_size", Long.toString(conf.getTargetCompactionFileSize()))));
+    conf.setTargetChunkSize(
+        Long.parseLong(
+            properties.getProperty("target_chunk_size", Long.toString(conf.getTargetChunkSize()))));
+    conf.setTargetChunkPointNum(
+        Long.parseLong(
+            properties.getProperty(
+                "target_chunk_point_num", Long.toString(conf.getTargetChunkPointNum()))));
+    conf.setChunkPointNumLowerBoundInCompaction(
+        Long.parseLong(
+            properties.getProperty(
+                "chunk_size_lower_bound_in_compaction",
+                Long.toString(conf.getChunkPointNumLowerBoundInCompaction()))));
+    conf.setChunkSizeLowerBoundInCompaction(
+        Long.parseLong(
+            properties.getProperty(
+                "chunk_size_lower_bound_in_compaction",
+                Long.toString(conf.getChunkSizeLowerBoundInCompaction()))));
+    conf.setMaxInnerCompactionCandidateFileNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_inner_compaction_candidate_file_num",
+                Integer.toString(conf.getMaxInnerCompactionCandidateFileNum()))));
+    conf.setMaxCrossCompactionCandidateFileNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_cross_compaction_candidate_file_num",
+                Integer.toString(conf.getMaxCrossCompactionCandidateFileNum()))));
 
-      conf.setRpcSelectorThreadNum(rpcSelectorThreadNum);
+    conf.setCompactionWriteThroughputMbPerSec(
+        Integer.parseInt(
+            properties.getProperty(
+                "compaction_write_throughput_mb_per_sec",
+                Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
 
-      int minConcurrentClientNum =
-          Integer.parseInt(
-              properties.getProperty(
-                  "rpc_min_concurrent_client_num",
-                  Integer.toString(conf.getRpcMinConcurrentClientNum()).trim()));
-      if (minConcurrentClientNum <= 0) {
-        minConcurrentClientNum = Runtime.getRuntime().availableProcessors();
-      }
+    conf.setEnablePartialInsert(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_partial_insert", String.valueOf(conf.isEnablePartialInsert()))));
 
-      conf.setRpcMinConcurrentClientNum(minConcurrentClientNum);
+    int rpcSelectorThreadNum =
+        Integer.parseInt(
+            properties.getProperty(
+                "rpc_selector_thread_num",
+                Integer.toString(conf.getRpcSelectorThreadNum()).trim()));
+    if (rpcSelectorThreadNum <= 0) {
+      rpcSelectorThreadNum = 1;
+    }
 
-      int maxConcurrentClientNum =
-          Integer.parseInt(
-              properties.getProperty(
-                  "rpc_max_concurrent_client_num",
-                  Integer.toString(conf.getRpcMaxConcurrentClientNum()).trim()));
-      if (maxConcurrentClientNum <= 0) {
-        maxConcurrentClientNum = 65535;
-      }
+    conf.setRpcSelectorThreadNum(rpcSelectorThreadNum);
 
-      conf.setRpcMaxConcurrentClientNum(maxConcurrentClientNum);
+    int minConcurrentClientNum =
+        Integer.parseInt(
+            properties.getProperty(
+                "rpc_min_concurrent_client_num",
+                Integer.toString(conf.getRpcMinConcurrentClientNum()).trim()));
+    if (minConcurrentClientNum <= 0) {
+      minConcurrentClientNum = Runtime.getRuntime().availableProcessors();
+    }
 
-      conf.setEnableWatermark(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "watermark_module_opened", Boolean.toString(conf.isEnableWatermark()).trim())));
-      conf.setWatermarkSecretKey(
-          properties.getProperty("watermark_secret_key", conf.getWatermarkSecretKey()));
-      conf.setWatermarkBitString(
-          properties.getProperty("watermark_bit_string", conf.getWatermarkBitString()));
-      conf.setWatermarkMethod(
-          properties.getProperty("watermark_method", conf.getWatermarkMethod()));
+    conf.setRpcMinConcurrentClientNum(minConcurrentClientNum);
 
-      loadAutoCreateSchemaProps(properties);
+    int maxConcurrentClientNum =
+        Integer.parseInt(
+            properties.getProperty(
+                "rpc_max_concurrent_client_num",
+                Integer.toString(conf.getRpcMaxConcurrentClientNum()).trim()));
+    if (maxConcurrentClientNum <= 0) {
+      maxConcurrentClientNum = 65535;
+    }
 
-      conf.setTsFileStorageFs(
-          properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().toString()));
-      conf.setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
-      conf.setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
-      conf.setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(","));
-      conf.setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort()));
-      conf.setDfsNameServices(
-          properties.getProperty("dfs_nameservices", conf.getDfsNameServices()));
-      conf.setDfsHaNamenodes(
-          properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(","));
-      conf.setDfsHaAutomaticFailoverEnabled(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "dfs_ha_automatic_failover_enabled",
-                  String.valueOf(conf.isDfsHaAutomaticFailoverEnabled()))));
-      conf.setDfsClientFailoverProxyProvider(
-          properties.getProperty(
-              "dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider()));
-      conf.setUseKerberos(
-          Boolean.parseBoolean(
-              properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
-      conf.setKerberosKeytabFilePath(
-          properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
-      conf.setKerberosPrincipal(
-          properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
+    conf.setRpcMaxConcurrentClientNum(maxConcurrentClientNum);
 
-      conf.setAllowReadOnlyWhenErrorsOccur(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "allow_read_only_when_errors_occur",
-                  String.valueOf(conf.isAllowReadOnlyWhenErrorsOccur()))));
+    conf.setEnableWatermark(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "watermark_module_opened", Boolean.toString(conf.isEnableWatermark()).trim())));
+    conf.setWatermarkSecretKey(
+        properties.getProperty("watermark_secret_key", conf.getWatermarkSecretKey()));
+    conf.setWatermarkBitString(
+        properties.getProperty("watermark_bit_string", conf.getWatermarkBitString()));
+    conf.setWatermarkMethod(properties.getProperty("watermark_method", conf.getWatermarkMethod()));
+
+    loadAutoCreateSchemaProps(properties);
+
+    conf.setTsFileStorageFs(
+        properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().toString()));
+    conf.setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
+    conf.setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
+    conf.setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(","));
+    conf.setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort()));
+    conf.setDfsNameServices(properties.getProperty("dfs_nameservices", conf.getDfsNameServices()));
+    conf.setDfsHaNamenodes(
+        properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(","));
+    conf.setDfsHaAutomaticFailoverEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "dfs_ha_automatic_failover_enabled",
+                String.valueOf(conf.isDfsHaAutomaticFailoverEnabled()))));
+    conf.setDfsClientFailoverProxyProvider(
+        properties.getProperty(
+            "dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider()));
+    conf.setUseKerberos(
+        Boolean.parseBoolean(
+            properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
+    conf.setKerberosKeytabFilePath(
+        properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
+    conf.setKerberosPrincipal(
+        properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
 
-      // the num of memtables in each storage group
-      conf.setConcurrentWritingTimePartition(
-          Integer.parseInt(
-              properties.getProperty(
-                  "concurrent_writing_time_partition",
-                  String.valueOf(conf.getConcurrentWritingTimePartition()))));
+    conf.setAllowReadOnlyWhenErrorsOccur(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "allow_read_only_when_errors_occur",
+                String.valueOf(conf.isAllowReadOnlyWhenErrorsOccur()))));
 
-      // the default fill interval in LinearFill and PreviousFill
-      conf.setDefaultFillInterval(
-          Integer.parseInt(
-              properties.getProperty(
-                  "default_fill_interval", String.valueOf(conf.getDefaultFillInterval()))));
+    // the num of memtables in each storage group
+    conf.setConcurrentWritingTimePartition(
+        Integer.parseInt(
+            properties.getProperty(
+                "concurrent_writing_time_partition",
+                String.valueOf(conf.getConcurrentWritingTimePartition()))));
 
-      conf.setTagAttributeTotalSize(
-          Integer.parseInt(
-              properties.getProperty(
-                  "tag_attribute_total_size", String.valueOf(conf.getTagAttributeTotalSize()))));
+    // the default fill interval in LinearFill and PreviousFill
+    conf.setDefaultFillInterval(
+        Integer.parseInt(
+            properties.getProperty(
+                "default_fill_interval", String.valueOf(conf.getDefaultFillInterval()))));
 
-      conf.setTagAttributeFlushInterval(
-          Integer.parseInt(
-              properties.getProperty(
-                  "tag_attribute_flush_interval",
-                  String.valueOf(conf.getTagAttributeFlushInterval()))));
+    conf.setTagAttributeTotalSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "tag_attribute_total_size", String.valueOf(conf.getTagAttributeTotalSize()))));
 
-      conf.setPrimitiveArraySize(
-          (Integer.parseInt(
-              properties.getProperty(
-                  "primitive_array_size", String.valueOf(conf.getPrimitiveArraySize())))));
+    conf.setTagAttributeFlushInterval(
+        Integer.parseInt(
+            properties.getProperty(
+                "tag_attribute_flush_interval",
+                String.valueOf(conf.getTagAttributeFlushInterval()))));
 
-      conf.setThriftMaxFrameSize(
-          Integer.parseInt(
-              properties.getProperty(
-                  "thrift_max_frame_size", String.valueOf(conf.getThriftMaxFrameSize()))));
+    conf.setPrimitiveArraySize(
+        (Integer.parseInt(
+            properties.getProperty(
+                "primitive_array_size", String.valueOf(conf.getPrimitiveArraySize())))));
 
-      if (conf.getThriftMaxFrameSize() < IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2) {
-        conf.setThriftMaxFrameSize(IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2);
-      }
+    conf.setThriftMaxFrameSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "thrift_max_frame_size", String.valueOf(conf.getThriftMaxFrameSize()))));
 
-      conf.setThriftDefaultBufferSize(
-          Integer.parseInt(
-              properties.getProperty(
-                  "thrift_init_buffer_size", String.valueOf(conf.getThriftDefaultBufferSize()))));
+    if (conf.getThriftMaxFrameSize() < IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2) {
+      conf.setThriftMaxFrameSize(IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2);
+    }
 
-      conf.setFrequencyIntervalInMinute(
-          Integer.parseInt(
-              properties.getProperty(
-                  "frequency_interval_in_minute",
-                  String.valueOf(conf.getFrequencyIntervalInMinute()))));
+    conf.setThriftDefaultBufferSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "thrift_init_buffer_size", String.valueOf(conf.getThriftDefaultBufferSize()))));
 
-      conf.setSlowQueryThreshold(
-          Long.parseLong(
-              properties.getProperty(
-                  "slow_query_threshold", String.valueOf(conf.getSlowQueryThreshold()))));
+    conf.setFrequencyIntervalInMinute(
+        Integer.parseInt(
+            properties.getProperty(
+                "frequency_interval_in_minute",
+                String.valueOf(conf.getFrequencyIntervalInMinute()))));
 
-      conf.setDataRegionNum(
-          Integer.parseInt(
-              properties.getProperty("data_region_num", String.valueOf(conf.getDataRegionNum()))));
+    conf.setSlowQueryThreshold(
+        Long.parseLong(
+            properties.getProperty(
+                "slow_query_threshold", String.valueOf(conf.getSlowQueryThreshold()))));
 
-      conf.setRecoveryLogIntervalInMs(
-          Long.parseLong(
-              properties.getProperty(
-                  "recovery_log_interval_in_ms",
-                  String.valueOf(conf.getRecoveryLogIntervalInMs()))));
+    conf.setDataRegionNum(
+        Integer.parseInt(
+            properties.getProperty("data_region_num", String.valueOf(conf.getDataRegionNum()))));
 
-      conf.setEnableDiscardOutOfOrderData(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "enable_discard_out_of_order_data",
-                  Boolean.toString(conf.isEnableDiscardOutOfOrderData()))));
+    conf.setRecoveryLogIntervalInMs(
+        Long.parseLong(
+            properties.getProperty(
+                "recovery_log_interval_in_ms", String.valueOf(conf.getRecoveryLogIntervalInMs()))));
 
-      conf.setConcurrentWindowEvaluationThread(
-          Integer.parseInt(
-              properties.getProperty(
-                  "concurrent_window_evaluation_thread",
-                  Integer.toString(conf.getConcurrentWindowEvaluationThread()))));
-      if (conf.getConcurrentWindowEvaluationThread() <= 0) {
-        conf.setConcurrentWindowEvaluationThread(Runtime.getRuntime().availableProcessors());
-      }
+    conf.setEnableDiscardOutOfOrderData(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_discard_out_of_order_data",
+                Boolean.toString(conf.isEnableDiscardOutOfOrderData()))));
 
-      conf.setMaxPendingWindowEvaluationTasks(
-          Integer.parseInt(
-              properties.getProperty(
-                  "max_pending_window_evaluation_tasks",
-                  Integer.toString(conf.getMaxPendingWindowEvaluationTasks()))));
-      if (conf.getMaxPendingWindowEvaluationTasks() <= 0) {
-        conf.setMaxPendingWindowEvaluationTasks(64);
-      }
+    conf.setConcurrentWindowEvaluationThread(
+        Integer.parseInt(
+            properties.getProperty(
+                "concurrent_window_evaluation_thread",
+                Integer.toString(conf.getConcurrentWindowEvaluationThread()))));
+    if (conf.getConcurrentWindowEvaluationThread() <= 0) {
+      conf.setConcurrentWindowEvaluationThread(Runtime.getRuntime().availableProcessors());
+    }
 
-      // id table related configuration
-      conf.setDeviceIDTransformationMethod(
-          properties.getProperty(
-              "device_id_transformation_method", conf.getDeviceIDTransformationMethod()));
+    conf.setMaxPendingWindowEvaluationTasks(
+        Integer.parseInt(
+            properties.getProperty(
+                "max_pending_window_evaluation_tasks",
+                Integer.toString(conf.getMaxPendingWindowEvaluationTasks()))));
+    if (conf.getMaxPendingWindowEvaluationTasks() <= 0) {
+      conf.setMaxPendingWindowEvaluationTasks(64);
+    }
 
-      conf.setEnableIDTable(
-          Boolean.parseBoolean(
-              properties.getProperty("enable_id_table", String.valueOf(conf.isEnableIDTable()))));
+    // id table related configuration
+    conf.setDeviceIDTransformationMethod(
+        properties.getProperty(
+            "device_id_transformation_method", conf.getDeviceIDTransformationMethod()));
 
-      conf.setEnableIDTableLogFile(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "enable_id_table_log_file", String.valueOf(conf.isEnableIDTableLogFile()))));
+    conf.setEnableIDTable(
+        Boolean.parseBoolean(
+            properties.getProperty("enable_id_table", String.valueOf(conf.isEnableIDTable()))));
 
-      conf.setSchemaEngineMode(
-          properties.getProperty("schema_engine_mode", String.valueOf(conf.getSchemaEngineMode())));
+    conf.setEnableIDTableLogFile(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_id_table_log_file", String.valueOf(conf.isEnableIDTableLogFile()))));
 
-      conf.setEnableLastCache(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "enable_last_cache", Boolean.toString(conf.isLastCacheEnabled()))));
+    conf.setSchemaEngineMode(
+        properties.getProperty("schema_engine_mode", String.valueOf(conf.getSchemaEngineMode())));
 
-      if (conf.getSchemaEngineMode().equals("Rocksdb_based")) {
-        conf.setEnableLastCache(false);
-      }
+    conf.setEnableLastCache(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_last_cache", Boolean.toString(conf.isLastCacheEnabled()))));
 
-      conf.setCachedMNodeSizeInSchemaFileMode(
-          Integer.parseInt(
-              properties.getProperty(
-                  "cached_mnode_size_in_schema_file_mode",
-                  String.valueOf(conf.getCachedMNodeSizeInSchemaFileMode()))));
+    if (conf.getSchemaEngineMode().equals("Rocksdb_based")) {
+      conf.setEnableLastCache(false);
+    }
 
-      conf.setMinimumSegmentInSchemaFile(
-          Short.parseShort(
-              properties.getProperty(
-                  "minimum_schema_file_segment_in_bytes",
-                  String.valueOf(conf.getMinimumSegmentInSchemaFile()))));
+    conf.setCachedMNodeSizeInSchemaFileMode(
+        Integer.parseInt(
+            properties.getProperty(
+                "cached_mnode_size_in_schema_file_mode",
+                String.valueOf(conf.getCachedMNodeSizeInSchemaFileMode()))));
 
-      conf.setPageCacheSizeInSchemaFile(
-          Short.parseShort(
-              properties.getProperty(
-                  "page_cache_in_schema_file",
-                  String.valueOf(conf.getPageCacheSizeInSchemaFile()))));
+    conf.setMinimumSegmentInSchemaFile(
+        Short.parseShort(
+            properties.getProperty(
+                "minimum_schema_file_segment_in_bytes",
+                String.valueOf(conf.getMinimumSegmentInSchemaFile()))));
 
-      // mqtt
-      loadMqttProps(properties);
+    conf.setPageCacheSizeInSchemaFile(
+        Short.parseShort(
+            properties.getProperty(
+                "page_cache_in_schema_file", String.valueOf(conf.getPageCacheSizeInSchemaFile()))));
 
-      conf.setEnablePartition(
-          Boolean.parseBoolean(
-              properties.getProperty(
-                  "enable_partition", String.valueOf(conf.isEnablePartition()))));
+    // mqtt
+    loadMqttProps(properties);
 
-      conf.setPartitionInterval(
-          Long.parseLong(
-              properties.getProperty(
-                  "partition_interval", String.valueOf(conf.getPartitionInterval()))));
+    conf.setEnablePartition(
+        Boolean.parseBoolean(
+            properties.getProperty("enable_partition", String.valueOf(conf.isEnablePartition()))));
 
-      conf.setSelectIntoInsertTabletPlanRowLimit(
-          Integer.parseInt(
-              properties.getProperty(
-                  "select_into_insert_tablet_plan_row_limit",
-                  String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+    conf.setPartitionInterval(
+        Long.parseLong(
+            properties.getProperty(
+                "partition_interval", String.valueOf(conf.getPartitionInterval()))));
 
-      conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
+    conf.setSelectIntoInsertTabletPlanRowLimit(
+        Integer.parseInt(
+            properties.getProperty(
+                "select_into_insert_tablet_plan_row_limit",
+                String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
 
-      conf.setInsertMultiTabletEnableMultithreadingColumnThreshold(
-          Integer.parseInt(
-              properties.getProperty(
-                  "insert_multi_tablet_enable_multithreading_column_threshold",
-                  String.valueOf(conf.getInsertMultiTabletEnableMultithreadingColumnThreshold()))));
+    conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
 
-      // At the same time, set TSFileConfig
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setTSFileStorageFs(
-              FSType.valueOf(
-                  properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().name())));
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(","));
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort()));
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setDfsNameServices(
-              properties.getProperty("dfs_nameservices", conf.getDfsNameServices()));
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setDfsHaNamenodes(
-              properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(","));
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setDfsHaAutomaticFailoverEnabled(
-              Boolean.parseBoolean(
-                  properties.getProperty(
-                      "dfs_ha_automatic_failover_enabled",
-                      String.valueOf(conf.isDfsHaAutomaticFailoverEnabled()))));
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setDfsClientFailoverProxyProvider(
-              properties.getProperty(
-                  "dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider()));
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setUseKerberos(
-              Boolean.parseBoolean(
-                  properties.getProperty(
-                      "hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setKerberosKeytabFilePath(
-              properties.getProperty(
-                  "kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
-      TSFileDescriptor.getInstance()
-          .getConfig()
-          .setKerberosPrincipal(
-              properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
-      TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
+    conf.setInsertMultiTabletEnableMultithreadingColumnThreshold(
+        Integer.parseInt(
+            properties.getProperty(
+                "insert_multi_tablet_enable_multithreading_column_threshold",
+                String.valueOf(conf.getInsertMultiTabletEnableMultithreadingColumnThreshold()))));
 
-      conf.setCoordinatorReadExecutorSize(
-          Integer.parseInt(
-              properties.getProperty(
-                  "coordinator_read_executor_size",
-                  Integer.toString(conf.getCoordinatorReadExecutorSize()))));
-      conf.setCoordinatorWriteExecutorSize(
-          Integer.parseInt(
-              properties.getProperty(
-                  "coordinator_write_executor_size",
-                  Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
+    // At the same time, set TSFileConfig
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setTSFileStorageFs(
+            FSType.valueOf(
+                properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().name())));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(","));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort()));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setDfsNameServices(properties.getProperty("dfs_nameservices", conf.getDfsNameServices()));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setDfsHaNamenodes(
+            properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(","));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setDfsHaAutomaticFailoverEnabled(
+            Boolean.parseBoolean(
+                properties.getProperty(
+                    "dfs_ha_automatic_failover_enabled",
+                    String.valueOf(conf.isDfsHaAutomaticFailoverEnabled()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setDfsClientFailoverProxyProvider(
+            properties.getProperty(
+                "dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider()));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setUseKerberos(
+            Boolean.parseBoolean(
+                properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setKerberosKeytabFilePath(
+            properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
+    TSFileDescriptor.getInstance()
+        .getConfig()
+        .setKerberosPrincipal(
+            properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
+    TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
 
-      // commons
-      commonDescriptor.loadCommonProps(properties);
-      commonDescriptor.initCommonConfigDir(conf.getSystemDir());
+    conf.setCoordinatorReadExecutorSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "coordinator_read_executor_size",
+                Integer.toString(conf.getCoordinatorReadExecutorSize()))));
+    conf.setCoordinatorWriteExecutorSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "coordinator_write_executor_size",
+                Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
 
-      // timed flush memtable
-      loadTimedService(properties);
+    // commons
+    commonDescriptor.loadCommonProps(properties);
+    commonDescriptor.initCommonConfigDir(conf.getSystemDir());
 
-      // set tsfile-format config
-      loadTsFileProps(properties);
+    // timed flush memtable
+    loadTimedService(properties);
 
-      // make RPCTransportFactory taking effect.
-      RpcTransportFactory.reInit();
+    // set tsfile-format config
+    loadTsFileProps(properties);
 
-      // UDF
-      loadUDFProps(properties);
+    // make RpcTransportFactory take effect.
+    RpcTransportFactory.reInit();
 
-      // trigger
-      loadTriggerProps(properties);
+    // UDF
+    loadUDFProps(properties);
 
-      // CQ
-      loadCQProps(properties);
+    // trigger
+    loadTriggerProps(properties);
 
-      // cluster
-      loadClusterProps(properties);
+    // CQ
+    loadCQProps(properties);
 
-      // shuffle
-      loadShuffleProps(properties);
+    // cluster
+    loadClusterProps(properties);
 
-      // author cache
-      loadAuthorCache(properties);
-    } catch (FileNotFoundException e) {
-      logger.warn("Fail to find config file {}", url, e);
-    } catch (IOException e) {
-      logger.warn("Cannot load config file, use default configuration", e);
-    } catch (Exception e) {
-      logger.warn("Incorrect format in config file, use default configuration", e);
-    } finally {
-      // update all data seriesPath
-      conf.updatePath();
-      commonDescriptor.getConfig().updatePath(System.getProperty(IoTDBConstant.IOTDB_HOME, null));
-      MetricConfigDescriptor.getInstance()
-          .getMetricConfig()
-          .updateRpcInstance(conf.getRpcAddress(), conf.getRpcPort());
-    }
+    // shuffle
+    loadShuffleProps(properties);
+
+    // author cache
+    loadAuthorCache(properties);
   }
 
   private void loadAuthorCache(Properties properties) {
@@ -1270,6 +1312,16 @@ public class IoTDBDescriptor {
     }
   }
 
+  private void loadExternalLibProps(Properties properties) {
+
+    conf.setExternalPropertiesLoaderDir(
+        properties.getProperty(
+            "external_properties_loader_dir", conf.getExternalPropertiesLoaderDir()));
+
+    conf.setExternalLimiterDir(
+        properties.getProperty("external_limiter_dir", conf.getExternalLimiterDir()));
+  }
+
   // timed flush memtable
   private void loadTimedService(Properties properties) {
     conf.setEnableTimedFlushSeqMemtable(
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesNumberOverflowException.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesNumberOverflowException.java
new file mode 100644
index 0000000000..7f5e2b4e17
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesNumberOverflowException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class SeriesNumberOverflowException extends MetadataException {
+
+  public SeriesNumberOverflowException() {
+    super("exceed max allowed series number.", TSStatusCode.SERIES_OVERFLOW.getStatusCode());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 46e5903514..bc535516d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
 import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 
+import org.apche.iotdb.external.api.ISeriesNumerLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -69,6 +71,23 @@ public class SchemaEngine {
 
   private ScheduledExecutorService timedForceMLogThread;
 
+  private ISeriesNumerLimiter seriesNumerLimiter =
+      new ISeriesNumerLimiter() {
+        @Override
+        public void init(Properties properties) {}
+
+        @Override
+        public boolean addTimeSeries(int number) {
+          // always return true, don't limit the number of series
+          return true;
+        }
+
+        @Override
+        public void deleteTimeSeries(int number) {
+          // do nothing
+        }
+      };
+
   public TSStatus write(SchemaRegionId schemaRegionId, PlanNode planNode) {
     return planNode.accept(new SchemaExecutionVisitor(), schemaRegionMap.get(schemaRegionId));
   }
@@ -290,11 +309,14 @@ public class SchemaEngine {
     IStorageGroupMNode storageGroupMNode = ensureStorageGroupByStorageGroupPath(storageGroup);
     switch (this.schemaRegionStoredMode) {
       case Memory:
-        schemaRegion = new SchemaRegionMemoryImpl(storageGroup, schemaRegionId, storageGroupMNode);
+        schemaRegion =
+            new SchemaRegionMemoryImpl(
+                storageGroup, schemaRegionId, storageGroupMNode, seriesNumerLimiter);
         break;
       case Schema_File:
         schemaRegion =
-            new SchemaRegionSchemaFileImpl(storageGroup, schemaRegionId, storageGroupMNode);
+            new SchemaRegionSchemaFileImpl(
+                storageGroup, schemaRegionId, storageGroupMNode, seriesNumerLimiter);
         break;
       case Rocksdb_based:
         schemaRegion =
@@ -363,4 +385,8 @@ public class SchemaEngine {
       sharedPrefixTree.deleteStorageGroup(new PartialPath(schemaRegion.getStorageGroupFullPath()));
     }
   }
+
+  public void setSeriesNumerLimiter(ISeriesNumerLimiter seriesNumerLimiter) {
+    this.seriesNumerLimiter = seriesNumerLimiter;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index a4c2c8a8fb..6c70c6865c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SeriesNumberOverflowException;
 import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
 import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
 import org.apache.iotdb.db.exception.metadata.template.NoTemplateOnMNodeException;
@@ -90,6 +91,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apche.iotdb.external.api.ISeriesNumerLimiter;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -178,9 +180,14 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
   private TagManager tagManager;
   private SchemaSyncManager syncManager = SchemaSyncManager.getInstance();
 
+  private final ISeriesNumerLimiter seriesNumerLimiter;
+
   // region Interfaces and Implementation of initialization、snapshot、recover and clear
   public SchemaRegionMemoryImpl(
-      PartialPath storageGroup, SchemaRegionId schemaRegionId, IStorageGroupMNode storageGroupMNode)
+      PartialPath storageGroup,
+      SchemaRegionId schemaRegionId,
+      IStorageGroupMNode storageGroupMNode,
+      ISeriesNumerLimiter seriesNumerLimiter)
       throws MetadataException {
 
     storageGroupFullPath = storageGroup.getFullPath();
@@ -214,6 +221,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       }
     }
 
+    this.seriesNumerLimiter = seriesNumerLimiter;
+
     init();
   }
 
@@ -454,7 +463,9 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
 
-    schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
+    int seriesCount = leafMNodes.size();
+    schemaStatisticsManager.deleteTimeseries(seriesCount);
+    seriesNumerLimiter.deleteTimeSeries(seriesCount);
 
     // drop triggers with no exceptions
     TriggerEngine.drop(leafMNodes);
@@ -582,23 +593,35 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       throw new SeriesOverflowException();
     }
 
+    if (!seriesNumerLimiter.addTimeSeries(1)) {
+      throw new SeriesNumberOverflowException();
+    }
+
     try {
-      PartialPath path = plan.getPath();
-      SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
-
-      TSDataType type = plan.getDataType();
-      // create time series in MTree
-      IMeasurementMNode leafMNode =
-          mtree.createTimeseries(
-              path,
-              type,
-              plan.getEncoding(),
-              plan.getCompressor(),
-              plan.getProps(),
-              plan.getAlias());
-
-      // the cached mNode may be replaced by new entityMNode in mtree
-      mNodeCache.invalidate(path.getDevicePath());
+      IMeasurementMNode leafMNode;
+
+      // use try-catch to restore seriesNumerLimiter's state if creation fails
+      try {
+        PartialPath path = plan.getPath();
+        SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
+
+        TSDataType type = plan.getDataType();
+        // create time series in MTree
+        leafMNode =
+            mtree.createTimeseries(
+                path,
+                type,
+                plan.getEncoding(),
+                plan.getCompressor(),
+                plan.getProps(),
+                plan.getAlias());
+
+        // the cached mNode may be replaced by new entityMNode in mtree
+        mNodeCache.invalidate(path.getDevicePath());
+      } catch (Throwable t) {
+        seriesNumerLimiter.deleteTimeSeries(1);
+        throw t;
+      }
 
       // update statistics and schemaDataTypeNumMap
       schemaStatisticsManager.addTimeseries(1);
@@ -688,10 +711,15 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
    * @param plan CreateAlignedTimeSeriesPlan
    */
   public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+    int seriesCount = plan.getMeasurements().size();
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
       throw new SeriesOverflowException();
     }
 
+    if (!seriesNumerLimiter.addTimeSeries(seriesCount)) {
+      throw new SeriesNumberOverflowException();
+    }
+
     try {
       PartialPath prefixPath = plan.getPrefixPath();
       List<String> measurements = plan.getMeasurements();
@@ -699,26 +727,33 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
       List<TSEncoding> encodings = plan.getEncodings();
       List<Map<String, String>> tagsList = plan.getTagsList();
       List<Map<String, String>> attributesList = plan.getAttributesList();
+      List<IMeasurementMNode> measurementMNodeList;
 
-      for (int i = 0; i < measurements.size(); i++) {
-        SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
-      }
-
-      // create time series in MTree
-      List<IMeasurementMNode> measurementMNodeList =
-          mtree.createAlignedTimeseries(
-              prefixPath,
-              measurements,
-              plan.getDataTypes(),
-              plan.getEncodings(),
-              plan.getCompressors(),
-              plan.getAliasList());
+      // use try-catch to restore seriesNumerLimiter's state if creation fails
+      try {
+        for (int i = 0; i < measurements.size(); i++) {
+          SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
+        }
 
-      // the cached mNode may be replaced by new entityMNode in mtree
-      mNodeCache.invalidate(prefixPath);
+        // create time series in MTree
+        measurementMNodeList =
+            mtree.createAlignedTimeseries(
+                prefixPath,
+                measurements,
+                plan.getDataTypes(),
+                plan.getEncodings(),
+                plan.getCompressors(),
+                plan.getAliasList());
+
+        // the cached mNode may be replaced by new entityMNode in mtree
+        mNodeCache.invalidate(prefixPath);
+      } catch (Throwable t) {
+        seriesNumerLimiter.deleteTimeSeries(seriesCount);
+        throw t;
+      }
 
       // update statistics and schemaDataTypeNumMap
-      schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
+      schemaStatisticsManager.addTimeseries(seriesCount);
 
       List<Long> tagOffsets = plan.getTagOffsets();
       for (int i = 0; i < measurements.size(); i++) {
@@ -861,6 +896,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
     mNodeCache.invalidate(node.getPartialPath());
 
     schemaStatisticsManager.deleteTimeseries(1);
+    seriesNumerLimiter.deleteTimeSeries(1);
     return storageGroupPath;
   }
   // endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 57129f5063..0a1fe43f4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SeriesNumberOverflowException;
 import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
 import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
 import org.apache.iotdb.db.exception.metadata.template.NoTemplateOnMNodeException;
@@ -87,6 +88,7 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apche.iotdb.external.api.ISeriesNumerLimiter;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -172,9 +174,14 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   private TagManager tagManager;
   private SchemaSyncManager syncManager = SchemaSyncManager.getInstance();
 
+  private final ISeriesNumerLimiter seriesNumerLimiter;
+
   // region Interfaces and Implementation of initialization、snapshot、recover and clear
   public SchemaRegionSchemaFileImpl(
-      PartialPath storageGroup, SchemaRegionId schemaRegionId, IStorageGroupMNode storageGroupMNode)
+      PartialPath storageGroup,
+      SchemaRegionId schemaRegionId,
+      IStorageGroupMNode storageGroupMNode,
+      ISeriesNumerLimiter seriesNumerLimiter)
       throws MetadataException {
 
     storageGroupFullPath = storageGroup.getFullPath();
@@ -200,6 +207,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
                   }
                 });
     this.storageGroupMNode = storageGroupMNode;
+    this.seriesNumerLimiter = seriesNumerLimiter;
     init();
   }
 
@@ -413,7 +421,9 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     // collect all the LeafMNode in this schema region
     List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
 
-    schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
+    int seriesCount = leafMNodes.size();
+    schemaStatisticsManager.deleteTimeseries(seriesCount);
+    seriesNumerLimiter.deleteTimeSeries(seriesCount);
 
     // drop triggers with no exceptions
     TriggerEngine.drop(leafMNodes);
@@ -475,20 +485,31 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
       throw new SeriesOverflowException();
     }
 
+    if (!seriesNumerLimiter.addTimeSeries(1)) {
+      throw new SeriesNumberOverflowException();
+    }
+
     try {
       PartialPath path = plan.getPath();
-      SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
-
-      TSDataType type = plan.getDataType();
-      // create time series in MTree
-      IMeasurementMNode leafMNode =
-          mtree.createTimeseriesWithPinnedReturn(
-              path,
-              type,
-              plan.getEncoding(),
-              plan.getCompressor(),
-              plan.getProps(),
-              plan.getAlias());
+      IMeasurementMNode leafMNode;
+      // use try-catch to restore seriesNumerLimiter's state if the creation fails
+      try {
+        SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
+
+        TSDataType type = plan.getDataType();
+        // create time series in MTree
+        leafMNode =
+            mtree.createTimeseriesWithPinnedReturn(
+                path,
+                type,
+                plan.getEncoding(),
+                plan.getCompressor(),
+                plan.getProps(),
+                plan.getAlias());
+      } catch (Throwable t) {
+        seriesNumerLimiter.deleteTimeSeries(1);
+        throw t;
+      }
 
       try {
         // the cached mNode may be replaced by new entityMNode in mtree
@@ -606,10 +627,15 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
    * @param plan CreateAlignedTimeSeriesPlan
    */
   public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+    int seriesCount = plan.getMeasurements().size();
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
       throw new SeriesOverflowException();
     }
 
+    if (!seriesNumerLimiter.addTimeSeries(seriesCount)) {
+      throw new SeriesNumberOverflowException();
+    }
+
     try {
       PartialPath prefixPath = plan.getPrefixPath();
       List<String> measurements = plan.getMeasurements();
@@ -617,27 +643,33 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
       List<TSEncoding> encodings = plan.getEncodings();
       List<Map<String, String>> tagsList = plan.getTagsList();
       List<Map<String, String>> attributesList = plan.getAttributesList();
+      List<IMeasurementMNode> measurementMNodeList;
+      // use try-catch to restore seriesNumerLimiter's state if the creation fails
+      try {
+        for (int i = 0; i < measurements.size(); i++) {
+          SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
+        }
 
-      for (int i = 0; i < measurements.size(); i++) {
-        SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
+        // create time series in MTree
+        measurementMNodeList =
+            mtree.createAlignedTimeseries(
+                prefixPath,
+                measurements,
+                plan.getDataTypes(),
+                plan.getEncodings(),
+                plan.getCompressors(),
+                plan.getAliasList());
+      } catch (Throwable t) {
+        seriesNumerLimiter.deleteTimeSeries(seriesCount);
+        throw t;
       }
 
-      // create time series in MTree
-      List<IMeasurementMNode> measurementMNodeList =
-          mtree.createAlignedTimeseries(
-              prefixPath,
-              measurements,
-              plan.getDataTypes(),
-              plan.getEncodings(),
-              plan.getCompressors(),
-              plan.getAliasList());
-
       try {
         // the cached mNode may be replaced by new entityMNode in mtree
         mNodeCache.invalidate(prefixPath);
 
         // update statistics and schemaDataTypeNumMap
-        schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
+        schemaStatisticsManager.addTimeseries(seriesCount);
 
         List<Long> tagOffsets = plan.getTagOffsets();
         for (int i = 0; i < measurements.size(); i++) {
@@ -786,6 +818,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
     mNodeCache.invalidate(node.getPartialPath());
 
     schemaStatisticsManager.deleteTimeseries(1);
+    seriesNumerLimiter.deleteTimeSeries(1);
     return storageGroupPath;
   }
   // endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
index 0cd38ab9dd..b666c8be2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
@@ -30,10 +30,11 @@ import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.ServiceLoader;
 
+import static org.apache.iotdb.db.utils.JarLoaderUtil.getExternalJarURLs;
+
 /** PayloadFormatManager loads payload formatter from SPI services. */
 public class PayloadFormatManager {
   private static final Logger logger = LoggerFactory.getLogger(PayloadFormatManager.class);
@@ -73,14 +74,6 @@ public class PayloadFormatManager {
     FileUtils.forceMkdir(file);
   }
 
-  private static URL[] getPlugInJarURLs() throws IOException {
-    HashSet<File> fileSet =
-        new HashSet<>(
-            FileUtils.listFiles(
-                SystemFileFactory.INSTANCE.getFile(mqttDir), new String[] {"jar"}, true));
-    return FileUtils.toURLs(fileSet.toArray(new File[0]));
-  }
-
   private static void buildMqttPluginMap() throws IOException {
     ServiceLoader<PayloadFormatter> payloadFormatters = ServiceLoader.load(PayloadFormatter.class);
     for (PayloadFormatter formatter : payloadFormatters) {
@@ -94,7 +87,7 @@ public class PayloadFormatManager {
       logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", pluginName);
     }
 
-    URL[] jarURLs = getPlugInJarURLs();
+    URL[] jarURLs = getExternalJarURLs(mqttDir);
     logger.debug("MQTT Plugin jarURLs: {}", jarURLs);
 
     for (URL jarUrl : jarURLs) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index ff41b8e2d4..7850de4b43 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -57,6 +57,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 
+import static org.apache.iotdb.db.utils.JarLoaderUtil.loadExternLib;
+
 public class IoTDB implements IoTDBMBean {
 
   private static final Logger logger = LoggerFactory.getLogger(IoTDB.class);
@@ -81,6 +83,9 @@ public class IoTDB implements IoTDBMBean {
       System.exit(1);
     }
     IoTDB daemon = IoTDB.getInstance();
+
+    loadExternLib(config);
+
     daemon.active();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
index a0ceaade07..4cf7ac0a0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
@@ -57,6 +57,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 
+import static org.apache.iotdb.db.utils.JarLoaderUtil.loadExternLib;
+
 public class NewIoTDB implements NewIoTDBMBean {
 
   private static final Logger logger = LoggerFactory.getLogger(NewIoTDB.class);
@@ -81,6 +83,9 @@ public class NewIoTDB implements NewIoTDBMBean {
     }
     NewIoTDB daemon = NewIoTDB.getInstance();
     config.setMppMode(true);
+
+    loadExternLib(config);
+
     daemon.active();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/JarLoaderUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/JarLoaderUtil.java
new file mode 100644
index 0000000000..810d16e7fc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/JarLoaderUtil.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.apache.commons.io.FileUtils;
+import org.apche.iotdb.external.api.IPropertiesLoader;
+import org.apche.iotdb.external.api.ISeriesNumerLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+/** Discovers external plugin jars and wires the SPI services they provide into the server. */
+public class JarLoaderUtil {
+
+  private static final Logger logger = LoggerFactory.getLogger(JarLoaderUtil.class);
+
+  /**
+   * Recursively collects every file with the {@code jar} extension under the given directory.
+   *
+   * @param jarDir path of the directory to scan
+   * @return URLs of all jar files found
+   * @throws IOException if a jar file cannot be converted to a URL
+   */
+  public static URL[] getExternalJarURLs(String jarDir) throws IOException {
+    HashSet<File> fileSet =
+        new HashSet<>(
+            FileUtils.listFiles(
+                SystemFileFactory.INSTANCE.getFile(jarDir), new String[] {"jar"}, true));
+    return FileUtils.toURLs(fileSet.toArray(new File[0]));
+  }
+
+  /**
+   * Loads external properties through the {@link IPropertiesLoader} SPI and applies them to the
+   * IoTDB and TsFile configurations, then registers the {@link ISeriesNumerLimiter} SPI provider
+   * with {@link SchemaEngine}. The limiter is only looked up after exactly one set of properties
+   * has been loaded, and it is only registered when exactly one limiter is found. Loading errors
+   * are logged rather than propagated.
+   *
+   * @param config supplies the directories holding the properties loader and limiter jars
+   */
+  public static void loadExternLib(IoTDBConfig config) {
+    // load external properties
+    String loaderDir = config.getExternalPropertiesLoaderDir();
+    if (!(new File(loaderDir).exists())) {
+      return;
+    }
+
+    Path externalPropertiesFile = IoTDBDescriptor.getInstance().getExternalPropsPath();
+    List<Properties> externalPropertiesList = new ArrayList<>();
+    try {
+      URL[] loaderJarURLs = getExternalJarURLs(loaderDir);
+      if (loaderJarURLs == null || loaderJarURLs.length == 0) {
+        return;
+      }
+
+      // Use SPI to get all plugins' class
+      ClassLoader classLoader = new URLClassLoader(loaderJarURLs);
+      ServiceLoader<IPropertiesLoader> loaders =
+          ServiceLoader.load(IPropertiesLoader.class, classLoader);
+      for (IPropertiesLoader loader : loaders) {
+        Properties properties = loader.loadProperties(externalPropertiesFile.toAbsolutePath());
+        if (properties != null) {
+          externalPropertiesList.add(properties);
+        }
+      }
+    } catch (Throwable t) {
+      // deliberately not rethrown: log and carry on with whatever was loaded
+      logger.error("error happened while loading external loader. ", t);
+    }
+
+    // exactly one set of external properties is expected; otherwise nothing is applied
+    if (externalPropertiesList.size() != 1) {
+      return;
+    }
+
+    // overwrite the default properties;
+    for (Properties properties : externalPropertiesList) {
+      IoTDBDescriptor.getInstance().loadProperties(properties);
+      TSFileDescriptor.getInstance()
+          .overwriteConfigByCustomSettings(TSFileDescriptor.getInstance().getConfig(), properties);
+    }
+
+    // load external series number limiter
+    String limiterDir = config.getExternalLimiterDir();
+    if (!(new File(limiterDir).exists())) {
+      return;
+    }
+
+    List<ISeriesNumerLimiter> limiterList = new ArrayList<>();
+    try {
+      URL[] limiterJarURLs = getExternalJarURLs(limiterDir);
+      if (limiterJarURLs == null || limiterJarURLs.length == 0) {
+        return;
+      }
+
+      // Use SPI to get all plugins' class
+      ClassLoader classLoader = new URLClassLoader(limiterJarURLs);
+      ServiceLoader<ISeriesNumerLimiter> limiters =
+          ServiceLoader.load(ISeriesNumerLimiter.class, classLoader);
+      for (ISeriesNumerLimiter limiter : limiters) {
+        for (Properties properties : externalPropertiesList) {
+          limiter.init(properties);
+        }
+        limiterList.add(limiter);
+      }
+    } catch (Throwable t) {
+      // deliberately not rethrown: log and carry on with whatever was loaded
+      logger.error("error happened while loading external limiter. ", t);
+    }
+
+    // exactly one limiter is expected; otherwise none is registered
+    if (limiterList.size() != 1) {
+      return;
+    }
+
+    SchemaEngine.getInstance().setSeriesNumerLimiter(limiterList.get(0));
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
index 965a12eefa..0ee217c32e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
@@ -61,7 +61,7 @@ public class TSFileDescriptor {
     }
   }
 
-  private void overwriteConfigByCustomSettings(TSFileConfig conf, Properties properties) {
+  public void overwriteConfigByCustomSettings(TSFileConfig conf, Properties properties) {
     PropertiesOverWriter writer = new PropertiesOverWriter(properties);
 
     writer.setInt(conf::setGroupSizeInByte, "group_size_in_byte");