You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/24 12:20:57 UTC
[iotdb] branch master updated: Add interface to support loading external properties and to do external series number checking (#7104)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4ad3915bd3 Add interface to support loading external properties and to do external series number checking (#7104)
4ad3915bd3 is described below
commit 4ad3915bd38456122b5e4f23280a7e155cb0ffd9
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Aug 24 20:20:51 2022 +0800
Add interface to support loading external properties and to do external series number checking (#7104)
---
external-api/pom.xml | 60 +
.../iotdb/external/api/IPropertiesLoader.java | 37 +
.../iotdb/external/api/ISeriesNumerLimiter.java | 48 +
.../apache/iotdb/commons/conf/IoTDBConstant.java | 4 +
pom.xml | 1 +
server/pom.xml | 5 +
.../resources/conf/iotdb-datanode.properties | 25 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 31 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 1388 ++++++++++----------
.../metadata/SeriesNumberOverflowException.java | 29 +
.../db/metadata/schemaregion/SchemaEngine.java | 30 +-
.../schemaregion/SchemaRegionMemoryImpl.java | 104 +-
.../schemaregion/SchemaRegionSchemaFileImpl.java | 87 +-
.../db/protocol/mqtt/PayloadFormatManager.java | 13 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 5 +
.../java/org/apache/iotdb/db/service/NewIoTDB.java | 5 +
.../org/apache/iotdb/db/utils/JarLoaderUtil.java | 150 +++
.../iotdb/tsfile/common/conf/TSFileDescriptor.java | 2 +-
18 files changed, 1280 insertions(+), 744 deletions(-)
diff --git a/external-api/pom.xml b/external-api/pom.xml
new file mode 100644
index 0000000000..354c820513
--- /dev/null
+++ b/external-api/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>iotdb-parent</artifactId>
+ <groupId>org.apache.iotdb</groupId>
+ <version>0.14.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>external-api</artifactId>
+ <profiles>
+ <profile>
+ <id>get-jar-with-dependencies</id>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${maven.assembly.version}</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <!-- this is used for inheritance merges -->
+ <phase>package</phase>
+ <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/external-api/src/main/java/org/apche/iotdb/external/api/IPropertiesLoader.java b/external-api/src/main/java/org/apche/iotdb/external/api/IPropertiesLoader.java
new file mode 100644
index 0000000000..10c5b21e7e
--- /dev/null
+++ b/external-api/src/main/java/org/apche/iotdb/external/api/IPropertiesLoader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apche.iotdb.external.api;
+
+import java.nio.file.Path;
+import java.util.Properties;
+
+/**
+ * An interface to load properties from external properties file to override the default
+ * configurations
+ */
+public interface IPropertiesLoader {
+
+ /**
+ * Load Properties from specific file
+ *
+ * @param file The path of the properties file to open
+ * @return a property list with values in file.
+ */
+ Properties loadProperties(Path file);
+}
diff --git a/external-api/src/main/java/org/apche/iotdb/external/api/ISeriesNumerLimiter.java b/external-api/src/main/java/org/apche/iotdb/external/api/ISeriesNumerLimiter.java
new file mode 100644
index 0000000000..7b36b5d8b7
--- /dev/null
+++ b/external-api/src/main/java/org/apche/iotdb/external/api/ISeriesNumerLimiter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apche.iotdb.external.api;
+
+import java.util.Properties;
+
+/** An interface for series number limiting, users can implement their own limitation strategy */
+public interface ISeriesNumerLimiter {
+
+ /**
+ * do the necessary initialization
+ *
+ * @param properties Properties containing all the parameters needed to init
+ */
+ void init(Properties properties);
+
+ /**
+ * add time series
+ *
+ * @param number time series number for current createTimeSeries operation
+ * @return true if totalTimeSeriesNumber doesn't exceed the limit and current createTimeSeries
+ * operation is allowed, otherwise false
+ */
+ boolean addTimeSeries(int number);
+
+ /**
+ * delete time series
+ *
+ * @param number time series number for current deleteTimeSeries operation
+ */
+ void deleteTimeSeries(int number);
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 41013228ac..4fa1095cb1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -201,6 +201,10 @@ public class IoTDBConstant {
public static final String WAL_FOLDER_NAME = "wal";
public static final String EXT_PIPE_FOLDER_NAME = "extPipe";
+ public static final String EXT_PROPERTIES_LOADER_FOLDER_NAME = "loader";
+
+ public static final String EXT_LIMITER = "limiter";
+
// mqtt
public static final String ENABLE_MQTT = "enable_mqtt_service";
public static final String MQTT_HOST_NAME = "mqtt_host";
diff --git a/pom.xml b/pom.xml
index dd24724a43..f43d7aa313 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,7 @@
<module>schema-engine-rocksdb</module>
<module>udf-api</module>
<module>rewrite-tsfile-tool</module>
+ <module>external-api</module>
</modules>
<!-- Properties Management -->
<properties>
diff --git a/server/pom.xml b/server/pom.xml
index 347268084a..9d29b53919 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -75,6 +75,11 @@
<artifactId>external-pipe-api</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>external-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 42d67f75f0..07c3b9e661 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -1096,4 +1096,27 @@ trigger_forward_http_pool_size=200
# Trigger HTTP forward pool max connection for per route
trigger_forward_http_pool_max_per_route=20
# Trigger MQTT forward pool size
-trigger_forward_mqtt_pool_size=4
\ No newline at end of file
+trigger_forward_mqtt_pool_size=4
+
+
+####################
+### External Lib Configuration
+####################
+
+# external lib directory for properties loader
+# For Windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
+# absolute. Otherwise, it is relative.
+# external_properties_loader_dir=ext\\loader
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# external_properties_loader_dir=ext/loader
+
+# external lib directory for limiter
+# For Windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
+# absolute. Otherwise, it is relative.
+# external_limiter_dir=ext\\limiter
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# external_limiter_dir=ext/limiter
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 379ba1fe6c..302fcebaf4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -62,7 +62,8 @@ public class IoTDBConfig {
/* Names of Watermark methods */
public static final String WATERMARK_GROUPED_LSB = "GroupBasedLSBMethod";
- static final String CONFIG_NAME = "iotdb-datanode.properties";
+ public static final String CONFIG_NAME = "iotdb-datanode.properties";
+ public static final String EXTERNAL_CONFIG_NAME = "iotdb-datanode-external.properties";
private static final Logger logger = LoggerFactory.getLogger(IoTDBConfig.class);
private static final String MULTI_DIR_STRATEGY_PREFIX =
"org.apache.iotdb.db.conf.directories.strategy.";
@@ -272,6 +273,16 @@ public class IoTDBConfig {
private String mqttDir =
IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.MQTT_FOLDER_NAME;
+ /** External lib directory for properties loader, stores user-uploaded JAR files */
+ private String externalPropertiesLoaderDir =
+ IoTDBConstant.EXT_FOLDER_NAME
+ + File.separator
+ + IoTDBConstant.EXT_PROPERTIES_LOADER_FOLDER_NAME;
+
+ /** External lib directory for limiter, stores user-uploaded JAR files */
+ private String externalLimiterDir =
+ IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.EXT_LIMITER;
+
/** Data directories. It can be settled as dataDirs = {"data1", "data2", "data3"}; */
private String[] dataDirs = {
IoTDBConstant.DEFAULT_BASE_DIR + File.separator + IoTDBConstant.DATA_FOLDER_NAME
@@ -1077,6 +1088,8 @@ public class IoTDBConfig {
temporaryLibDir = addHomeDir(temporaryLibDir);
triggerDir = addHomeDir(triggerDir);
mqttDir = addHomeDir(mqttDir);
+ externalPropertiesLoaderDir = addHomeDir(externalPropertiesLoaderDir);
+ externalLimiterDir = addHomeDir(externalLimiterDir);
for (int i = 0; i < walDirs.length; i++) {
walDirs[i] = addHomeDir(walDirs[i]);
}
@@ -1314,6 +1327,22 @@ public class IoTDBConfig {
this.mqttDir = mqttDir;
}
+ public String getExternalPropertiesLoaderDir() {
+ return externalPropertiesLoaderDir;
+ }
+
+ public void setExternalPropertiesLoaderDir(String externalPropertiesLoaderDir) {
+ this.externalPropertiesLoaderDir = externalPropertiesLoaderDir;
+ }
+
+ public String getExternalLimiterDir() {
+ return externalLimiterDir;
+ }
+
+ public void setExternalLimiterDir(String externalLimiterDir) {
+ this.externalLimiterDir = externalLimiterDir;
+ }
+
public String getMultiDirStrategyClassName() {
return multiDirStrategyClassName;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6376b09224..182c126c56 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -58,8 +58,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.MalformedURLException;
+import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Properties;
public class IoTDBDescriptor {
@@ -129,6 +132,52 @@ public class IoTDBDescriptor {
}
}
+ /**
+ * get props url location
+ *
+ * @return url object if location exists, otherwise null.
+ */
+ public Path getExternalPropsPath() {
+ // Check if a config-directory was specified first.
+ String urlString = System.getProperty(IoTDBConstant.IOTDB_CONF, null);
+ // If it wasn't, check if a home directory was provided (This usually contains a config)
+ if (urlString == null) {
+ urlString = System.getProperty(IoTDBConstant.IOTDB_HOME, null);
+ if (urlString != null) {
+ urlString =
+ urlString
+ + File.separatorChar
+ + "conf"
+ + File.separatorChar
+ + IoTDBConfig.EXTERNAL_CONFIG_NAME;
+ } else {
+ // If this too wasn't provided, try to find a default config in the root of the classpath.
+ URL uri = IoTDBConfig.class.getResource("/" + IoTDBConfig.EXTERNAL_CONFIG_NAME);
+ if (uri != null) {
+ try {
+ return Paths.get(uri.toURI());
+ } catch (URISyntaxException e) {
+ return null;
+ }
+ }
+ logger.warn(
+ "Cannot find IOTDB_HOME or IOTDB_EXTERNAL_CONF environment variable when loading "
+ + "config file {}, use default configuration",
+ IoTDBConfig.EXTERNAL_CONFIG_NAME);
+ // update all data seriesPath
+ conf.updatePath();
+ return null;
+ }
+ }
+ // If a config location was provided, but it doesn't end with a properties file,
+ // append the default location.
+ else if (!urlString.endsWith(".properties")) {
+ urlString += (File.separatorChar + IoTDBConfig.EXTERNAL_CONFIG_NAME);
+ }
+
+ return Paths.get(urlString);
+ }
+
/** load an property file and set TsfileDBConfig variables. */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private void loadProps() {
@@ -144,796 +193,789 @@ public class IoTDBDescriptor {
Properties properties = new Properties();
properties.load(inputStream);
- conf.setRpcAddress(properties.getProperty(IoTDBConstant.RPC_ADDRESS, conf.getRpcAddress()));
+ loadProperties(properties);
- // TODO: Use FQDN to identify our nodes afterwards
- try {
- replaceHostnameWithIP();
- } catch (Exception e) {
- logger.info(String.format("replace hostname with ip failed, %s", e.getMessage()));
- }
+ } catch (FileNotFoundException e) {
+ logger.warn("Fail to find config file {}", url, e);
+ } catch (IOException e) {
+ logger.warn("Cannot load config file, use default configuration", e);
+ } catch (Exception e) {
+ logger.warn("Incorrect format in config file, use default configuration", e);
+ } finally {
+ // update all data seriesPath
+ conf.updatePath();
+ commonDescriptor.getConfig().updatePath(System.getProperty(IoTDBConstant.IOTDB_HOME, null));
+ MetricConfigDescriptor.getInstance()
+ .getMetricConfig()
+ .updateRpcInstance(conf.getRpcAddress(), conf.getRpcPort());
+ }
+ }
- conf.setRpcThriftCompressionEnable(
- Boolean.parseBoolean(
- properties.getProperty(
- "rpc_thrift_compression_enable",
- Boolean.toString(conf.isRpcThriftCompressionEnable()))));
+ public void loadProperties(Properties properties) {
- conf.setRpcAdvancedCompressionEnable(
- Boolean.parseBoolean(
- properties.getProperty(
- "rpc_advanced_compression_enable",
- Boolean.toString(conf.isRpcAdvancedCompressionEnable()))));
+ conf.setRpcAddress(properties.getProperty(IoTDBConstant.RPC_ADDRESS, conf.getRpcAddress()));
- conf.setConnectionTimeoutInMS(
- Integer.parseInt(
- properties.getProperty(
- "connection_timeout_ms", String.valueOf(conf.getConnectionTimeoutInMS()))));
+ // TODO: Use FQDN to identify our nodes afterwards
+ try {
+ replaceHostnameWithIP();
+ } catch (Exception e) {
+ logger.info(String.format("replace hostname with ip failed, %s", e.getMessage()));
+ }
- conf.setSelectorNumOfClientManager(
- Integer.parseInt(
- properties.getProperty(
- "selector_thread_nums_of_client_manager",
- String.valueOf(conf.getSelectorNumOfClientManager()))));
+ conf.setRpcThriftCompressionEnable(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "rpc_thrift_compression_enable",
+ Boolean.toString(conf.isRpcThriftCompressionEnable()))));
- conf.setRpcPort(
- Integer.parseInt(
- properties.getProperty(IoTDBConstant.RPC_PORT, Integer.toString(conf.getRpcPort()))));
+ conf.setRpcAdvancedCompressionEnable(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "rpc_advanced_compression_enable",
+ Boolean.toString(conf.isRpcAdvancedCompressionEnable()))));
- conf.setEnableInfluxDBRpcService(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_influxdb_rpc_service",
- Boolean.toString(conf.isEnableInfluxDBRpcService()))));
+ conf.setConnectionTimeoutInMS(
+ Integer.parseInt(
+ properties.getProperty(
+ "connection_timeout_ms", String.valueOf(conf.getConnectionTimeoutInMS()))));
- conf.setInfluxDBRpcPort(
- Integer.parseInt(
- properties.getProperty(
- "influxdb_rpc_port", Integer.toString(conf.getInfluxDBRpcPort()))));
+ conf.setSelectorNumOfClientManager(
+ Integer.parseInt(
+ properties.getProperty(
+ "selector_thread_nums_of_client_manager",
+ String.valueOf(conf.getSelectorNumOfClientManager()))));
- conf.setTimestampPrecision(
- properties.getProperty("timestamp_precision", conf.getTimestampPrecision()));
+ conf.setRpcPort(
+ Integer.parseInt(
+ properties.getProperty(IoTDBConstant.RPC_PORT, Integer.toString(conf.getRpcPort()))));
- conf.setBufferedArraysMemoryProportion(
- Double.parseDouble(
- properties.getProperty(
- "buffered_arrays_memory_proportion",
- Double.toString(conf.getBufferedArraysMemoryProportion()))));
+ conf.setEnableInfluxDBRpcService(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_influxdb_rpc_service",
+ Boolean.toString(conf.isEnableInfluxDBRpcService()))));
- conf.setFlushProportion(
- Double.parseDouble(
- properties.getProperty(
- "flush_proportion", Double.toString(conf.getFlushProportion()))));
+ conf.setInfluxDBRpcPort(
+ Integer.parseInt(
+ properties.getProperty(
+ "influxdb_rpc_port", Integer.toString(conf.getInfluxDBRpcPort()))));
- conf.setRejectProportion(
- Double.parseDouble(
- properties.getProperty(
- "reject_proportion", Double.toString(conf.getRejectProportion()))));
+ conf.setTimestampPrecision(
+ properties.getProperty("timestamp_precision", conf.getTimestampPrecision()));
- conf.setStorageGroupSizeReportThreshold(
- Long.parseLong(
- properties.getProperty(
- "storage_group_report_threshold",
- Long.toString(conf.getStorageGroupSizeReportThreshold()))));
+ conf.setBufferedArraysMemoryProportion(
+ Double.parseDouble(
+ properties.getProperty(
+ "buffered_arrays_memory_proportion",
+ Double.toString(conf.getBufferedArraysMemoryProportion()))));
- conf.setMetaDataCacheEnable(
- Boolean.parseBoolean(
- properties.getProperty(
- "meta_data_cache_enable", Boolean.toString(conf.isMetaDataCacheEnable()))));
+ conf.setFlushProportion(
+ Double.parseDouble(
+ properties.getProperty(
+ "flush_proportion", Double.toString(conf.getFlushProportion()))));
- initMemoryAllocate(properties);
+ conf.setRejectProportion(
+ Double.parseDouble(
+ properties.getProperty(
+ "reject_proportion", Double.toString(conf.getRejectProportion()))));
- loadWALProps(properties);
+ conf.setStorageGroupSizeReportThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "storage_group_report_threshold",
+ Long.toString(conf.getStorageGroupSizeReportThreshold()))));
- String systemDir = properties.getProperty("system_dir");
- if (systemDir == null) {
- systemDir = properties.getProperty("base_dir");
- if (systemDir != null) {
- systemDir = FilePathUtils.regularizePath(systemDir) + IoTDBConstant.SYSTEM_FOLDER_NAME;
- } else {
- systemDir = conf.getSystemDir();
- }
- }
- conf.setSystemDir(systemDir);
+ conf.setMetaDataCacheEnable(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "meta_data_cache_enable", Boolean.toString(conf.isMetaDataCacheEnable()))));
- conf.setSchemaDir(
- FilePathUtils.regularizePath(conf.getSystemDir()) + IoTDBConstant.SCHEMA_FOLDER_NAME);
+ initMemoryAllocate(properties);
- conf.setQueryDir(
- FilePathUtils.regularizePath(conf.getSystemDir() + IoTDBConstant.QUERY_FOLDER_NAME));
+ loadWALProps(properties);
- conf.setTracingDir(properties.getProperty("tracing_dir", conf.getTracingDir()));
+ String systemDir = properties.getProperty("system_dir");
+ if (systemDir == null) {
+ systemDir = properties.getProperty("base_dir");
+ if (systemDir != null) {
+ systemDir = FilePathUtils.regularizePath(systemDir) + IoTDBConstant.SYSTEM_FOLDER_NAME;
+ } else {
+ systemDir = conf.getSystemDir();
+ }
+ }
+ conf.setSystemDir(systemDir);
- conf.setDataDirs(properties.getProperty("data_dirs", conf.getDataDirs()[0]).split(","));
+ conf.setSchemaDir(
+ FilePathUtils.regularizePath(conf.getSystemDir()) + IoTDBConstant.SCHEMA_FOLDER_NAME);
- conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
+ conf.setQueryDir(
+ FilePathUtils.regularizePath(conf.getSystemDir() + IoTDBConstant.QUERY_FOLDER_NAME));
- int mlogBufferSize =
- Integer.parseInt(
- properties.getProperty(
- "mlog_buffer_size", Integer.toString(conf.getMlogBufferSize())));
- if (mlogBufferSize > 0) {
- conf.setMlogBufferSize(mlogBufferSize);
- }
+ conf.setTracingDir(properties.getProperty("tracing_dir", conf.getTracingDir()));
- long forceMlogPeriodInMs =
- Long.parseLong(
- properties.getProperty(
- "sync_mlog_period_in_ms", Long.toString(conf.getSyncMlogPeriodInMs())));
- if (forceMlogPeriodInMs > 0) {
- conf.setSyncMlogPeriodInMs(forceMlogPeriodInMs);
- }
+ conf.setDataDirs(properties.getProperty("data_dirs", conf.getDataDirs()[0]).split(","));
- conf.setMultiDirStrategyClassName(
- properties.getProperty("multi_dir_strategy", conf.getMultiDirStrategyClassName()));
+ conf.setConsensusDir(properties.getProperty("consensus_dir", conf.getConsensusDir()));
- conf.setBatchSize(
- Integer.parseInt(
- properties.getProperty("batch_size", Integer.toString(conf.getBatchSize()))));
+ int mlogBufferSize =
+ Integer.parseInt(
+ properties.getProperty("mlog_buffer_size", Integer.toString(conf.getMlogBufferSize())));
+ if (mlogBufferSize > 0) {
+ conf.setMlogBufferSize(mlogBufferSize);
+ }
- conf.setEnableMemControl(
- (Boolean.parseBoolean(
- properties.getProperty(
- "enable_mem_control", Boolean.toString(conf.isEnableMemControl())))));
- logger.info("IoTDB enable memory control: {}", conf.isEnableMemControl());
+ long forceMlogPeriodInMs =
+ Long.parseLong(
+ properties.getProperty(
+ "sync_mlog_period_in_ms", Long.toString(conf.getSyncMlogPeriodInMs())));
+ if (forceMlogPeriodInMs > 0) {
+ conf.setSyncMlogPeriodInMs(forceMlogPeriodInMs);
+ }
- long seqTsFileSize =
- Long.parseLong(
- properties
- .getProperty("seq_tsfile_size", Long.toString(conf.getSeqTsFileSize()))
- .trim());
- if (seqTsFileSize >= 0) {
- conf.setSeqTsFileSize(seqTsFileSize);
- }
+ conf.setMultiDirStrategyClassName(
+ properties.getProperty("multi_dir_strategy", conf.getMultiDirStrategyClassName()));
- long unSeqTsFileSize =
- Long.parseLong(
- properties
- .getProperty("unseq_tsfile_size", Long.toString(conf.getUnSeqTsFileSize()))
- .trim());
- if (unSeqTsFileSize >= 0) {
- conf.setUnSeqTsFileSize(unSeqTsFileSize);
- }
+ conf.setBatchSize(
+ Integer.parseInt(
+ properties.getProperty("batch_size", Integer.toString(conf.getBatchSize()))));
- long memTableSizeThreshold =
- Long.parseLong(
- properties
- .getProperty(
- "memtable_size_threshold", Long.toString(conf.getMemtableSizeThreshold()))
- .trim());
- if (memTableSizeThreshold > 0) {
- conf.setMemtableSizeThreshold(memTableSizeThreshold);
- }
+ conf.setEnableMemControl(
+ (Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_mem_control", Boolean.toString(conf.isEnableMemControl())))));
+ logger.info("IoTDB enable memory control: {}", conf.isEnableMemControl());
- conf.setAvgSeriesPointNumberThreshold(
- Integer.parseInt(
- properties.getProperty(
- "avg_series_point_number_threshold",
- Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
+ long seqTsFileSize =
+ Long.parseLong(
+ properties
+ .getProperty("seq_tsfile_size", Long.toString(conf.getSeqTsFileSize()))
+ .trim());
+ if (seqTsFileSize >= 0) {
+ conf.setSeqTsFileSize(seqTsFileSize);
+ }
- conf.setCheckPeriodWhenInsertBlocked(
- Integer.parseInt(
- properties.getProperty(
- "check_period_when_insert_blocked",
- Integer.toString(conf.getCheckPeriodWhenInsertBlocked()))));
+ long unSeqTsFileSize =
+ Long.parseLong(
+ properties
+ .getProperty("unseq_tsfile_size", Long.toString(conf.getUnSeqTsFileSize()))
+ .trim());
+ if (unSeqTsFileSize >= 0) {
+ conf.setUnSeqTsFileSize(unSeqTsFileSize);
+ }
- conf.setMaxWaitingTimeWhenInsertBlocked(
- Integer.parseInt(
- properties.getProperty(
- "max_waiting_time_when_insert_blocked",
- Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
+ long memTableSizeThreshold =
+ Long.parseLong(
+ properties
+ .getProperty(
+ "memtable_size_threshold", Long.toString(conf.getMemtableSizeThreshold()))
+ .trim());
+ if (memTableSizeThreshold > 0) {
+ conf.setMemtableSizeThreshold(memTableSizeThreshold);
+ }
- conf.setIoTaskQueueSizeForFlushing(
- Integer.parseInt(
- properties.getProperty(
- "io_task_queue_size_for_flushing",
- Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
+ conf.setAvgSeriesPointNumberThreshold(
+ Integer.parseInt(
+ properties.getProperty(
+ "avg_series_point_number_threshold",
+ Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
- conf.setCompactionScheduleIntervalInMs(
- Long.parseLong(
- properties.getProperty(
- "compaction_schedule_interval_in_ms",
- Long.toString(conf.getCompactionScheduleIntervalInMs()))));
+ conf.setCheckPeriodWhenInsertBlocked(
+ Integer.parseInt(
+ properties.getProperty(
+ "check_period_when_insert_blocked",
+ Integer.toString(conf.getCheckPeriodWhenInsertBlocked()))));
- conf.setCompactionSubmissionIntervalInMs(
- Long.parseLong(
- properties.getProperty(
- "compaction_submission_interval_in_ms",
- Long.toString(conf.getCompactionSubmissionIntervalInMs()))));
+ conf.setMaxWaitingTimeWhenInsertBlocked(
+ Integer.parseInt(
+ properties.getProperty(
+ "max_waiting_time_when_insert_blocked",
+ Integer.toString(conf.getMaxWaitingTimeWhenInsertBlocked()))));
- conf.setEnableCrossSpaceCompaction(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_cross_space_compaction",
- Boolean.toString(conf.isEnableCrossSpaceCompaction()))));
+ conf.setIoTaskQueueSizeForFlushing(
+ Integer.parseInt(
+ properties.getProperty(
+ "io_task_queue_size_for_flushing",
+ Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
- conf.setEnableSeqSpaceCompaction(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_seq_space_compaction",
- Boolean.toString(conf.isEnableSeqSpaceCompaction()))));
+ conf.setCompactionScheduleIntervalInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "compaction_schedule_interval_in_ms",
+ Long.toString(conf.getCompactionScheduleIntervalInMs()))));
- conf.setEnableUnseqSpaceCompaction(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_unseq_space_compaction",
- Boolean.toString(conf.isEnableUnseqSpaceCompaction()))));
+ conf.setCompactionSubmissionIntervalInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "compaction_submission_interval_in_ms",
+ Long.toString(conf.getCompactionSubmissionIntervalInMs()))));
- conf.setCrossCompactionSelector(
- CrossCompactionSelector.getCrossCompactionSelector(
- properties.getProperty(
- "cross_selector", conf.getCrossCompactionSelector().toString())));
+ conf.setEnableCrossSpaceCompaction(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_cross_space_compaction",
+ Boolean.toString(conf.isEnableCrossSpaceCompaction()))));
- conf.setInnerSequenceCompactionSelector(
- InnerSequenceCompactionSelector.getInnerSequenceCompactionSelector(
- properties.getProperty(
- "inner_seq_selector", conf.getInnerSequenceCompactionSelector().toString())));
+ conf.setEnableSeqSpaceCompaction(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_seq_space_compaction",
+ Boolean.toString(conf.isEnableSeqSpaceCompaction()))));
- conf.setInnerUnsequenceCompactionSelector(
- InnerUnsequenceCompactionSelector.getInnerUnsequenceCompactionSelector(
- properties.getProperty(
- "inner_unseq_selector", conf.getInnerUnsequenceCompactionSelector().toString())));
+ conf.setEnableUnseqSpaceCompaction(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_unseq_space_compaction",
+ Boolean.toString(conf.isEnableUnseqSpaceCompaction()))));
- conf.setInnerSeqCompactionPerformer(
- InnerSeqCompactionPerformer.getInnerSeqCompactionPerformer(
- properties.getProperty(
- "inner_seq_performer", conf.getInnerSeqCompactionPerformer().toString())));
+ conf.setCrossCompactionSelector(
+ CrossCompactionSelector.getCrossCompactionSelector(
+ properties.getProperty(
+ "cross_selector", conf.getCrossCompactionSelector().toString())));
- conf.setInnerUnseqCompactionPerformer(
- InnerUnseqCompactionPerformer.getInnerUnseqCompactionPerformer(
- properties.getProperty(
- "inner_unseq_performer", conf.getInnerUnseqCompactionPerformer().toString())));
+ conf.setInnerSequenceCompactionSelector(
+ InnerSequenceCompactionSelector.getInnerSequenceCompactionSelector(
+ properties.getProperty(
+ "inner_seq_selector", conf.getInnerSequenceCompactionSelector().toString())));
- conf.setCrossCompactionPerformer(
- CrossCompactionPerformer.getCrossCompactionPerformer(
- properties.getProperty(
- "cross_performer", conf.getCrossCompactionPerformer().toString())));
+ conf.setInnerUnsequenceCompactionSelector(
+ InnerUnsequenceCompactionSelector.getInnerUnsequenceCompactionSelector(
+ properties.getProperty(
+ "inner_unseq_selector", conf.getInnerUnsequenceCompactionSelector().toString())));
- conf.setCompactionPriority(
- CompactionPriority.valueOf(
- properties.getProperty(
- "compaction_priority", conf.getCompactionPriority().toString())));
+ conf.setInnerSeqCompactionPerformer(
+ InnerSeqCompactionPerformer.getInnerSeqCompactionPerformer(
+ properties.getProperty(
+ "inner_seq_performer", conf.getInnerSeqCompactionPerformer().toString())));
- int subtaskNum =
- Integer.parseInt(
- properties.getProperty(
- "sub_compaction_thread_num", Integer.toString(conf.getSubCompactionTaskNum())));
- subtaskNum = subtaskNum <= 0 ? 1 : subtaskNum;
- conf.setSubCompactionTaskNum(subtaskNum);
+ conf.setInnerUnseqCompactionPerformer(
+ InnerUnseqCompactionPerformer.getInnerUnseqCompactionPerformer(
+ properties.getProperty(
+ "inner_unseq_performer", conf.getInnerUnseqCompactionPerformer().toString())));
- conf.setQueryTimeoutThreshold(
- Long.parseLong(
- properties.getProperty(
- "query_timeout_threshold", Long.toString(conf.getQueryTimeoutThreshold()))));
+ conf.setCrossCompactionPerformer(
+ CrossCompactionPerformer.getCrossCompactionPerformer(
+ properties.getProperty(
+ "cross_performer", conf.getCrossCompactionPerformer().toString())));
- conf.setSessionTimeoutThreshold(
- Integer.parseInt(
- properties.getProperty(
- "session_timeout_threshold",
- Integer.toString(conf.getSessionTimeoutThreshold()))));
- conf.setMaxNumberOfSyncFileRetry(
- Integer.parseInt(
- properties
- .getProperty(
- "max_number_of_sync_file_retry",
- Integer.toString(conf.getMaxNumberOfSyncFileRetry()))
- .trim()));
+ conf.setCompactionPriority(
+ CompactionPriority.valueOf(
+ properties.getProperty(
+ "compaction_priority", conf.getCompactionPriority().toString())));
- conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
+ int subtaskNum =
+ Integer.parseInt(
+ properties.getProperty(
+ "sub_compaction_thread_num", Integer.toString(conf.getSubCompactionTaskNum())));
+ subtaskNum = subtaskNum <= 0 ? 1 : subtaskNum;
+ conf.setSubCompactionTaskNum(subtaskNum);
- conf.setConcurrentFlushThread(
- Integer.parseInt(
- properties.getProperty(
- "concurrent_flush_thread", Integer.toString(conf.getConcurrentFlushThread()))));
+ conf.setQueryTimeoutThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "query_timeout_threshold", Long.toString(conf.getQueryTimeoutThreshold()))));
- if (conf.getConcurrentFlushThread() <= 0) {
- conf.setConcurrentFlushThread(Runtime.getRuntime().availableProcessors());
- }
+ conf.setSessionTimeoutThreshold(
+ Integer.parseInt(
+ properties.getProperty(
+ "session_timeout_threshold", Integer.toString(conf.getSessionTimeoutThreshold()))));
+ conf.setMaxNumberOfSyncFileRetry(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "max_number_of_sync_file_retry",
+ Integer.toString(conf.getMaxNumberOfSyncFileRetry()))
+ .trim()));
- // start: index parameter setting
- conf.setIndexRootFolder(properties.getProperty("index_root_dir", conf.getIndexRootFolder()));
+ conf.setIpWhiteList(properties.getProperty("ip_white_list", conf.getIpWhiteList()));
- conf.setEnableIndex(
- Boolean.parseBoolean(
- properties.getProperty("enable_index", Boolean.toString(conf.isEnableIndex()))));
+ conf.setConcurrentFlushThread(
+ Integer.parseInt(
+ properties.getProperty(
+ "concurrent_flush_thread", Integer.toString(conf.getConcurrentFlushThread()))));
- conf.setConcurrentIndexBuildThread(
- Integer.parseInt(
- properties.getProperty(
- "concurrent_index_build_thread",
- Integer.toString(conf.getConcurrentIndexBuildThread()))));
- if (conf.getConcurrentIndexBuildThread() <= 0) {
- conf.setConcurrentIndexBuildThread(Runtime.getRuntime().availableProcessors());
- }
+ if (conf.getConcurrentFlushThread() <= 0) {
+ conf.setConcurrentFlushThread(Runtime.getRuntime().availableProcessors());
+ }
- conf.setDefaultIndexWindowRange(
- Integer.parseInt(
- properties.getProperty(
- "default_index_window_range",
- Integer.toString(conf.getDefaultIndexWindowRange()))));
+ // start: index parameter setting
+ conf.setIndexRootFolder(properties.getProperty("index_root_dir", conf.getIndexRootFolder()));
- conf.setConcurrentQueryThread(
- Integer.parseInt(
- properties.getProperty(
- "concurrent_query_thread", Integer.toString(conf.getConcurrentQueryThread()))));
+ conf.setEnableIndex(
+ Boolean.parseBoolean(
+ properties.getProperty("enable_index", Boolean.toString(conf.isEnableIndex()))));
- if (conf.getConcurrentQueryThread() <= 0) {
- conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
- }
+ conf.setConcurrentIndexBuildThread(
+ Integer.parseInt(
+ properties.getProperty(
+ "concurrent_index_build_thread",
+ Integer.toString(conf.getConcurrentIndexBuildThread()))));
+ if (conf.getConcurrentIndexBuildThread() <= 0) {
+ conf.setConcurrentIndexBuildThread(Runtime.getRuntime().availableProcessors());
+ }
- conf.setMaxAllowedConcurrentQueries(
- Integer.parseInt(
- properties.getProperty(
- "max_allowed_concurrent_queries",
- Integer.toString(conf.getMaxAllowedConcurrentQueries()))));
+ conf.setDefaultIndexWindowRange(
+ Integer.parseInt(
+ properties.getProperty(
+ "default_index_window_range",
+ Integer.toString(conf.getDefaultIndexWindowRange()))));
- if (conf.getMaxAllowedConcurrentQueries() <= 0) {
- conf.setMaxAllowedConcurrentQueries(1000);
- }
+ conf.setConcurrentQueryThread(
+ Integer.parseInt(
+ properties.getProperty(
+ "concurrent_query_thread", Integer.toString(conf.getConcurrentQueryThread()))));
- conf.setConcurrentSubRawQueryThread(
- Integer.parseInt(
- properties.getProperty(
- "concurrent_sub_rawQuery_thread",
- Integer.toString(conf.getConcurrentSubRawQueryThread()))));
+ if (conf.getConcurrentQueryThread() <= 0) {
+ conf.setConcurrentQueryThread(Runtime.getRuntime().availableProcessors());
+ }
- if (conf.getConcurrentSubRawQueryThread() <= 0) {
- conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors());
- }
+ conf.setMaxAllowedConcurrentQueries(
+ Integer.parseInt(
+ properties.getProperty(
+ "max_allowed_concurrent_queries",
+ Integer.toString(conf.getMaxAllowedConcurrentQueries()))));
- conf.setRawQueryBlockingQueueCapacity(
- Integer.parseInt(
- properties.getProperty(
- "raw_query_blocking_queue_capacity",
- Integer.toString(conf.getRawQueryBlockingQueueCapacity()))));
+ if (conf.getMaxAllowedConcurrentQueries() <= 0) {
+ conf.setMaxAllowedConcurrentQueries(1000);
+ }
- conf.setSchemaRegionDeviceNodeCacheSize(
- Integer.parseInt(
- properties
- .getProperty(
- "schema_region_device_node_cache_size",
- Integer.toString(conf.getSchemaRegionDeviceNodeCacheSize()))
- .trim()));
+ conf.setConcurrentSubRawQueryThread(
+ Integer.parseInt(
+ properties.getProperty(
+ "concurrent_sub_rawQuery_thread",
+ Integer.toString(conf.getConcurrentSubRawQueryThread()))));
- conf.setmRemoteSchemaCacheSize(
- Integer.parseInt(
- properties
- .getProperty(
- "remote_schema_cache_size",
- Integer.toString(conf.getmRemoteSchemaCacheSize()))
- .trim()));
+ if (conf.getConcurrentSubRawQueryThread() <= 0) {
+ conf.setConcurrentSubRawQueryThread(Runtime.getRuntime().availableProcessors());
+ }
- conf.setLanguageVersion(
- properties.getProperty("language_version", conf.getLanguageVersion()).trim());
+ conf.setRawQueryBlockingQueueCapacity(
+ Integer.parseInt(
+ properties.getProperty(
+ "raw_query_blocking_queue_capacity",
+ Integer.toString(conf.getRawQueryBlockingQueueCapacity()))));
- if (properties.containsKey("chunk_buffer_pool_enable")) {
- conf.setChunkBufferPoolEnable(
- Boolean.parseBoolean(properties.getProperty("chunk_buffer_pool_enable")));
- }
+ conf.setSchemaRegionDeviceNodeCacheSize(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "schema_region_device_node_cache_size",
+ Integer.toString(conf.getSchemaRegionDeviceNodeCacheSize()))
+ .trim()));
- conf.setEnableExternalSort(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_external_sort", Boolean.toString(conf.isEnableExternalSort()))));
- conf.setExternalSortThreshold(
- Integer.parseInt(
- properties.getProperty(
- "external_sort_threshold", Integer.toString(conf.getExternalSortThreshold()))));
- conf.setUpgradeThreadNum(
- Integer.parseInt(
- properties.getProperty(
- "upgrade_thread_num", Integer.toString(conf.getUpgradeThreadNum()))));
- conf.setCrossCompactionMemoryBudget(
- Long.parseLong(
- properties.getProperty(
- "cross_compaction_memory_budget",
- Long.toString(conf.getCrossCompactionMemoryBudget()))));
- conf.setCrossCompactionFileSelectionTimeBudget(
- Long.parseLong(
- properties.getProperty(
- "cross_compaction_file_selection_time_budget",
- Long.toString(conf.getCrossCompactionFileSelectionTimeBudget()))));
- conf.setMergeIntervalSec(
- Long.parseLong(
- properties.getProperty(
- "merge_interval_sec", Long.toString(conf.getMergeIntervalSec()))));
- conf.setConcurrentCompactionThread(
- Integer.parseInt(
- properties.getProperty(
- "concurrent_compaction_thread",
- Integer.toString(conf.getConcurrentCompactionThread()))));
- conf.setTargetCompactionFileSize(
- Long.parseLong(
- properties.getProperty(
- "target_compaction_file_size",
- Long.toString(conf.getTargetCompactionFileSize()))));
- conf.setTargetChunkSize(
- Long.parseLong(
- properties.getProperty(
- "target_chunk_size", Long.toString(conf.getTargetChunkSize()))));
- conf.setTargetChunkPointNum(
- Long.parseLong(
- properties.getProperty(
- "target_chunk_point_num", Long.toString(conf.getTargetChunkPointNum()))));
- conf.setChunkPointNumLowerBoundInCompaction(
- Long.parseLong(
- properties.getProperty(
- "chunk_size_lower_bound_in_compaction",
- Long.toString(conf.getChunkPointNumLowerBoundInCompaction()))));
- conf.setChunkSizeLowerBoundInCompaction(
- Long.parseLong(
- properties.getProperty(
- "chunk_size_lower_bound_in_compaction",
- Long.toString(conf.getChunkSizeLowerBoundInCompaction()))));
- conf.setMaxInnerCompactionCandidateFileNum(
- Integer.parseInt(
- properties.getProperty(
- "max_inner_compaction_candidate_file_num",
- Integer.toString(conf.getMaxInnerCompactionCandidateFileNum()))));
- conf.setMaxCrossCompactionCandidateFileNum(
- Integer.parseInt(
- properties.getProperty(
- "max_cross_compaction_candidate_file_num",
- Integer.toString(conf.getMaxCrossCompactionCandidateFileNum()))));
+ conf.setmRemoteSchemaCacheSize(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "remote_schema_cache_size", Integer.toString(conf.getmRemoteSchemaCacheSize()))
+ .trim()));
- conf.setCompactionWriteThroughputMbPerSec(
- Integer.parseInt(
- properties.getProperty(
- "compaction_write_throughput_mb_per_sec",
- Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+ conf.setLanguageVersion(
+ properties.getProperty("language_version", conf.getLanguageVersion()).trim());
- conf.setEnablePartialInsert(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_partial_insert", String.valueOf(conf.isEnablePartialInsert()))));
+ if (properties.containsKey("chunk_buffer_pool_enable")) {
+ conf.setChunkBufferPoolEnable(
+ Boolean.parseBoolean(properties.getProperty("chunk_buffer_pool_enable")));
+ }
- int rpcSelectorThreadNum =
- Integer.parseInt(
- properties.getProperty(
- "rpc_selector_thread_num",
- Integer.toString(conf.getRpcSelectorThreadNum()).trim()));
- if (rpcSelectorThreadNum <= 0) {
- rpcSelectorThreadNum = 1;
- }
+ conf.setEnableExternalSort(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_external_sort", Boolean.toString(conf.isEnableExternalSort()))));
+ conf.setExternalSortThreshold(
+ Integer.parseInt(
+ properties.getProperty(
+ "external_sort_threshold", Integer.toString(conf.getExternalSortThreshold()))));
+ conf.setUpgradeThreadNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "upgrade_thread_num", Integer.toString(conf.getUpgradeThreadNum()))));
+ conf.setCrossCompactionMemoryBudget(
+ Long.parseLong(
+ properties.getProperty(
+ "cross_compaction_memory_budget",
+ Long.toString(conf.getCrossCompactionMemoryBudget()))));
+ conf.setCrossCompactionFileSelectionTimeBudget(
+ Long.parseLong(
+ properties.getProperty(
+ "cross_compaction_file_selection_time_budget",
+ Long.toString(conf.getCrossCompactionFileSelectionTimeBudget()))));
+ conf.setMergeIntervalSec(
+ Long.parseLong(
+ properties.getProperty(
+ "merge_interval_sec", Long.toString(conf.getMergeIntervalSec()))));
+ conf.setConcurrentCompactionThread(
+ Integer.parseInt(
+ properties.getProperty(
+ "concurrent_compaction_thread",
+ Integer.toString(conf.getConcurrentCompactionThread()))));
+ conf.setTargetCompactionFileSize(
+ Long.parseLong(
+ properties.getProperty(
+ "target_compaction_file_size", Long.toString(conf.getTargetCompactionFileSize()))));
+ conf.setTargetChunkSize(
+ Long.parseLong(
+ properties.getProperty("target_chunk_size", Long.toString(conf.getTargetChunkSize()))));
+ conf.setTargetChunkPointNum(
+ Long.parseLong(
+ properties.getProperty(
+ "target_chunk_point_num", Long.toString(conf.getTargetChunkPointNum()))));
+ conf.setChunkPointNumLowerBoundInCompaction(
+ Long.parseLong(
+ properties.getProperty(
+ "chunk_size_lower_bound_in_compaction",
+ Long.toString(conf.getChunkPointNumLowerBoundInCompaction()))));
+ conf.setChunkSizeLowerBoundInCompaction(
+ Long.parseLong(
+ properties.getProperty(
+ "chunk_size_lower_bound_in_compaction",
+ Long.toString(conf.getChunkSizeLowerBoundInCompaction()))));
+ conf.setMaxInnerCompactionCandidateFileNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "max_inner_compaction_candidate_file_num",
+ Integer.toString(conf.getMaxInnerCompactionCandidateFileNum()))));
+ conf.setMaxCrossCompactionCandidateFileNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "max_cross_compaction_candidate_file_num",
+ Integer.toString(conf.getMaxCrossCompactionCandidateFileNum()))));
- conf.setRpcSelectorThreadNum(rpcSelectorThreadNum);
+ conf.setCompactionWriteThroughputMbPerSec(
+ Integer.parseInt(
+ properties.getProperty(
+ "compaction_write_throughput_mb_per_sec",
+ Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
- int minConcurrentClientNum =
- Integer.parseInt(
- properties.getProperty(
- "rpc_min_concurrent_client_num",
- Integer.toString(conf.getRpcMinConcurrentClientNum()).trim()));
- if (minConcurrentClientNum <= 0) {
- minConcurrentClientNum = Runtime.getRuntime().availableProcessors();
- }
+ conf.setEnablePartialInsert(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_partial_insert", String.valueOf(conf.isEnablePartialInsert()))));
- conf.setRpcMinConcurrentClientNum(minConcurrentClientNum);
+ int rpcSelectorThreadNum =
+ Integer.parseInt(
+ properties.getProperty(
+ "rpc_selector_thread_num",
+ Integer.toString(conf.getRpcSelectorThreadNum()).trim()));
+ if (rpcSelectorThreadNum <= 0) {
+ rpcSelectorThreadNum = 1;
+ }
- int maxConcurrentClientNum =
- Integer.parseInt(
- properties.getProperty(
- "rpc_max_concurrent_client_num",
- Integer.toString(conf.getRpcMaxConcurrentClientNum()).trim()));
- if (maxConcurrentClientNum <= 0) {
- maxConcurrentClientNum = 65535;
- }
+ conf.setRpcSelectorThreadNum(rpcSelectorThreadNum);
- conf.setRpcMaxConcurrentClientNum(maxConcurrentClientNum);
+ int minConcurrentClientNum =
+ Integer.parseInt(
+ properties.getProperty(
+ "rpc_min_concurrent_client_num",
+ Integer.toString(conf.getRpcMinConcurrentClientNum()).trim()));
+ if (minConcurrentClientNum <= 0) {
+ minConcurrentClientNum = Runtime.getRuntime().availableProcessors();
+ }
- conf.setEnableWatermark(
- Boolean.parseBoolean(
- properties.getProperty(
- "watermark_module_opened", Boolean.toString(conf.isEnableWatermark()).trim())));
- conf.setWatermarkSecretKey(
- properties.getProperty("watermark_secret_key", conf.getWatermarkSecretKey()));
- conf.setWatermarkBitString(
- properties.getProperty("watermark_bit_string", conf.getWatermarkBitString()));
- conf.setWatermarkMethod(
- properties.getProperty("watermark_method", conf.getWatermarkMethod()));
+ conf.setRpcMinConcurrentClientNum(minConcurrentClientNum);
- loadAutoCreateSchemaProps(properties);
+ int maxConcurrentClientNum =
+ Integer.parseInt(
+ properties.getProperty(
+ "rpc_max_concurrent_client_num",
+ Integer.toString(conf.getRpcMaxConcurrentClientNum()).trim()));
+ if (maxConcurrentClientNum <= 0) {
+ maxConcurrentClientNum = 65535;
+ }
- conf.setTsFileStorageFs(
- properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().toString()));
- conf.setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
- conf.setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
- conf.setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(","));
- conf.setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort()));
- conf.setDfsNameServices(
- properties.getProperty("dfs_nameservices", conf.getDfsNameServices()));
- conf.setDfsHaNamenodes(
- properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(","));
- conf.setDfsHaAutomaticFailoverEnabled(
- Boolean.parseBoolean(
- properties.getProperty(
- "dfs_ha_automatic_failover_enabled",
- String.valueOf(conf.isDfsHaAutomaticFailoverEnabled()))));
- conf.setDfsClientFailoverProxyProvider(
- properties.getProperty(
- "dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider()));
- conf.setUseKerberos(
- Boolean.parseBoolean(
- properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
- conf.setKerberosKeytabFilePath(
- properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
- conf.setKerberosPrincipal(
- properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
+ conf.setRpcMaxConcurrentClientNum(maxConcurrentClientNum);
- conf.setAllowReadOnlyWhenErrorsOccur(
- Boolean.parseBoolean(
- properties.getProperty(
- "allow_read_only_when_errors_occur",
- String.valueOf(conf.isAllowReadOnlyWhenErrorsOccur()))));
+ conf.setEnableWatermark(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "watermark_module_opened", Boolean.toString(conf.isEnableWatermark()).trim())));
+ conf.setWatermarkSecretKey(
+ properties.getProperty("watermark_secret_key", conf.getWatermarkSecretKey()));
+ conf.setWatermarkBitString(
+ properties.getProperty("watermark_bit_string", conf.getWatermarkBitString()));
+ conf.setWatermarkMethod(properties.getProperty("watermark_method", conf.getWatermarkMethod()));
+
+ loadAutoCreateSchemaProps(properties);
+
+ conf.setTsFileStorageFs(
+ properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().toString()));
+ conf.setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
+ conf.setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
+ conf.setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(","));
+ conf.setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort()));
+ conf.setDfsNameServices(properties.getProperty("dfs_nameservices", conf.getDfsNameServices()));
+ conf.setDfsHaNamenodes(
+ properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(","));
+ conf.setDfsHaAutomaticFailoverEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "dfs_ha_automatic_failover_enabled",
+ String.valueOf(conf.isDfsHaAutomaticFailoverEnabled()))));
+ conf.setDfsClientFailoverProxyProvider(
+ properties.getProperty(
+ "dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider()));
+ conf.setUseKerberos(
+ Boolean.parseBoolean(
+ properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
+ conf.setKerberosKeytabFilePath(
+ properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
+ conf.setKerberosPrincipal(
+ properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
- // the num of memtables in each storage group
- conf.setConcurrentWritingTimePartition(
- Integer.parseInt(
- properties.getProperty(
- "concurrent_writing_time_partition",
- String.valueOf(conf.getConcurrentWritingTimePartition()))));
+ conf.setAllowReadOnlyWhenErrorsOccur(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "allow_read_only_when_errors_occur",
+ String.valueOf(conf.isAllowReadOnlyWhenErrorsOccur()))));
- // the default fill interval in LinearFill and PreviousFill
- conf.setDefaultFillInterval(
- Integer.parseInt(
- properties.getProperty(
- "default_fill_interval", String.valueOf(conf.getDefaultFillInterval()))));
+ // the num of memtables in each storage group
+ conf.setConcurrentWritingTimePartition(
+ Integer.parseInt(
+ properties.getProperty(
+ "concurrent_writing_time_partition",
+ String.valueOf(conf.getConcurrentWritingTimePartition()))));
- conf.setTagAttributeTotalSize(
- Integer.parseInt(
- properties.getProperty(
- "tag_attribute_total_size", String.valueOf(conf.getTagAttributeTotalSize()))));
+ // the default fill interval in LinearFill and PreviousFill
+ conf.setDefaultFillInterval(
+ Integer.parseInt(
+ properties.getProperty(
+ "default_fill_interval", String.valueOf(conf.getDefaultFillInterval()))));
- conf.setTagAttributeFlushInterval(
- Integer.parseInt(
- properties.getProperty(
- "tag_attribute_flush_interval",
- String.valueOf(conf.getTagAttributeFlushInterval()))));
+ conf.setTagAttributeTotalSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "tag_attribute_total_size", String.valueOf(conf.getTagAttributeTotalSize()))));
- conf.setPrimitiveArraySize(
- (Integer.parseInt(
- properties.getProperty(
- "primitive_array_size", String.valueOf(conf.getPrimitiveArraySize())))));
+ conf.setTagAttributeFlushInterval(
+ Integer.parseInt(
+ properties.getProperty(
+ "tag_attribute_flush_interval",
+ String.valueOf(conf.getTagAttributeFlushInterval()))));
- conf.setThriftMaxFrameSize(
- Integer.parseInt(
- properties.getProperty(
- "thrift_max_frame_size", String.valueOf(conf.getThriftMaxFrameSize()))));
+ conf.setPrimitiveArraySize(
+ (Integer.parseInt(
+ properties.getProperty(
+ "primitive_array_size", String.valueOf(conf.getPrimitiveArraySize())))));
- if (conf.getThriftMaxFrameSize() < IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2) {
- conf.setThriftMaxFrameSize(IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2);
- }
+ conf.setThriftMaxFrameSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "thrift_max_frame_size", String.valueOf(conf.getThriftMaxFrameSize()))));
- conf.setThriftDefaultBufferSize(
- Integer.parseInt(
- properties.getProperty(
- "thrift_init_buffer_size", String.valueOf(conf.getThriftDefaultBufferSize()))));
+ if (conf.getThriftMaxFrameSize() < IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2) {
+ conf.setThriftMaxFrameSize(IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2);
+ }
- conf.setFrequencyIntervalInMinute(
- Integer.parseInt(
- properties.getProperty(
- "frequency_interval_in_minute",
- String.valueOf(conf.getFrequencyIntervalInMinute()))));
+ conf.setThriftDefaultBufferSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "thrift_init_buffer_size", String.valueOf(conf.getThriftDefaultBufferSize()))));
- conf.setSlowQueryThreshold(
- Long.parseLong(
- properties.getProperty(
- "slow_query_threshold", String.valueOf(conf.getSlowQueryThreshold()))));
+ conf.setFrequencyIntervalInMinute(
+ Integer.parseInt(
+ properties.getProperty(
+ "frequency_interval_in_minute",
+ String.valueOf(conf.getFrequencyIntervalInMinute()))));
- conf.setDataRegionNum(
- Integer.parseInt(
- properties.getProperty("data_region_num", String.valueOf(conf.getDataRegionNum()))));
+ conf.setSlowQueryThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "slow_query_threshold", String.valueOf(conf.getSlowQueryThreshold()))));
- conf.setRecoveryLogIntervalInMs(
- Long.parseLong(
- properties.getProperty(
- "recovery_log_interval_in_ms",
- String.valueOf(conf.getRecoveryLogIntervalInMs()))));
+ conf.setDataRegionNum(
+ Integer.parseInt(
+ properties.getProperty("data_region_num", String.valueOf(conf.getDataRegionNum()))));
- conf.setEnableDiscardOutOfOrderData(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_discard_out_of_order_data",
- Boolean.toString(conf.isEnableDiscardOutOfOrderData()))));
+ conf.setRecoveryLogIntervalInMs(
+ Long.parseLong(
+ properties.getProperty(
+ "recovery_log_interval_in_ms", String.valueOf(conf.getRecoveryLogIntervalInMs()))));
- conf.setConcurrentWindowEvaluationThread(
- Integer.parseInt(
- properties.getProperty(
- "concurrent_window_evaluation_thread",
- Integer.toString(conf.getConcurrentWindowEvaluationThread()))));
- if (conf.getConcurrentWindowEvaluationThread() <= 0) {
- conf.setConcurrentWindowEvaluationThread(Runtime.getRuntime().availableProcessors());
- }
+ conf.setEnableDiscardOutOfOrderData(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_discard_out_of_order_data",
+ Boolean.toString(conf.isEnableDiscardOutOfOrderData()))));
- conf.setMaxPendingWindowEvaluationTasks(
- Integer.parseInt(
- properties.getProperty(
- "max_pending_window_evaluation_tasks",
- Integer.toString(conf.getMaxPendingWindowEvaluationTasks()))));
- if (conf.getMaxPendingWindowEvaluationTasks() <= 0) {
- conf.setMaxPendingWindowEvaluationTasks(64);
- }
+ conf.setConcurrentWindowEvaluationThread(
+ Integer.parseInt(
+ properties.getProperty(
+ "concurrent_window_evaluation_thread",
+ Integer.toString(conf.getConcurrentWindowEvaluationThread()))));
+ if (conf.getConcurrentWindowEvaluationThread() <= 0) {
+ conf.setConcurrentWindowEvaluationThread(Runtime.getRuntime().availableProcessors());
+ }
- // id table related configuration
- conf.setDeviceIDTransformationMethod(
- properties.getProperty(
- "device_id_transformation_method", conf.getDeviceIDTransformationMethod()));
+ conf.setMaxPendingWindowEvaluationTasks(
+ Integer.parseInt(
+ properties.getProperty(
+ "max_pending_window_evaluation_tasks",
+ Integer.toString(conf.getMaxPendingWindowEvaluationTasks()))));
+ if (conf.getMaxPendingWindowEvaluationTasks() <= 0) {
+ conf.setMaxPendingWindowEvaluationTasks(64);
+ }
- conf.setEnableIDTable(
- Boolean.parseBoolean(
- properties.getProperty("enable_id_table", String.valueOf(conf.isEnableIDTable()))));
+ // id table related configuration
+ conf.setDeviceIDTransformationMethod(
+ properties.getProperty(
+ "device_id_transformation_method", conf.getDeviceIDTransformationMethod()));
- conf.setEnableIDTableLogFile(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_id_table_log_file", String.valueOf(conf.isEnableIDTableLogFile()))));
+ conf.setEnableIDTable(
+ Boolean.parseBoolean(
+ properties.getProperty("enable_id_table", String.valueOf(conf.isEnableIDTable()))));
- conf.setSchemaEngineMode(
- properties.getProperty("schema_engine_mode", String.valueOf(conf.getSchemaEngineMode())));
+ conf.setEnableIDTableLogFile(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_id_table_log_file", String.valueOf(conf.isEnableIDTableLogFile()))));
- conf.setEnableLastCache(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_last_cache", Boolean.toString(conf.isLastCacheEnabled()))));
+ conf.setSchemaEngineMode(
+ properties.getProperty("schema_engine_mode", String.valueOf(conf.getSchemaEngineMode())));
- if (conf.getSchemaEngineMode().equals("Rocksdb_based")) {
- conf.setEnableLastCache(false);
- }
+ conf.setEnableLastCache(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_last_cache", Boolean.toString(conf.isLastCacheEnabled()))));
- conf.setCachedMNodeSizeInSchemaFileMode(
- Integer.parseInt(
- properties.getProperty(
- "cached_mnode_size_in_schema_file_mode",
- String.valueOf(conf.getCachedMNodeSizeInSchemaFileMode()))));
+ if (conf.getSchemaEngineMode().equals("Rocksdb_based")) {
+ conf.setEnableLastCache(false);
+ }
- conf.setMinimumSegmentInSchemaFile(
- Short.parseShort(
- properties.getProperty(
- "minimum_schema_file_segment_in_bytes",
- String.valueOf(conf.getMinimumSegmentInSchemaFile()))));
+ conf.setCachedMNodeSizeInSchemaFileMode(
+ Integer.parseInt(
+ properties.getProperty(
+ "cached_mnode_size_in_schema_file_mode",
+ String.valueOf(conf.getCachedMNodeSizeInSchemaFileMode()))));
- conf.setPageCacheSizeInSchemaFile(
- Short.parseShort(
- properties.getProperty(
- "page_cache_in_schema_file",
- String.valueOf(conf.getPageCacheSizeInSchemaFile()))));
+ conf.setMinimumSegmentInSchemaFile(
+ Short.parseShort(
+ properties.getProperty(
+ "minimum_schema_file_segment_in_bytes",
+ String.valueOf(conf.getMinimumSegmentInSchemaFile()))));
- // mqtt
- loadMqttProps(properties);
+ conf.setPageCacheSizeInSchemaFile(
+ Short.parseShort(
+ properties.getProperty(
+ "page_cache_in_schema_file", String.valueOf(conf.getPageCacheSizeInSchemaFile()))));
- conf.setEnablePartition(
- Boolean.parseBoolean(
- properties.getProperty(
- "enable_partition", String.valueOf(conf.isEnablePartition()))));
+ // mqtt
+ loadMqttProps(properties);
- conf.setPartitionInterval(
- Long.parseLong(
- properties.getProperty(
- "partition_interval", String.valueOf(conf.getPartitionInterval()))));
+ conf.setEnablePartition(
+ Boolean.parseBoolean(
+ properties.getProperty("enable_partition", String.valueOf(conf.isEnablePartition()))));
- conf.setSelectIntoInsertTabletPlanRowLimit(
- Integer.parseInt(
- properties.getProperty(
- "select_into_insert_tablet_plan_row_limit",
- String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+ conf.setPartitionInterval(
+ Long.parseLong(
+ properties.getProperty(
+ "partition_interval", String.valueOf(conf.getPartitionInterval()))));
- conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
+ conf.setSelectIntoInsertTabletPlanRowLimit(
+ Integer.parseInt(
+ properties.getProperty(
+ "select_into_insert_tablet_plan_row_limit",
+ String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
- conf.setInsertMultiTabletEnableMultithreadingColumnThreshold(
- Integer.parseInt(
- properties.getProperty(
- "insert_multi_tablet_enable_multithreading_column_threshold",
- String.valueOf(conf.getInsertMultiTabletEnableMultithreadingColumnThreshold()))));
+ conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
- // At the same time, set TSFileConfig
- TSFileDescriptor.getInstance()
- .getConfig()
- .setTSFileStorageFs(
- FSType.valueOf(
- properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().name())));
- TSFileDescriptor.getInstance()
- .getConfig()
- .setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
- TSFileDescriptor.getInstance()
- .getConfig()
- .setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
- TSFileDescriptor.getInstance()
- .getConfig()
- .setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(","));
- TSFileDescriptor.getInstance()
- .getConfig()
- .setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort()));
- TSFileDescriptor.getInstance()
- .getConfig()
- .setDfsNameServices(
- properties.getProperty("dfs_nameservices", conf.getDfsNameServices()));
- TSFileDescriptor.getInstance()
- .getConfig()
- .setDfsHaNamenodes(
- properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(","));
- TSFileDescriptor.getInstance()
- .getConfig()
- .setDfsHaAutomaticFailoverEnabled(
- Boolean.parseBoolean(
- properties.getProperty(
- "dfs_ha_automatic_failover_enabled",
- String.valueOf(conf.isDfsHaAutomaticFailoverEnabled()))));
- TSFileDescriptor.getInstance()
- .getConfig()
- .setDfsClientFailoverProxyProvider(
- properties.getProperty(
- "dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider()));
- TSFileDescriptor.getInstance()
- .getConfig()
- .setUseKerberos(
- Boolean.parseBoolean(
- properties.getProperty(
- "hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
- TSFileDescriptor.getInstance()
- .getConfig()
- .setKerberosKeytabFilePath(
- properties.getProperty(
- "kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
- TSFileDescriptor.getInstance()
- .getConfig()
- .setKerberosPrincipal(
- properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
- TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
+ conf.setInsertMultiTabletEnableMultithreadingColumnThreshold(
+ Integer.parseInt(
+ properties.getProperty(
+ "insert_multi_tablet_enable_multithreading_column_threshold",
+ String.valueOf(conf.getInsertMultiTabletEnableMultithreadingColumnThreshold()))));
- conf.setCoordinatorReadExecutorSize(
- Integer.parseInt(
- properties.getProperty(
- "coordinator_read_executor_size",
- Integer.toString(conf.getCoordinatorReadExecutorSize()))));
- conf.setCoordinatorWriteExecutorSize(
- Integer.parseInt(
- properties.getProperty(
- "coordinator_write_executor_size",
- Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
+ // At the same time, set TSFileConfig
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setTSFileStorageFs(
+ FSType.valueOf(
+ properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().name())));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(","));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setHdfsPort(properties.getProperty("hdfs_port", conf.getHdfsPort()));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setDfsNameServices(properties.getProperty("dfs_nameservices", conf.getDfsNameServices()));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setDfsHaNamenodes(
+ properties.getProperty("dfs_ha_namenodes", conf.getRawDfsHaNamenodes()).split(","));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setDfsHaAutomaticFailoverEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "dfs_ha_automatic_failover_enabled",
+ String.valueOf(conf.isDfsHaAutomaticFailoverEnabled()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setDfsClientFailoverProxyProvider(
+ properties.getProperty(
+ "dfs_client_failover_proxy_provider", conf.getDfsClientFailoverProxyProvider()));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setUseKerberos(
+ Boolean.parseBoolean(
+ properties.getProperty("hdfs_use_kerberos", String.valueOf(conf.isUseKerberos()))));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setKerberosKeytabFilePath(
+ properties.getProperty("kerberos_keytab_file_path", conf.getKerberosKeytabFilePath()));
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setKerberosPrincipal(
+ properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
+ TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
- // commons
- commonDescriptor.loadCommonProps(properties);
- commonDescriptor.initCommonConfigDir(conf.getSystemDir());
+ conf.setCoordinatorReadExecutorSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "coordinator_read_executor_size",
+ Integer.toString(conf.getCoordinatorReadExecutorSize()))));
+ conf.setCoordinatorWriteExecutorSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "coordinator_write_executor_size",
+ Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
- // timed flush memtable
- loadTimedService(properties);
+ // commons
+ commonDescriptor.loadCommonProps(properties);
+ commonDescriptor.initCommonConfigDir(conf.getSystemDir());
- // set tsfile-format config
- loadTsFileProps(properties);
+ // timed flush memtable
+ loadTimedService(properties);
- // make RPCTransportFactory taking effect.
- RpcTransportFactory.reInit();
+ // set tsfile-format config
+ loadTsFileProps(properties);
- // UDF
- loadUDFProps(properties);
+ // make RPCTransportFactory taking effect.
+ RpcTransportFactory.reInit();
- // trigger
- loadTriggerProps(properties);
+ // UDF
+ loadUDFProps(properties);
- // CQ
- loadCQProps(properties);
+ // trigger
+ loadTriggerProps(properties);
- // cluster
- loadClusterProps(properties);
+ // CQ
+ loadCQProps(properties);
- // shuffle
- loadShuffleProps(properties);
+ // cluster
+ loadClusterProps(properties);
- // author cache
- loadAuthorCache(properties);
- } catch (FileNotFoundException e) {
- logger.warn("Fail to find config file {}", url, e);
- } catch (IOException e) {
- logger.warn("Cannot load config file, use default configuration", e);
- } catch (Exception e) {
- logger.warn("Incorrect format in config file, use default configuration", e);
- } finally {
- // update all data seriesPath
- conf.updatePath();
- commonDescriptor.getConfig().updatePath(System.getProperty(IoTDBConstant.IOTDB_HOME, null));
- MetricConfigDescriptor.getInstance()
- .getMetricConfig()
- .updateRpcInstance(conf.getRpcAddress(), conf.getRpcPort());
- }
+ // shuffle
+ loadShuffleProps(properties);
+
+ // author cache
+ loadAuthorCache(properties);
}
private void loadAuthorCache(Properties properties) {
@@ -1270,6 +1312,16 @@ public class IoTDBDescriptor {
}
}
+ private void loadExternalLibProps(Properties properties) {
+
+ conf.setExternalPropertiesLoaderDir(
+ properties.getProperty(
+ "external_properties_loader_dir", conf.getExternalPropertiesLoaderDir()));
+
+ conf.setExternalLimiterDir(
+ properties.getProperty("external_limiter_dir", conf.getExternalLimiterDir()));
+ }
+
// timed flush memtable
private void loadTimedService(Properties properties) {
conf.setEnableTimedFlushSeqMemtable(
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesNumberOverflowException.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesNumberOverflowException.java
new file mode 100644
index 0000000000..7f5e2b4e17
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/SeriesNumberOverflowException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class SeriesNumberOverflowException extends MetadataException {
+
+ public SeriesNumberOverflowException() {
+ super("exceed max allowed series number.", TSStatusCode.SERIES_OVERFLOW.getStatusCode());
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 46e5903514..bc535516d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apche.iotdb.external.api.ISeriesNumerLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -69,6 +71,23 @@ public class SchemaEngine {
private ScheduledExecutorService timedForceMLogThread;
+ private ISeriesNumerLimiter seriesNumerLimiter =
+ new ISeriesNumerLimiter() {
+ @Override
+ public void init(Properties properties) {}
+
+ @Override
+ public boolean addTimeSeries(int number) {
+ // always return true, don't limit the number of series
+ return true;
+ }
+
+ @Override
+ public void deleteTimeSeries(int number) {
+ // do nothing
+ }
+ };
+
public TSStatus write(SchemaRegionId schemaRegionId, PlanNode planNode) {
return planNode.accept(new SchemaExecutionVisitor(), schemaRegionMap.get(schemaRegionId));
}
@@ -290,11 +309,14 @@ public class SchemaEngine {
IStorageGroupMNode storageGroupMNode = ensureStorageGroupByStorageGroupPath(storageGroup);
switch (this.schemaRegionStoredMode) {
case Memory:
- schemaRegion = new SchemaRegionMemoryImpl(storageGroup, schemaRegionId, storageGroupMNode);
+ schemaRegion =
+ new SchemaRegionMemoryImpl(
+ storageGroup, schemaRegionId, storageGroupMNode, seriesNumerLimiter);
break;
case Schema_File:
schemaRegion =
- new SchemaRegionSchemaFileImpl(storageGroup, schemaRegionId, storageGroupMNode);
+ new SchemaRegionSchemaFileImpl(
+ storageGroup, schemaRegionId, storageGroupMNode, seriesNumerLimiter);
break;
case Rocksdb_based:
schemaRegion =
@@ -363,4 +385,8 @@ public class SchemaEngine {
sharedPrefixTree.deleteStorageGroup(new PartialPath(schemaRegion.getStorageGroupFullPath()));
}
}
+
+ public void setSeriesNumerLimiter(ISeriesNumerLimiter seriesNumerLimiter) {
+ this.seriesNumerLimiter = seriesNumerLimiter;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
index a4c2c8a8fb..6c70c6865c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SeriesNumberOverflowException;
import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
import org.apache.iotdb.db.exception.metadata.template.NoTemplateOnMNodeException;
@@ -90,6 +91,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apche.iotdb.external.api.ISeriesNumerLimiter;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
@@ -178,9 +180,14 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
private TagManager tagManager;
private SchemaSyncManager syncManager = SchemaSyncManager.getInstance();
+ private final ISeriesNumerLimiter seriesNumerLimiter;
+
// region Interfaces and Implementation of initialization、snapshot、recover and clear
public SchemaRegionMemoryImpl(
- PartialPath storageGroup, SchemaRegionId schemaRegionId, IStorageGroupMNode storageGroupMNode)
+ PartialPath storageGroup,
+ SchemaRegionId schemaRegionId,
+ IStorageGroupMNode storageGroupMNode,
+ ISeriesNumerLimiter seriesNumerLimiter)
throws MetadataException {
storageGroupFullPath = storageGroup.getFullPath();
@@ -214,6 +221,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
}
}
+ this.seriesNumerLimiter = seriesNumerLimiter;
+
init();
}
@@ -454,7 +463,9 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
// collect all the LeafMNode in this schema region
List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
- schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
+ int seriesCount = leafMNodes.size();
+ schemaStatisticsManager.deleteTimeseries(seriesCount);
+ seriesNumerLimiter.deleteTimeSeries(seriesCount);
// drop triggers with no exceptions
TriggerEngine.drop(leafMNodes);
@@ -582,23 +593,35 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
throw new SeriesOverflowException();
}
+ if (!seriesNumerLimiter.addTimeSeries(1)) {
+ throw new SeriesNumberOverflowException();
+ }
+
try {
- PartialPath path = plan.getPath();
- SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
-
- TSDataType type = plan.getDataType();
- // create time series in MTree
- IMeasurementMNode leafMNode =
- mtree.createTimeseries(
- path,
- type,
- plan.getEncoding(),
- plan.getCompressor(),
- plan.getProps(),
- plan.getAlias());
-
- // the cached mNode may be replaced by new entityMNode in mtree
- mNodeCache.invalidate(path.getDevicePath());
+ IMeasurementMNode leafMNode;
+
+ // using try-catch to restore seriesNumerLimiter's state while create failed
+ try {
+ PartialPath path = plan.getPath();
+ SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
+
+ TSDataType type = plan.getDataType();
+ // create time series in MTree
+ leafMNode =
+ mtree.createTimeseries(
+ path,
+ type,
+ plan.getEncoding(),
+ plan.getCompressor(),
+ plan.getProps(),
+ plan.getAlias());
+
+ // the cached mNode may be replaced by new entityMNode in mtree
+ mNodeCache.invalidate(path.getDevicePath());
+ } catch (Throwable t) {
+ seriesNumerLimiter.deleteTimeSeries(1);
+ throw t;
+ }
// update statistics and schemaDataTypeNumMap
schemaStatisticsManager.addTimeseries(1);
@@ -688,10 +711,15 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
* @param plan CreateAlignedTimeSeriesPlan
*/
public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+ int seriesCount = plan.getMeasurements().size();
if (!memoryStatistics.isAllowToCreateNewSeries()) {
throw new SeriesOverflowException();
}
+ if (!seriesNumerLimiter.addTimeSeries(seriesCount)) {
+ throw new SeriesNumberOverflowException();
+ }
+
try {
PartialPath prefixPath = plan.getPrefixPath();
List<String> measurements = plan.getMeasurements();
@@ -699,26 +727,33 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
List<TSEncoding> encodings = plan.getEncodings();
List<Map<String, String>> tagsList = plan.getTagsList();
List<Map<String, String>> attributesList = plan.getAttributesList();
+ List<IMeasurementMNode> measurementMNodeList;
- for (int i = 0; i < measurements.size(); i++) {
- SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
- }
-
- // create time series in MTree
- List<IMeasurementMNode> measurementMNodeList =
- mtree.createAlignedTimeseries(
- prefixPath,
- measurements,
- plan.getDataTypes(),
- plan.getEncodings(),
- plan.getCompressors(),
- plan.getAliasList());
+ // using try-catch to restore seriesNumerLimiter's state while create failed
+ try {
+ for (int i = 0; i < measurements.size(); i++) {
+ SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
+ }
- // the cached mNode may be replaced by new entityMNode in mtree
- mNodeCache.invalidate(prefixPath);
+ // create time series in MTree
+ measurementMNodeList =
+ mtree.createAlignedTimeseries(
+ prefixPath,
+ measurements,
+ plan.getDataTypes(),
+ plan.getEncodings(),
+ plan.getCompressors(),
+ plan.getAliasList());
+
+ // the cached mNode may be replaced by new entityMNode in mtree
+ mNodeCache.invalidate(prefixPath);
+ } catch (Throwable t) {
+ seriesNumerLimiter.deleteTimeSeries(seriesCount);
+ throw t;
+ }
// update statistics and schemaDataTypeNumMap
- schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
+ schemaStatisticsManager.addTimeseries(seriesCount);
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -861,6 +896,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {
mNodeCache.invalidate(node.getPartialPath());
schemaStatisticsManager.deleteTimeseries(1);
+ seriesNumerLimiter.deleteTimeSeries(1);
return storageGroupPath;
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 57129f5063..0a1fe43f4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.exception.metadata.SeriesNumberOverflowException;
import org.apache.iotdb.db.exception.metadata.SeriesOverflowException;
import org.apache.iotdb.db.exception.metadata.template.DifferentTemplateException;
import org.apache.iotdb.db.exception.metadata.template.NoTemplateOnMNodeException;
@@ -87,6 +88,7 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apche.iotdb.external.api.ISeriesNumerLimiter;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
@@ -172,9 +174,14 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
private TagManager tagManager;
private SchemaSyncManager syncManager = SchemaSyncManager.getInstance();
+ private final ISeriesNumerLimiter seriesNumerLimiter;
+
// region Interfaces and Implementation of initialization、snapshot、recover and clear
public SchemaRegionSchemaFileImpl(
- PartialPath storageGroup, SchemaRegionId schemaRegionId, IStorageGroupMNode storageGroupMNode)
+ PartialPath storageGroup,
+ SchemaRegionId schemaRegionId,
+ IStorageGroupMNode storageGroupMNode,
+ ISeriesNumerLimiter seriesNumerLimiter)
throws MetadataException {
storageGroupFullPath = storageGroup.getFullPath();
@@ -200,6 +207,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
}
});
this.storageGroupMNode = storageGroupMNode;
+ this.seriesNumerLimiter = seriesNumerLimiter;
init();
}
@@ -413,7 +421,9 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
// collect all the LeafMNode in this schema region
List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode();
- schemaStatisticsManager.deleteTimeseries(leafMNodes.size());
+ int seriesCount = leafMNodes.size();
+ schemaStatisticsManager.deleteTimeseries(seriesCount);
+ seriesNumerLimiter.deleteTimeSeries(seriesCount);
// drop triggers with no exceptions
TriggerEngine.drop(leafMNodes);
@@ -475,20 +485,31 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
throw new SeriesOverflowException();
}
+ if (!seriesNumerLimiter.addTimeSeries(1)) {
+ throw new SeriesNumberOverflowException();
+ }
+
try {
PartialPath path = plan.getPath();
- SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
-
- TSDataType type = plan.getDataType();
- // create time series in MTree
- IMeasurementMNode leafMNode =
- mtree.createTimeseriesWithPinnedReturn(
- path,
- type,
- plan.getEncoding(),
- plan.getCompressor(),
- plan.getProps(),
- plan.getAlias());
+ IMeasurementMNode leafMNode;
+ // using try-catch to restore seriesNumerLimiter's state while create failed
+ try {
+ SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
+
+ TSDataType type = plan.getDataType();
+ // create time series in MTree
+ leafMNode =
+ mtree.createTimeseriesWithPinnedReturn(
+ path,
+ type,
+ plan.getEncoding(),
+ plan.getCompressor(),
+ plan.getProps(),
+ plan.getAlias());
+ } catch (Throwable t) {
+ seriesNumerLimiter.deleteTimeSeries(1);
+ throw t;
+ }
try {
// the cached mNode may be replaced by new entityMNode in mtree
@@ -606,10 +627,15 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
* @param plan CreateAlignedTimeSeriesPlan
*/
public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+ int seriesCount = plan.getMeasurements().size();
if (!memoryStatistics.isAllowToCreateNewSeries()) {
throw new SeriesOverflowException();
}
+ if (!seriesNumerLimiter.addTimeSeries(seriesCount)) {
+ throw new SeriesNumberOverflowException();
+ }
+
try {
PartialPath prefixPath = plan.getPrefixPath();
List<String> measurements = plan.getMeasurements();
@@ -617,27 +643,33 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
List<TSEncoding> encodings = plan.getEncodings();
List<Map<String, String>> tagsList = plan.getTagsList();
List<Map<String, String>> attributesList = plan.getAttributesList();
+ List<IMeasurementMNode> measurementMNodeList;
+ // using try-catch to restore seriesNumerLimiter's state while create failed
+ try {
+ for (int i = 0; i < measurements.size(); i++) {
+ SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
+ }
- for (int i = 0; i < measurements.size(); i++) {
- SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
+ // create time series in MTree
+ measurementMNodeList =
+ mtree.createAlignedTimeseries(
+ prefixPath,
+ measurements,
+ plan.getDataTypes(),
+ plan.getEncodings(),
+ plan.getCompressors(),
+ plan.getAliasList());
+ } catch (Throwable t) {
+ seriesNumerLimiter.deleteTimeSeries(seriesCount);
+ throw t;
}
- // create time series in MTree
- List<IMeasurementMNode> measurementMNodeList =
- mtree.createAlignedTimeseries(
- prefixPath,
- measurements,
- plan.getDataTypes(),
- plan.getEncodings(),
- plan.getCompressors(),
- plan.getAliasList());
-
try {
// the cached mNode may be replaced by new entityMNode in mtree
mNodeCache.invalidate(prefixPath);
// update statistics and schemaDataTypeNumMap
- schemaStatisticsManager.addTimeseries(plan.getMeasurements().size());
+ schemaStatisticsManager.addTimeseries(seriesCount);
List<Long> tagOffsets = plan.getTagOffsets();
for (int i = 0; i < measurements.size(); i++) {
@@ -786,6 +818,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
mNodeCache.invalidate(node.getPartialPath());
schemaStatisticsManager.deleteTimeseries(1);
+ seriesNumerLimiter.deleteTimeSeries(1);
return storageGroupPath;
}
// endregion
diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
index 0cd38ab9dd..b666c8be2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
@@ -30,10 +30,11 @@ import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.ServiceLoader;
+import static org.apache.iotdb.db.utils.JarLoaderUtil.getExternalJarURLs;
+
/** PayloadFormatManager loads payload formatter from SPI services. */
public class PayloadFormatManager {
private static final Logger logger = LoggerFactory.getLogger(PayloadFormatManager.class);
@@ -73,14 +74,6 @@ public class PayloadFormatManager {
FileUtils.forceMkdir(file);
}
- private static URL[] getPlugInJarURLs() throws IOException {
- HashSet<File> fileSet =
- new HashSet<>(
- FileUtils.listFiles(
- SystemFileFactory.INSTANCE.getFile(mqttDir), new String[] {"jar"}, true));
- return FileUtils.toURLs(fileSet.toArray(new File[0]));
- }
-
private static void buildMqttPluginMap() throws IOException {
ServiceLoader<PayloadFormatter> payloadFormatters = ServiceLoader.load(PayloadFormatter.class);
for (PayloadFormatter formatter : payloadFormatters) {
@@ -94,7 +87,7 @@ public class PayloadFormatManager {
logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", pluginName);
}
- URL[] jarURLs = getPlugInJarURLs();
+ URL[] jarURLs = getExternalJarURLs(mqttDir);
logger.debug("MQTT Plugin jarURLs: {}", jarURLs);
for (URL jarUrl : jarURLs) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index ff41b8e2d4..7850de4b43 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -57,6 +57,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import static org.apache.iotdb.db.utils.JarLoaderUtil.loadExternLib;
+
public class IoTDB implements IoTDBMBean {
private static final Logger logger = LoggerFactory.getLogger(IoTDB.class);
@@ -81,6 +83,9 @@ public class IoTDB implements IoTDBMBean {
System.exit(1);
}
IoTDB daemon = IoTDB.getInstance();
+
+ loadExternLib(config);
+
daemon.active();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
index a0ceaade07..4cf7ac0a0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
@@ -57,6 +57,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import static org.apache.iotdb.db.utils.JarLoaderUtil.loadExternLib;
+
public class NewIoTDB implements NewIoTDBMBean {
private static final Logger logger = LoggerFactory.getLogger(NewIoTDB.class);
@@ -81,6 +83,9 @@ public class NewIoTDB implements NewIoTDBMBean {
}
NewIoTDB daemon = NewIoTDB.getInstance();
config.setMppMode(true);
+
+ loadExternLib(config);
+
daemon.active();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/JarLoaderUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/JarLoaderUtil.java
new file mode 100644
index 0000000000..810d16e7fc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/JarLoaderUtil.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.apache.commons.io.FileUtils;
+import org.apche.iotdb.external.api.IPropertiesLoader;
+import org.apche.iotdb.external.api.ISeriesNumerLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+public class JarLoaderUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(JarLoaderUtil.class);
+
+ public static URL[] getExternalJarURLs(String jarDir) throws IOException {
+ HashSet<File> fileSet =
+ new HashSet<>(
+ FileUtils.listFiles(
+ SystemFileFactory.INSTANCE.getFile(jarDir), new String[] {"jar"}, true));
+ return FileUtils.toURLs(fileSet.toArray(new File[0]));
+ }
+
+ public static void loadExternLib(IoTDBConfig config) {
+ // load external properties
+ String loaderDir = config.getExternalPropertiesLoaderDir();
+
+ if (!(new File(loaderDir).exists())) {
+ return;
+ }
+
+ Path externalPropertiesFile = IoTDBDescriptor.getInstance().getExternalPropsPath();
+ URL[] loaderJarURLs;
+ List<Properties> externalPropertiesList = new ArrayList<>();
+ try {
+ loaderJarURLs = getExternalJarURLs(loaderDir);
+
+ if (loaderJarURLs == null || loaderJarURLs.length == 0) {
+ return;
+ }
+
+ ClassLoader classLoader = new URLClassLoader(loaderJarURLs);
+
+ // Use SPI to get all plugins' class
+ ServiceLoader<IPropertiesLoader> loaders =
+ ServiceLoader.load(IPropertiesLoader.class, classLoader);
+
+ for (IPropertiesLoader loader : loaders) {
+ if (loader == null) {
+ logger.error("IPropertiesLoader(), loader is null.");
+ continue;
+ }
+ Properties properties = loader.loadProperties(externalPropertiesFile.toAbsolutePath());
+ if (properties != null) {
+ externalPropertiesList.add(properties);
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("error happened while loading external loader. ", t);
+ // ignore
+ }
+
+ if (externalPropertiesList.size() != 1) {
+ return;
+ }
+
+ // overwrite the default properties;
+ for (Properties properties : externalPropertiesList) {
+ IoTDBDescriptor.getInstance().loadProperties(properties);
+ TSFileDescriptor.getInstance()
+ .overwriteConfigByCustomSettings(TSFileDescriptor.getInstance().getConfig(), properties);
+ }
+
+ String limiterDir = config.getExternalLimiterDir();
+
+    if (!(new File(limiterDir).exists())) {
+ return;
+ }
+
+ URL[] limiterJarURLs;
+
+ List<ISeriesNumerLimiter> limiterList = new ArrayList<>();
+
+ try {
+ limiterJarURLs = getExternalJarURLs(limiterDir);
+
+ if (limiterJarURLs == null || limiterJarURLs.length == 0) {
+ return;
+ }
+
+ ClassLoader classLoader = new URLClassLoader(limiterJarURLs);
+
+ // Use SPI to get all plugins' class
+ ServiceLoader<ISeriesNumerLimiter> limiters =
+ ServiceLoader.load(ISeriesNumerLimiter.class, classLoader);
+
+ for (ISeriesNumerLimiter limiter : limiters) {
+ if (limiter == null) {
+ logger.error("ISeriesNumerLimiter(), limiter is null.");
+ continue;
+ }
+ for (Properties properties : externalPropertiesList) {
+ limiter.init(properties);
+ }
+ limiterList.add(limiter);
+ }
+ } catch (Throwable t) {
+ // ignore
+ logger.error("error happened while loading external limiter. ", t);
+ }
+
+ if (limiterList.size() != 1) {
+ return;
+ }
+
+ SchemaEngine.getInstance().setSeriesNumerLimiter(limiterList.get(0));
+ }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
index 965a12eefa..0ee217c32e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileDescriptor.java
@@ -61,7 +61,7 @@ public class TSFileDescriptor {
}
}
- private void overwriteConfigByCustomSettings(TSFileConfig conf, Properties properties) {
+ public void overwriteConfigByCustomSettings(TSFileConfig conf, Properties properties) {
PropertiesOverWriter writer = new PropertiesOverWriter(properties);
writer.setInt(conf::setGroupSizeInByte, "group_size_in_byte");