You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/12 03:05:25 UTC
[iotdb] branch master updated: Rocksdb-based metadata storage (#5295)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c6634a0df3 Rocksdb-based metadata storage (#5295)
c6634a0df3 is described below
commit c6634a0df3c56d88d673028891741a34820b30f3
Author: lisijia <44...@users.noreply.github.com>
AuthorDate: Tue Apr 12 11:05:21 2022 +0800
Rocksdb-based metadata storage (#5295)
---
server/pom.xml | 5 +
.../resources/conf/iotdb-engine.properties | 2 +-
.../assembly/resources/tools/rocksdb-transfer.bat | 126 ++
.../assembly/resources/tools/rocksdb-transfer.sh | 82 +
.../statemachine/SchemaRegionStateMachine.java | 6 +-
.../metadata/AcquireLockTimeoutException.java} | 9 +-
.../iotdb/db/metadata/LocalConfigManager.java | 30 +-
.../iotdb/db/metadata/LocalSchemaProcessor.java | 42 +-
.../iotdb/db/metadata/mnode/MeasurementMNode.java | 2 +-
.../db/metadata/schemaregion/ISchemaRegion.java | 193 ++
.../db/metadata/schemaregion/SchemaEngine.java | 35 +-
.../db/metadata/schemaregion/SchemaEngineMode.java | 3 +-
.../db/metadata/schemaregion/SchemaRegion.java | 32 +-
.../metadata/schemaregion/SchemaRegionUtils.java | 59 +
.../schemaregion/rocksdb/CheckKeyResult.java} | 39 +-
.../schemaregion/rocksdb/RSchemaConstants.java | 76 +
.../schemaregion/rocksdb/RSchemaLogger.java | 61 +
.../rocksdb/RSchemaReadWriteHandler.java | 514 +++++
.../schemaregion/rocksdb/RSchemaRegion.java | 1962 ++++++++++++++++++++
.../schemaregion/rocksdb/RSchemaUtils.java | 592 ++++++
.../schemaregion/rocksdb/mnode/REntityMNode.java | 124 ++
.../schemaregion/rocksdb/mnode/RInternalMNode.java | 197 ++
.../schemaregion/rocksdb/mnode/RMNode.java | 234 +++
.../schemaregion/rocksdb/mnode/RMNodeType.java | 49 +
.../rocksdb/mnode/RMNodeValueType.java | 47 +
.../rocksdb/mnode/RMeasurementMNode.java} | 185 +-
.../rocksdb/mnode/RStorageGroupMNode.java | 102 +
.../apache/iotdb/db/service/IoTDBShutdownHook.java | 6 +
.../schemaregion/rocksdb/MRocksDBBenchmark.java | 98 +
.../schemaregion/rocksdb/MRocksDBUnitTest.java | 238 +++
.../rocksdb/RSchemaReadWriteHandlerTest.java | 77 +
.../rocksdb/RSchemaRegionAdvancedTest.java | 167 ++
.../rocksdb/RocksDBBenchmarkEngine.java | 153 ++
.../schemaregion/rocksdb/RocksDBBenchmarkTask.java | 109 ++
.../schemaregion/rocksdb/RocksDBTestUtils.java | 72 +
.../iotdb/tsfile/utils/ReadWriteIOUtils.java | 8 +-
36 files changed, 5555 insertions(+), 181 deletions(-)
diff --git a/server/pom.xml b/server/pom.xml
index 8514095349..55ffa494c9 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -262,6 +262,11 @@
<version>2.6</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ <version>6.27.3</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index dc2a019158..de12695ec6 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -962,7 +962,7 @@ timestamp_precision=ms
####################
### Schema Engine Configuration
####################
-# Choose the mode of schema engine. The value could be Memory and Schema_File. If the provided value doesn't match any pre-defined value, Memory mode will be used as default.
+# Choose the mode of schema engine. The value can be Memory, Schema_File or Rocksdb_based. If the provided value doesn't match any pre-defined value, Memory mode will be used as default.
# Datatype: string
# schema_engine_mode=Memory
diff --git a/server/src/assembly/resources/tools/rocksdb-transfer.bat b/server/src/assembly/resources/tools/rocksdb-transfer.bat
new file mode 100644
index 0000000000..89765a7f7e
--- /dev/null
+++ b/server/src/assembly/resources/tools/rocksdb-transfer.bat
@@ -0,0 +1,126 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM
+
+@echo off
+echo ````````````````````````
+echo Use the script to transfer metadata managed by default(pure memory) SchemaEngine to RocksDB based SchemaEngine
+echo ````````````````````````
+
+
+set PATH="%JAVA_HOME%\bin\";%PATH%
+set "FULL_VERSION="
+set "MAJOR_VERSION="
+set "MINOR_VERSION="
+
+
+for /f tokens^=2-5^ delims^=.-_+^" %%j in ('java -fullversion 2^>^&1') do (
+ set "FULL_VERSION=%%j-%%k-%%l-%%m"
+ IF "%%j" == "1" (
+ set "MAJOR_VERSION=%%k"
+ set "MINOR_VERSION=%%l"
+ ) else (
+ set "MAJOR_VERSION=%%j"
+ set "MINOR_VERSION=%%k"
+ )
+)
+
+set JAVA_VERSION=%MAJOR_VERSION%
+
+@REM JDK 6 and 7 are rejected explicitly; older releases are too stale to check for. "^" escapes ">" so echo prints it instead of redirecting output.
+IF "%JAVA_VERSION%" == "6" (
+ echo IoTDB only supports jdk ^>= 8, please check your java version.
+ goto finally
+)
+IF "%JAVA_VERSION%" == "7" (
+ echo IoTDB only supports jdk ^>= 8, please check your java version.
+ goto finally
+)
+
+if "%OS%" == "Windows_NT" setlocal
+
+pushd %~dp0..
+if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%cd%
+popd
+
+set IOTDB_CONF=%IOTDB_HOME%\conf
+set IOTDB_LOGS=%IOTDB_HOME%\logs
+
+@setlocal ENABLEDELAYEDEXPANSION ENABLEEXTENSIONS
+set is_conf_path=false
+for %%i in (%*) do (
+ IF "%%i" == "-c" (
+ set is_conf_path=true
+ ) ELSE IF "!is_conf_path!" == "true" (
+ set is_conf_path=false
+ set IOTDB_CONF=%%i
+ ) ELSE (
+ set CONF_PARAMS=!CONF_PARAMS! %%i
+ )
+)
+
+IF EXIST "%IOTDB_CONF%\iotdb-env.bat" (
+ CALL "%IOTDB_CONF%\iotdb-env.bat" %1
+ ) ELSE (
+ echo "can't find %IOTDB_CONF%\iotdb-env.bat"
+ )
+
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.metadata.schemaregion.rocksdb.SchemaDataTransfer
+if NOT DEFINED JAVA_HOME goto :err
+
+@REM -----------------------------------------------------------------------------
+@REM JVM Opts we'll use in legacy run or installation
+set JAVA_OPTS=-ea^
+ -Dlogback.configurationFile="%IOTDB_CONF%\logback.xml"^
+ -DIOTDB_HOME="%IOTDB_HOME%"^
+ -DTSFILE_HOME="%IOTDB_HOME%"^
+ -DTSFILE_CONF="%IOTDB_CONF%"^
+ -DIOTDB_CONF="%IOTDB_CONF%"^
+ -Dsun.jnu.encoding=UTF-8^
+ -Dfile.encoding=UTF-8
+
+@REM ***** CLASSPATH library setting *****
+@REM Ensure that any user defined CLASSPATH variables are not used on startup
+set CLASSPATH="%IOTDB_HOME%\lib\*"
+set CLASSPATH=%CLASSPATH%;iotdb.IoTDB
+goto okClasspath
+
+:append
+set CLASSPATH=%CLASSPATH%;%1
+
+goto :eof
+
+@REM -----------------------------------------------------------------------------
+:okClasspath
+
+rem echo CLASSPATH: %CLASSPATH%
+
+"%JAVA_HOME%\bin\java" %JAVA_OPTS% %IOTDB_HEAP_OPTS% -cp %CLASSPATH% %IOTDB_JMX_OPTS% %MAIN_CLASS% %CONF_PARAMS%
+goto finally
+
+:err
+echo JAVA_HOME environment variable must be set!
+pause
+
+
+@REM -----------------------------------------------------------------------------
+:finally
+
+pause
+
+ENDLOCAL
diff --git a/server/src/assembly/resources/tools/rocksdb-transfer.sh b/server/src/assembly/resources/tools/rocksdb-transfer.sh
new file mode 100644
index 0000000000..a899d084f7
--- /dev/null
+++ b/server/src/assembly/resources/tools/rocksdb-transfer.sh
@@ -0,0 +1,82 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ---------------------
+echo Use the script to transfer metadata managed by default(pure memory) SchemaEngine to RocksDB based SchemaEngine
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+ export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+IOTDB_CONF=${IOTDB_HOME}/conf
+# IOTDB_LOGS=${IOTDB_HOME}/logs
+
+is_conf_path=false
+for arg do
+ shift
+ if [ "$arg" == "-c" ]; then
+ is_conf_path=true
+ continue
+ fi
+ if [ $is_conf_path == true ]; then
+ IOTDB_CONF=$arg
+ is_conf_path=false
+ continue
+ fi
+ set -- "$@" "$arg"
+done
+
+CONF_PARAMS=$*
+
+if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then
+ if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+ . "$IOTDB_CONF/iotdb-env.sh" "printgc"
+ else
+ . "$IOTDB_CONF/iotdb-env.sh"
+ fi
+else
+ echo "can't find $IOTDB_CONF/iotdb-env.sh"
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+ CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.db.metadata.schemaregion.rocksdb.SchemaDataTransfer # same entry point as MAIN_CLASS in rocksdb-transfer.bat
+
+launch_service()
+{
+ class="$1"
+ iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml"
+ iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
+ iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
+ iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+ iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
+ iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
+ exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS
+ return $?
+}
+
+# Start up the service
+launch_service "$classname"
+
+exit $?
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 2b36f96ef8..fcb73ee4ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -32,9 +32,9 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
private static final Logger logger = LoggerFactory.getLogger(SchemaRegionStateMachine.class);
- private final SchemaRegion region;
+ private final ISchemaRegion region;
- public SchemaRegionStateMachine(SchemaRegion region) {
+ public SchemaRegionStateMachine(ISchemaRegion region) {
this.region = region;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/AcquireLockTimeoutException.java
similarity index 81%
copy from server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java
copy to server/src/main/java/org/apache/iotdb/db/exception/metadata/AcquireLockTimeoutException.java
index f02d88f4d7..f96e5dfe50 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/AcquireLockTimeoutException.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.metadata.schemaregion;
+package org.apache.iotdb.db.exception.metadata;
-public enum SchemaEngineMode {
- Memory,
- Schema_File
+public class AcquireLockTimeoutException extends MetadataException {
+ public AcquireLockTimeoutException(String msg) {
+ super(msg);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigManager.java
index cd9c88669b..bf7ec5fa99 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigManager.java
@@ -33,8 +33,8 @@ import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateExceptio
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.rescon.SchemaResourceManager;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
import org.apache.iotdb.db.metadata.storagegroup.IStorageGroupSchemaManager;
import org.apache.iotdb.db.metadata.storagegroup.StorageGroupSchemaManager;
import org.apache.iotdb.db.metadata.template.Template;
@@ -183,7 +183,7 @@ public class LocalConfigManager {
partitionTable.clear();
- for (SchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
+ for (ISchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
schemaRegion.clear();
}
schemaEngine.clear();
@@ -206,7 +206,7 @@ public class LocalConfigManager {
storageGroupSchemaManager.forceLog();
templateManager.forceLog();
- for (SchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
+ for (ISchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
schemaRegion.forceMlog();
}
}
@@ -516,7 +516,7 @@ public class LocalConfigManager {
partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
}
- public SchemaRegion getSchemaRegion(SchemaRegionId schemaRegionId) {
+ public ISchemaRegion getSchemaRegion(SchemaRegionId schemaRegionId) throws MetadataException {
return schemaEngine.getSchemaRegion(schemaRegionId);
}
@@ -544,7 +544,7 @@ public class LocalConfigManager {
}
}
- private SchemaRegion localCreateSchemaRegion(
+ private ISchemaRegion localCreateSchemaRegion(
PartialPath storageGroup, SchemaRegionId schemaRegionId) throws MetadataException {
return schemaEngine.createSchemaRegion(
storageGroup,
@@ -559,10 +559,10 @@ public class LocalConfigManager {
* root.sg1. If there's no storage group on the given path, StorageGroupNotSetException will be
* thrown.
*/
- public SchemaRegion getBelongedSchemaRegion(PartialPath path) throws MetadataException {
+ public ISchemaRegion getBelongedSchemaRegion(PartialPath path) throws MetadataException {
PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
- SchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
+ ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
if (schemaRegion == null) {
schemaRegion = localCreateSchemaRegion(storageGroup, schemaRegionId);
partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
@@ -571,7 +571,7 @@ public class LocalConfigManager {
}
// This interface involves storage group auto creation
- public SchemaRegion getBelongedSchemaRegionWithAutoCreate(PartialPath path)
+ public ISchemaRegion getBelongedSchemaRegionWithAutoCreate(PartialPath path)
throws MetadataException {
ensureStorageGroup(path, true);
return getBelongedSchemaRegion(path);
@@ -583,9 +583,9 @@ public class LocalConfigManager {
* paths represented by the given pathPattern. If isPrefixMatch, all storage groups under the
* prefixPath that matches the given pathPattern will be collected.
*/
- public List<SchemaRegion> getInvolvedSchemaRegions(PartialPath pathPattern, boolean isPrefixMatch)
- throws MetadataException {
- List<SchemaRegion> result = new ArrayList<>();
+ public List<ISchemaRegion> getInvolvedSchemaRegions(
+ PartialPath pathPattern, boolean isPrefixMatch) throws MetadataException {
+ List<ISchemaRegion> result = new ArrayList<>();
for (PartialPath storageGroup :
storageGroupSchemaManager.getInvolvedStorageGroups(pathPattern, isPrefixMatch)) {
for (SchemaRegionId schemaRegionId :
@@ -597,9 +597,9 @@ public class LocalConfigManager {
return result;
}
- public List<SchemaRegion> getSchemaRegionsByStorageGroup(PartialPath storageGroup)
+ public List<ISchemaRegion> getSchemaRegionsByStorageGroup(PartialPath storageGroup)
throws MetadataException {
- List<SchemaRegion> result = new ArrayList<>();
+ List<ISchemaRegion> result = new ArrayList<>();
for (SchemaRegionId schemaRegionId :
partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup)) {
result.add(schemaEngine.getSchemaRegion(schemaRegionId));
@@ -709,7 +709,7 @@ public class LocalConfigManager {
public Set<String> getPathsSetTemplate(String templateName) throws MetadataException {
Set<String> result = new HashSet<>();
if (templateName.equals(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
- for (SchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
+ for (ISchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
result.addAll(schemaRegion.getPathsSetTemplate(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
}
} else {
@@ -726,7 +726,7 @@ public class LocalConfigManager {
public Set<String> getPathsUsingTemplate(String templateName) throws MetadataException {
Set<String> result = new HashSet<>();
if (templateName.equals(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) {
- for (SchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
+ for (ISchemaRegion schemaRegion : schemaEngine.getAllSchemaRegions()) {
result.addAll(schemaRegion.getPathsUsingTemplate(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD));
}
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
index 0793afdfc4..56a8c0729d 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics;
-import org.apache.iotdb.db.metadata.schemaregion.SchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
import org.apache.iotdb.db.qp.constant.SQLConstant;
@@ -158,12 +158,12 @@ public class LocalSchemaProcessor {
* root.sg1. If there's no storage group on the given path, StorageGroupNotSetException will be
* thrown.
*/
- private SchemaRegion getBelongedSchemaRegion(PartialPath path) throws MetadataException {
+ private ISchemaRegion getBelongedSchemaRegion(PartialPath path) throws MetadataException {
return configManager.getBelongedSchemaRegion(path);
}
// This interface involves storage group auto creation
- private SchemaRegion getBelongedSchemaRegionWithAutoCreate(PartialPath path)
+ private ISchemaRegion getBelongedSchemaRegionWithAutoCreate(PartialPath path)
throws MetadataException {
return configManager.getBelongedSchemaRegionWithAutoCreate(path);
}
@@ -174,12 +174,12 @@ public class LocalSchemaProcessor {
* paths represented by the given pathPattern. If isPrefixMatch, all storage groups under the
* prefixPath that matches the given pathPattern will be collected.
*/
- private List<SchemaRegion> getInvolvedSchemaRegions(
+ private List<ISchemaRegion> getInvolvedSchemaRegions(
PartialPath pathPattern, boolean isPrefixMatch) throws MetadataException {
return configManager.getInvolvedSchemaRegions(pathPattern, isPrefixMatch);
}
- private List<SchemaRegion> getSchemaRegionsByStorageGroup(PartialPath storageGroup)
+ private List<ISchemaRegion> getSchemaRegionsByStorageGroup(PartialPath storageGroup)
throws MetadataException {
return configManager.getSchemaRegionsByStorageGroup(storageGroup);
}
@@ -334,7 +334,7 @@ public class LocalSchemaProcessor {
*/
public String deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch)
throws MetadataException {
- List<SchemaRegion> schemaRegions = getInvolvedSchemaRegions(pathPattern, isPrefixMatch);
+ List<ISchemaRegion> schemaRegions = getInvolvedSchemaRegions(pathPattern, isPrefixMatch);
if (schemaRegions.isEmpty()) {
// In the cluster mode, the deletion of a timeseries will be forwarded to all the nodes. For
// nodes that do not have the metadata of the timeseries, the coordinator expects a
@@ -344,7 +344,7 @@ public class LocalSchemaProcessor {
Set<String> failedNames = new HashSet<>();
int deletedNum = 0;
Pair<Integer, Set<String>> sgDeletionResult;
- for (SchemaRegion schemaRegion : schemaRegions) {
+ for (ISchemaRegion schemaRegion : schemaRegions) {
sgDeletionResult = schemaRegion.deleteTimeseries(pathPattern, isPrefixMatch);
deletedNum += sgDeletionResult.left;
failedNames.addAll(sgDeletionResult.right);
@@ -414,7 +414,7 @@ public class LocalSchemaProcessor {
}
try {
PartialPath storageGroup = configManager.getBelongedStorageGroup(path);
- for (SchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
+ for (ISchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
if (schemaRegion.isPathExist(path)) {
return true;
}
@@ -449,7 +449,7 @@ public class LocalSchemaProcessor {
return (int) TimeseriesStatistics.getInstance().getTotalSeriesNumber();
}
int count = 0;
- for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
+ for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
count += schemaRegion.getAllTimeseriesCount(pathPattern, isPrefixMatch);
}
return count;
@@ -471,7 +471,7 @@ public class LocalSchemaProcessor {
public int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch)
throws MetadataException {
int num = 0;
- for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
+ for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
num += schemaRegion.getDevicesNum(pathPattern, isPrefixMatch);
}
return num;
@@ -521,7 +521,7 @@ public class LocalSchemaProcessor {
PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
Map<PartialPath, Integer> result = new HashMap<>();
Map<PartialPath, Integer> sgResult;
- for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
+ for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
sgResult = schemaRegion.getMeasurementCountGroupByLevel(pathPattern, level, isPrefixMatch);
for (PartialPath path : sgResult.keySet()) {
if (result.containsKey(path)) {
@@ -569,7 +569,7 @@ public class LocalSchemaProcessor {
configManager.getNodesListInGivenLevel(pathPattern, nodeLevel, isPrefixMatch, filter);
Set<PartialPath> result = new TreeSet<>(pair.left);
for (PartialPath storageGroup : pair.right) {
- for (SchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
+ for (ISchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
result.addAll(
schemaRegion.getNodesListInGivenLevel(pathPattern, nodeLevel, isPrefixMatch, filter));
}
@@ -593,7 +593,7 @@ public class LocalSchemaProcessor {
configManager.getChildNodePathInNextLevel(pathPattern);
Set<String> result = pair.left;
for (PartialPath storageGroup : pair.right) {
- for (SchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
+ for (ISchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
result.addAll(schemaRegion.getChildNodePathInNextLevel(pathPattern));
}
}
@@ -615,7 +615,7 @@ public class LocalSchemaProcessor {
configManager.getChildNodeNameInNextLevel(pathPattern);
Set<String> result = pair.left;
for (PartialPath storageGroup : pair.right) {
- for (SchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
+ for (ISchemaRegion schemaRegion : getSchemaRegionsByStorageGroup(storageGroup)) {
result.addAll(schemaRegion.getChildNodeNameInNextLevel(pathPattern));
}
}
@@ -719,7 +719,7 @@ public class LocalSchemaProcessor {
public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
throws MetadataException {
Set<PartialPath> result = new TreeSet<>();
- for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
+ for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
result.addAll(schemaRegion.getMatchedDevices(pathPattern, isPrefixMatch));
}
return result;
@@ -738,7 +738,7 @@ public class LocalSchemaProcessor {
int offset = plan.getOffset();
Pair<List<ShowDevicesResult>, Integer> regionResult;
- for (SchemaRegion schemaRegion :
+ for (ISchemaRegion schemaRegion :
getInvolvedSchemaRegions(plan.getPath(), plan.isPrefixMatch())) {
if (limit != 0 && plan.getLimit() == 0) {
break;
@@ -805,7 +805,7 @@ public class LocalSchemaProcessor {
int tmpLimit = limit;
int tmpOffset = offset;
- for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
+ for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) {
if (limit != 0 && tmpLimit == 0) {
break;
}
@@ -847,7 +847,7 @@ public class LocalSchemaProcessor {
}
Pair<List<ShowTimeSeriesResult>, Integer> regionResult;
- for (SchemaRegion schemaRegion :
+ for (ISchemaRegion schemaRegion :
getInvolvedSchemaRegions(plan.getPath(), plan.isPrefixMatch())) {
if (limit != 0 && plan.getLimit() == 0) {
break;
@@ -966,7 +966,7 @@ public class LocalSchemaProcessor {
// endregion
// region Interfaces for alias and tag/attribute operations
- public void changeAlias(PartialPath path, String alias) throws MetadataException {
+ public void changeAlias(PartialPath path, String alias) throws MetadataException, IOException {
getBelongedSchemaRegion(path).changeAlias(path, alias);
}
@@ -1060,7 +1060,7 @@ public class LocalSchemaProcessor {
public void collectMeasurementSchema(
PartialPath prefixPath, List<IMeasurementSchema> measurementSchemas) {
try {
- for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(prefixPath, true)) {
+ for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(prefixPath, true)) {
schemaRegion.collectMeasurementSchema(prefixPath, measurementSchemas);
}
} catch (MetadataException ignored) {
@@ -1076,7 +1076,7 @@ public class LocalSchemaProcessor {
public void collectTimeseriesSchema(
PartialPath prefixPath, Collection<TimeseriesSchema> timeseriesSchemas) {
try {
- for (SchemaRegion schemaRegion : getInvolvedSchemaRegions(prefixPath, true)) {
+ for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(prefixPath, true)) {
schemaRegion.collectTimeseriesSchema(prefixPath, timeseriesSchemas);
}
} catch (MetadataException ignored) {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
index 46341679cc..f1826c1829 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
@@ -61,7 +61,7 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
}
/** @param alias alias of measurementName */
- MeasurementMNode(IMNode parent, String name, IMeasurementSchema schema, String alias) {
+ public MeasurementMNode(IMNode parent, String name, IMeasurementSchema schema, String alias) {
super(parent, name);
this.schema = schema;
this.alias = alias;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
new file mode 100644
index 0000000000..9008224310
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/ISchemaRegion.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Contract shared by every schema region implementation.
+ *
+ * <p>{@code SchemaEngine#createSchemaRegion} instantiates either the original {@code SchemaRegion}
+ * or the RocksDB-based {@code RSchemaRegion}, depending on the configured schema engine mode, and
+ * hands the instance out through this interface. Callers should therefore program against this
+ * type instead of a concrete implementation.
+ */
+public interface ISchemaRegion {
+
+  void init(IStorageGroupMNode storageGroupMNode) throws MetadataException;
+
+  void clear();
+
+  void forceMlog();
+
+  // this method is mainly used for recovery and metadata sync
+  void operation(PhysicalPlan plan) throws IOException, MetadataException;
+
+  void deleteSchemaRegion() throws MetadataException;
+
+  void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException;
+
+  void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException;
+
+  Pair<Integer, Set<String>> deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException;
+
+  void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException;
+
+  boolean isPathExist(PartialPath path) throws MetadataException;
+
+  int getAllTimeseriesCount(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException;
+
+  int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch) throws MetadataException;
+
+  int getNodesCountInGivenLevel(PartialPath pathPattern, int level, boolean isPrefixMatch)
+      throws MetadataException;
+
+  Map<PartialPath, Integer> getMeasurementCountGroupByLevel(
+      PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException;
+
+  // region Interfaces for level Node info Query
+  List<PartialPath> getNodesListInGivenLevel(
+      PartialPath pathPattern,
+      int nodeLevel,
+      boolean isPrefixMatch,
+      LocalSchemaProcessor.StorageGroupFilter filter)
+      throws MetadataException;
+
+  Set<String> getChildNodePathInNextLevel(PartialPath pathPattern) throws MetadataException;
+
+  Set<String> getChildNodeNameInNextLevel(PartialPath pathPattern) throws MetadataException;
+
+  Set<PartialPath> getBelongedDevices(PartialPath timeseries) throws MetadataException;
+
+  Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException;
+
+  Pair<List<ShowDevicesResult>, Integer> getMatchedDevices(ShowDevicesPlan plan)
+      throws MetadataException;
+
+  Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
+      PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+      throws MetadataException;
+
+  Pair<List<ShowTimeSeriesResult>, Integer> showTimeseries(
+      ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException;
+
+  // attention: this path must be a device node
+  List<MeasurementPath> getAllMeasurementByDevicePath(PartialPath devicePath)
+      throws PathNotExistException;
+  // endregion
+
+  // region Interfaces and methods for MNode query
+  IMNode getDeviceNode(PartialPath path) throws MetadataException;
+
+  IMeasurementMNode[] getMeasurementMNodes(PartialPath deviceId, String[] measurements)
+      throws MetadataException;
+
+  IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException;
+
+  List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMatch)
+      throws MetadataException;
+  // endregion
+
+  void changeAlias(PartialPath path, String alias) throws MetadataException, IOException;
+
+  void upsertTagsAndAttributes(
+      String alias,
+      Map<String, String> tagsMap,
+      Map<String, String> attributesMap,
+      PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  void addAttributes(Map<String, String> attributesMap, PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  void addTags(Map<String, String> tagsMap, PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  void dropTagsOrAttributes(Set<String> keySet, PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath fullPath)
+      throws MetadataException, IOException;
+
+  void collectMeasurementSchema(
+      PartialPath prefixPath, List<IMeasurementSchema> measurementSchemas);
+
+  void collectTimeseriesSchema(
+      PartialPath prefixPath, Collection<TimeseriesSchema> timeseriesSchemas);
+
+  IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) throws MetadataException, IOException;
+
+  Set<String> getPathsSetTemplate(String templateName) throws MetadataException;
+
+  Set<String> getPathsUsingTemplate(String templateName) throws MetadataException;
+
+  boolean isTemplateAppendable(Template template, List<String> measurements)
+      throws MetadataException;
+
+  void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException;
+
+  void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException;
+
+  void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException;
+
+  IMeasurementMNode getMeasurementMNodeForTrigger(PartialPath fullPath) throws MetadataException;
+
+  void releaseMeasurementMNodeAfterDropTrigger(IMeasurementMNode measurementMNode)
+      throws MetadataException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
index 5bfaa8002f..28ce4f31d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngine.java
@@ -20,9 +20,14 @@
package org.apache.iotdb.db.metadata.schemaregion;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaRegion;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
@@ -31,7 +36,9 @@ import java.util.concurrent.ConcurrentHashMap;
// manage all the schemaRegion in this dataNode
public class SchemaEngine {
- private Map<SchemaRegionId, SchemaRegion> schemaRegionMap;
+ private Map<SchemaRegionId, ISchemaRegion> schemaRegionMap;
+ private SchemaEngineMode schemaRegionStoredMode;
+ private static final Logger logger = LoggerFactory.getLogger(SchemaEngine.class);
private static class SchemaEngineManagerHolder {
private static final SchemaEngine INSTANCE = new SchemaEngine();
@@ -47,6 +54,9 @@ public class SchemaEngine {
public void init() {
schemaRegionMap = new ConcurrentHashMap<>();
+ schemaRegionStoredMode =
+ SchemaEngineMode.valueOf(IoTDBDescriptor.getInstance().getConfig().getSchemaEngineMode());
+ logger.info("used schema engine mode: {}.", schemaRegionStoredMode);
}
public void clear() {
@@ -56,22 +66,35 @@ public class SchemaEngine {
}
}
- public SchemaRegion getSchemaRegion(SchemaRegionId regionId) {
+ public ISchemaRegion getSchemaRegion(SchemaRegionId regionId) {
return schemaRegionMap.get(regionId);
}
- public Collection<SchemaRegion> getAllSchemaRegions() {
+ public Collection<ISchemaRegion> getAllSchemaRegions() {
return schemaRegionMap.values();
}
- public synchronized SchemaRegion createSchemaRegion(
+ public synchronized ISchemaRegion createSchemaRegion(
PartialPath storageGroup, SchemaRegionId schemaRegionId, IStorageGroupMNode storageGroupMNode)
throws MetadataException {
- SchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
+ ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
if (schemaRegion != null) {
return schemaRegion;
}
- schemaRegion = new SchemaRegion(storageGroup, schemaRegionId, storageGroupMNode);
+ switch (schemaRegionStoredMode) {
+ case Memory:
+ case Schema_File:
+ schemaRegion = new SchemaRegion(storageGroup, schemaRegionId, storageGroupMNode);
+ break;
+ case Rocksdb_based:
+ schemaRegion = new RSchemaRegion(storageGroup, schemaRegionId, storageGroupMNode);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "This mode [%s] is not supported. Please check and modify it.",
+ schemaRegionStoredMode));
+ }
schemaRegionMap.put(schemaRegionId, schemaRegion);
return schemaRegion;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java
index f02d88f4d7..9147b9374c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaEngineMode.java
@@ -21,5 +21,6 @@ package org.apache.iotdb.db.metadata.schemaregion;
public enum SchemaEngineMode {
Memory,
- Schema_File
+ Schema_File,
+ Rocksdb_based
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegion.java
index 8beb55ad4b..a067b17e30 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegion.java
@@ -145,7 +145,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARA
* </ol>
*/
@SuppressWarnings("java:S1135") // ignore todos
-public class SchemaRegion {
+public class SchemaRegion implements ISchemaRegion {
private static final Logger logger = LoggerFactory.getLogger(StorageGroupSchemaManager.class);
@@ -323,6 +323,7 @@ public class SchemaRegion {
}
/** function for clearing metadata components of one schema region */
+ @Override
public synchronized void clear() {
isClearing = true;
try {
@@ -408,31 +409,7 @@ public class SchemaRegion {
clear();
// delete all the schema region files
- File schemaRegionDir = SystemFileFactory.INSTANCE.getFile(schemaRegionDirPath);
- File[] sgFiles = schemaRegionDir.listFiles();
- if (sgFiles == null) {
- throw new MetadataException(
- String.format("Can't get files in schema region dir %s", schemaRegionDirPath));
- }
- for (File file : sgFiles) {
- if (file.delete()) {
- logger.info("delete schema region folder {}", schemaRegionDir.getAbsolutePath());
- } else {
- logger.info("delete schema region folder {} failed.", schemaRegionDir.getAbsolutePath());
- throw new MetadataException(
- String.format(
- "Failed to delete schema region folder %s", schemaRegionDir.getAbsolutePath()));
- }
- }
-
- if (schemaRegionDir.delete()) {
- logger.info("delete schema region folder {}", schemaRegionDir.getAbsolutePath());
- } else {
- logger.info("delete schema region folder {} failed.", schemaRegionDir.getAbsolutePath());
- throw new MetadataException(
- String.format(
- "Failed to delete schema region folder %s", schemaRegionDir.getAbsolutePath()));
- }
+ SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
}
// endregion
@@ -523,7 +500,7 @@ public class SchemaRegion {
* @param encoding the encoding function {@code Encoding} of the timeseries
* @param compressor the compressor function {@code Compressor} of the time series
*/
- public void createTimeseries(
+ private void createTimeseries(
PartialPath path,
TSDataType dataType,
TSEncoding encoding,
@@ -748,7 +725,6 @@ public class SchemaRegion {
* <p>(we develop this method as we need to get the node's lock after we get the lock.writeLock())
*
* @param path path
- * @param
*/
public IMNode getDeviceNodeWithAutoCreate(PartialPath path, boolean autoCreateSchema)
throws IOException, MetadataException {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionUtils.java
new file mode 100644
index 0000000000..40f2cc10c1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion;
+
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+
+import org.slf4j.Logger;
+
+import java.io.File;
+
+/** Static helpers shared by the schema region implementations. */
+public class SchemaRegionUtils {
+
+  /**
+   * Deletes a schema region directory together with the entries directly inside it.
+   *
+   * <p>Each direct child is removed with {@link File#delete()}, which does not recurse, so a
+   * non-empty sub-directory makes the call fail. The directory itself is removed last.
+   *
+   * @param schemaRegionDirPath path of the schema region directory to remove
+   * @param logger logger of the calling schema region, used to report each deletion
+   * @throws MetadataException if the directory cannot be listed, or if a child entry or the
+   *     directory itself cannot be deleted
+   */
+  public static void deleteSchemaRegionFolder(String schemaRegionDirPath, Logger logger)
+      throws MetadataException {
+    File schemaRegionDir = SystemFileFactory.INSTANCE.getFile(schemaRegionDirPath);
+    File[] sgFiles = schemaRegionDir.listFiles();
+    if (sgFiles == null) {
+      throw new MetadataException(
+          String.format("Can't get files in schema region dir %s", schemaRegionDirPath));
+    }
+    for (File file : sgFiles) {
+      // report the entry that was actually handled, not the enclosing directory
+      if (file.delete()) {
+        logger.info("delete schema region file {}", file.getAbsolutePath());
+      } else {
+        logger.info("delete schema region file {} failed.", file.getAbsolutePath());
+        throw new MetadataException(
+            String.format("Failed to delete schema region file %s", file.getAbsolutePath()));
+      }
+    }
+
+    // every child is gone at this point, so the directory itself can be removed
+    if (schemaRegionDir.delete()) {
+      logger.info("delete schema region folder {}", schemaRegionDir.getAbsolutePath());
+    } else {
+      logger.info("delete schema region folder {} failed.", schemaRegionDir.getAbsolutePath());
+      throw new MetadataException(
+          String.format(
+              "Failed to delete schema region folder %s", schemaRegionDir.getAbsolutePath()));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/CheckKeyResult.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/CheckKeyResult.java
index 403b714d35..739e7a7a69 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/CheckKeyResult.java
@@ -16,24 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.service;
-import org.apache.iotdb.db.utils.MemUtils;
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
-public class IoTDBShutdownHook extends Thread {
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils.NODE_TYPE_ARRAY;
- private static final Logger logger = LoggerFactory.getLogger(IoTDBShutdownHook.class);
+public class CheckKeyResult {
- @Override
- public void run() {
- if (logger.isInfoEnabled()) {
- logger.info(
- "IoTDB exits. Jvm memory usage: {}",
- MemUtils.bytesCntToStr(
- Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()));
- }
+ private byte[] value;
+ private RMNodeType nodeType;
+
+ public boolean existAnyKey() {
+ return nodeType != null;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public void setValue(byte[] value) {
+ this.value = value;
+ }
+
+ public void setExistType(char type) {
+ nodeType = NODE_TYPE_ARRAY[type];
+ }
+
+ public boolean getResult(RMNodeType type) {
+ return type == nodeType;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaConstants.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaConstants.java
new file mode 100644
index 0000000000..e97031d529
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaConstants.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+/**
+ * Constants used by the RocksDB-based schema region: path literals, node-type markers, and the
+ * flag / data-block-type bits of stored values.
+ *
+ * <p>NOTE(review): these values look like part of the persisted key/value layout, so changing any
+ * of them would presumably break existing data - confirm before editing.
+ */
+public class RSchemaConstants {
+
+ public static final char ZERO = '0';
+ public static final char ROOT_CHAR = 'r';
+ // RSchemaReadWriteHandler builds the root key from ROOT ("r"), not from ROOT_STRING ("root")
+ public static final String ROOT = "r";
+ public static final String ROOT_STRING = "root";
+
+ public static final String PATH_SEPARATOR = ".";
+ // regex character class that matches a literal '.'; presumably meant for String#split - confirm
+ public static final String ESCAPE_PATH_SEPARATOR = "[.]";
+
+ // name of the extra column family created next to RocksDB's default one
+ public static final String TABLE_NAME_TAGS = "tags";
+
+ // Node type
+ // apart from ROOT (0) every marker is a distinct power of two
+ public static final char NODE_TYPE_ROOT = '\u0000';
+ public static final char NODE_TYPE_INTERNAL = '\u0001';
+ public static final char NODE_TYPE_SG = '\u0002';
+ public static final char NODE_TYPE_ENTITY = '\u0004';
+ public static final char NODE_TYPE_MEASUREMENT = '\u0008';
+ public static final char NODE_TYPE_ALIAS = '\u0010';
+
+ // largest node-type value plus one (17); presumably the size of arrays indexed by node-type
+ // char, such as the NODE_TYPE_ARRAY lookup in CheckKeyResult - confirm
+ public static final int MAX_NODE_TYPE_NUM = NODE_TYPE_ALIAS + 1;
+
+ public static final Character[] ALL_NODE_TYPE_ARRAY =
+ new Character[] {
+ NODE_TYPE_ROOT,
+ NODE_TYPE_INTERNAL,
+ NODE_TYPE_SG,
+ NODE_TYPE_ENTITY,
+ NODE_TYPE_MEASUREMENT,
+ NODE_TYPE_ALIAS
+ };
+
+ // first byte of DEFAULT_NODE_VALUE / DEFAULT_ALIGNED_ENTITY_VALUE below
+ public static final byte DATA_VERSION = 0x00;
+
+ // second byte of DEFAULT_NODE_VALUE: no flag bit set
+ public static final byte DEFAULT_FLAG = 0x00;
+
+ // single-bit masks (bits 0 to 5) that can be OR-ed into one flag byte
+ public static final byte FLAG_SET_TTL = 0x01;
+ public static final byte FLAG_HAS_SCHEMA = 0x01 << 1;
+ public static final byte FLAG_HAS_ALIAS = 0x01 << 2;
+ public static final byte FLAG_HAS_TAGS = 0x01 << 3;
+ public static final byte FLAG_HAS_ATTRIBUTES = 0x01 << 4;
+ public static final byte FLAG_IS_ALIGNED = 0x01 << 5;
+
+ // data block type markers; the first five carry the same bit values as the matching FLAG_*
+ public static final byte DATA_BLOCK_TYPE_TTL = 0x01;
+ public static final byte DATA_BLOCK_TYPE_SCHEMA = 0x01 << 1;
+ public static final byte DATA_BLOCK_TYPE_ALIAS = 0x01 << 2;
+ public static final byte DATA_BLOCK_TYPE_TAGS = 0x01 << 3;
+ public static final byte DATA_BLOCK_TYPE_ATTRIBUTES = 0x01 << 4;
+ // alias's origin key
+ public static final byte DATA_BLOCK_TYPE_ORIGIN_KEY = 0x01 << 5;
+
+ // two-byte values {version, flags}; the arrays are shared and mutable, so callers must not
+ // modify them in place
+ public static final byte[] DEFAULT_NODE_VALUE = new byte[] {DATA_VERSION, DEFAULT_FLAG};
+ public static final byte[] DEFAULT_ALIGNED_ENTITY_VALUE =
+ new byte[] {DATA_VERSION, FLAG_IS_ALIGNED};
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaLogger.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaLogger.java
new file mode 100644
index 0000000000..346a1514dc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaLogger.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.rocksdb.DBOptions;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.Logger;
+import org.rocksdb.Options;
+
+/** RocksDB {@link Logger} that forwards every native log line to an SLF4J logger. */
+public class RSchemaLogger extends Logger {
+  private final org.slf4j.Logger logger;
+
+  /** Creates a logger bound to {@code options} that writes to the given SLF4J logger. */
+  public RSchemaLogger(Options options, org.slf4j.Logger logger) {
+    super(options);
+    this.logger = logger;
+  }
+
+  /** Creates a logger bound to {@code options} that writes to the given SLF4J logger. */
+  public RSchemaLogger(DBOptions options, org.slf4j.Logger logger) {
+    super(options);
+    this.logger = logger;
+  }
+
+  /**
+   * Maps the RocksDB level onto the SLF4J level of the same severity. Header and fatal messages
+   * are reported as errors; levels not listed here are dropped.
+   */
+  @Override
+  protected void log(InfoLogLevel infoLogLevel, String logMsg) {
+    switch (infoLogLevel) {
+      case HEADER_LEVEL:
+      case FATAL_LEVEL:
+      case ERROR_LEVEL:
+        logger.error(logMsg);
+        break;
+      case WARN_LEVEL:
+        logger.warn(logMsg);
+        break;
+      case INFO_LEVEL:
+      case NUM_INFO_LOG_LEVELS:
+        logger.info(logMsg);
+        break;
+      case DEBUG_LEVEL:
+        logger.debug(logMsg);
+        break;
+      default:
+        break;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandler.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandler.java
new file mode 100644
index 0000000000..86b503bdaa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandler.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.primitives.Bytes;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Filter;
+import org.rocksdb.Holder;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.LRUCache;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Statistics;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.rocksdb.util.SizeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ALL_NODE_TYPE_ARRAY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_ORIGIN_KEY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_VERSION;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_FLAG;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ROOT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ROOT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.TABLE_NAME_TAGS;
+
+public class RSchemaReadWriteHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(RSchemaReadWriteHandler.class);
+
+ protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ // directory name, below the configured system dir, that holds the RocksDB files
+ private static final String ROCKSDB_FOLDER = "rocksdb-schema";
+
+ // column families that must exist after start-up; missing ones are created by
+ // initInnerColumnFamilies()
+ private static final String[] INNER_TABLES =
+ new String[] {new String(RocksDB.DEFAULT_COLUMN_FAMILY), TABLE_NAME_TAGS};
+
+ // default database location, used by the no-arg constructor
+ public static final String ROCKSDB_PATH = config.getSystemDir() + File.separator + ROCKSDB_FOLDER;
+
+ // LRU cache capacities in bytes: 20 GiB for the block cache, 10 GiB for the compressed one
+ // NOTE(review): both are hard-coded; consider whether they should be configurable
+ private static final long BLOCK_CACHE = 20L * 1024 * 1024 * 1024;
+ private static final long BLOCK_CACHE_COMPRESSED = 10L * 1024 * 1024 * 1024;
+
+ private RocksDB rocksDB;
+
+ // column family name -> handle, filled once at the end of initInnerColumnFamilies()
+ ConcurrentMap<String, ColumnFamilyHandle> columnFamilyHandleMap = new ConcurrentHashMap<>();
+ // descriptors passed to RocksDB.open(), which fills columnFamilyHandles with the opened handles
+ List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
+ List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+ // the native RocksDB library must be loaded before any RocksDB class is used
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /** Opens (or creates) the RocksDB instance stored under {@code path}. */
+ public RSchemaReadWriteHandler(String path) throws RocksDBException {
+   initReadWriteHandler(path);
+ }
+
+ /** Opens (or creates) the RocksDB instance at the default location {@link #ROCKSDB_PATH}. */
+ public RSchemaReadWriteHandler() throws RocksDBException {
+   this(ROCKSDB_PATH);
+ }
+
+ /**
+ * Configures and opens the RocksDB instance at {@code path}, then makes sure the inner column
+ * families and the root key exist.
+ *
+ * @param path directory of the RocksDB instance; created if missing (createIfMissing is set)
+ * @throws RocksDBException if listing column families, opening the database or writing the root
+ * key fails
+ */
+ private void initReadWriteHandler(String path) throws RocksDBException {
+ // Options and DBOptions are only needed while opening; try-with-resources closes them afterwards
+ try (Options options = new Options()) {
+ // route RocksDB's own log output to this class' SLF4J logger, ERROR level and above
+ org.rocksdb.Logger rocksDBLogger = new RSchemaLogger(options, logger);
+ rocksDBLogger.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
+
+ options
+ .setCreateIfMissing(true)
+ .setAllowMmapReads(true)
+ .setWriteBufferSize(64 * SizeUnit.KB)
+ .setMaxWriteBufferNumber(6)
+ .setMaxBackgroundJobs(10)
+ .setStatistics(new Statistics())
+ .setLogger(rocksDBLogger);
+
+ // NOTE(review): the BloomFilter, both LRUCache instances and the Statistics object are never
+ // closed in this method - confirm who owns these native objects
+ final Filter bloomFilter = new BloomFilter(64);
+
+ final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
+ // second LRUCache argument is the number of shard bits
+ Cache cache = new LRUCache(BLOCK_CACHE, 6);
+ tableOptions
+ .setBlockCache(cache)
+ .setFilterPolicy(bloomFilter)
+ .setBlockSizeDeviation(5)
+ .setBlockRestartInterval(10)
+ .setCacheIndexAndFilterBlocks(true)
+ .setBlockCacheCompressed(new LRUCache(BLOCK_CACHE_COMPRESSED, 6));
+
+ options.setTableFormatConfig(tableOptions);
+
+ try (DBOptions dbOptions = new DBOptions(options)) {
+
+ // descriptors must be collected before open(): every existing column family has to be listed
+ initColumnFamilyDescriptors(options, path);
+
+ rocksDB = RocksDB.open(dbOptions, path, columnFamilyDescriptors, columnFamilyHandles);
+
+ // the following two steps need the opened database
+ initInnerColumnFamilies();
+
+ initRootKey();
+ }
+ }
+ }
+
+ /**
+  * Fills {@code columnFamilyDescriptors} with one descriptor per column family found at {@code
+  * path}; when none is reported, only RocksDB's default column family is registered.
+  */
+ private void initColumnFamilyDescriptors(Options options, String path) throws RocksDBException {
+   List<byte[]> familyNames = new ArrayList<>(RocksDB.listColumnFamilies(options, path));
+   if (familyNames.isEmpty()) {
+     familyNames.add(RocksDB.DEFAULT_COLUMN_FAMILY);
+   }
+
+   for (int i = 0; i < familyNames.size(); i++) {
+     ColumnFamilyDescriptor descriptor =
+         new ColumnFamilyDescriptor(familyNames.get(i), new ColumnFamilyOptions());
+     columnFamilyDescriptors.add(descriptor);
+   }
+ }
+
+ /**
+  * Creates every column family listed in {@code INNER_TABLES} that was not opened together with
+  * the database, then indexes all handles by name in {@code columnFamilyHandleMap}.
+  */
+ private void initInnerColumnFamilies() throws RocksDBException {
+   nextTable:
+   for (String requiredTable : INNER_TABLES) {
+     for (ColumnFamilyHandle openedHandle : columnFamilyHandles) {
+       String openedName = new String(openedHandle.getName());
+       if (requiredTable.equals(openedName)) {
+         // already present, nothing to create
+         continue nextTable;
+       }
+     }
+     createTable(requiredTable);
+   }
+
+   // createTable() appended the new handles, so the list now covers every column family
+   for (ColumnFamilyHandle openedHandle : columnFamilyHandles) {
+     String openedName = new String(openedHandle.getName());
+     columnFamilyHandleMap.put(openedName, openedHandle);
+   }
+ }
+
+ /** Writes the root node entry with the default two-byte value unless it already exists. */
+ private void initRootKey() throws RocksDBException {
+   final byte[] rootNodeKey = RSchemaUtils.toRocksDBKey(ROOT, NODE_TYPE_ROOT);
+   if (keyExist(rootNodeKey)) {
+     return;
+   }
+   final byte[] initialValue = new byte[] {DATA_VERSION, DEFAULT_FLAG};
+   rocksDB.put(rootNodeKey, initialValue);
+ }
+
+ /**
+  * Creates the column family {@code tableName} and records its descriptor and handle.
+  *
+  * <p>One descriptor is built and used both for creation and for the bookkeeping list, so the
+  * recorded descriptor is the one the column family was created with and no second native
+  * {@code ColumnFamilyOptions} object is allocated.
+  *
+  * @param tableName name of the column family to create
+  * @throws RocksDBException if RocksDB fails to create the column family
+  */
+ private void createTable(String tableName) throws RocksDBException {
+   ColumnFamilyDescriptor descriptor =
+       new ColumnFamilyDescriptor(tableName.getBytes(), new ColumnFamilyOptions());
+   ColumnFamilyHandle columnFamilyHandle = rocksDB.createColumnFamily(descriptor);
+   columnFamilyDescriptors.add(descriptor);
+   columnFamilyHandles.add(columnFamilyHandle);
+ }
+
+ /**
+  * Looks up a column family handle in {@code columnFamilyHandleMap}.
+  *
+  * @param columnFamilyName name of the column family
+  * @return whatever the map holds for that name (the map's "absent" value if it is not registered)
+  */
+ public ColumnFamilyHandle getColumnFamilyHandleByName(String columnFamilyName) {
+ return columnFamilyHandleMap.get(columnFamilyName);
+ }
+
+ /**
+  * Stores {@code value} under {@code key} with a plain {@code put}, replacing any previous value.
+  *
+  * @throws RocksDBException if the write fails
+  */
+ public void updateNode(byte[] key, byte[] value) throws RocksDBException {
+ rocksDB.put(key, value);
+ }
+
+ /**
+  * Stores {@code value} under the key built from {@code levelKey} and the node type's value.
+  *
+  * @param levelKey level path of the node
+  * @param type node type whose value is combined with the level path to form the key
+  * @param value serialized node content
+  * @throws RocksDBException if the write fails
+  */
+ public void createNode(String levelKey, RMNodeType type, byte[] value) throws RocksDBException {
+ byte[] nodeKey = RSchemaUtils.toRocksDBKey(levelKey, type.getValue());
+ rocksDB.put(nodeKey, value);
+ }
+
+ /**
+  * Stores {@code value} under an already encoded {@code nodeKey}.
+  *
+  * @throws RocksDBException if the write fails
+  */
+ public void createNode(byte[] nodeKey, byte[] value) throws RocksDBException {
+ rocksDB.put(nodeKey, value);
+ }
+
+ /**
+  * Replaces the internal-node record of {@code levelPath} with an entity-node record holding
+  * {@code value}. The delete and the put are submitted together in one write batch.
+  *
+  * @throws RocksDBException if the batch cannot be written
+  */
+ public void convertToEntityNode(String levelPath, byte[] value) throws RocksDBException {
+   try (WriteBatch conversion = new WriteBatch()) {
+     conversion.delete(RSchemaUtils.toInternalNodeKey(levelPath));
+     byte[] entityKey = RSchemaUtils.toEntityNodeKey(levelPath);
+     conversion.put(entityKey, value);
+     executeBatch(conversion);
+   }
+ }
+
+ /**
+  * Checks that a record exists for {@code fullPath} and returns a measurement node for it.
+  *
+  * <p>NOTE(review): the stored value is only tested for null; the returned node is built with
+  * null parent/schema/alias arguments and none of the stored content. Confirm this is intended.
+  *
+  * <p>NOTE(review): the lookup key is the raw level path without the node-type prefix that
+  * {@code RSchemaUtils.toRocksDBKey} adds elsewhere in this class - verify against the key layout.
+  *
+  * @param fullPath full path of the measurement
+  * @return a new {@code MeasurementMNode} named after the full path
+  * @throws MetadataException if no value is stored under the key or RocksDB reports an error
+  */
+ public IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException {
+ String[] nodes = fullPath.getNodes();
+ String key = RSchemaUtils.getLevelPath(nodes, nodes.length - 1);
+ try {
+ byte[] value = rocksDB.get(key.getBytes());
+ if (value == null) {
+ logger.warn("path not exist: {}", key);
+ throw new MetadataException("key not exist");
+ }
+ return new MeasurementMNode(null, fullPath.getFullPath(), null, null);
+ } catch (RocksDBException e) {
+ throw new MetadataException(e);
+ }
+ }
+
+ /**
+  * Tells whether a node of the given type is stored for {@code levelKey}; the stored value is
+  * fetched into a throw-away holder.
+  *
+  * @throws RocksDBException if the lookup fails
+  */
+ public boolean keyExistByType(String levelKey, RMNodeType type) throws RocksDBException {
+ return keyExistByType(levelKey, type, new Holder<>());
+ }
+
+ /**
+  * Tells whether a node of the given type is stored for {@code levelKey}.
+  *
+  * @param levelKey level path of the node
+  * @param type node type whose value is combined with the level path to form the key
+  * @param holder receives the stored value when the key exists
+  * @return {@code true} if the key exists
+  * @throws RocksDBException if the lookup fails
+  */
+ public boolean keyExistByType(String levelKey, RMNodeType type, Holder<byte[]> holder)
+ throws RocksDBException {
+ byte[] key = RSchemaUtils.toRocksDBKey(levelKey, type.getValue());
+ return keyExist(key, holder);
+ }
+
+ /**
+  * Probes {@code levelKey} under the alias, entity, internal, measurement and storage-group node
+  * types, in that order.
+  *
+  * @return the combined result of all probes
+  * @throws RocksDBException if any lookup fails
+  */
+ public CheckKeyResult keyExistByAllTypes(String levelKey) throws RocksDBException {
+   return keyExistByTypes(
+       levelKey,
+       RMNodeType.ALISA,
+       RMNodeType.ENTITY,
+       RMNodeType.INTERNAL,
+       RMNodeType.MEASUREMENT,
+       RMNodeType.STORAGE_GROUP);
+ }
+
+ /**
+  * Probes {@code levelKey} once per requested node type and records every hit in the result.
+  *
+  * <p>Each key is the node type's value as a single byte followed by the bytes of
+  * {@code levelKey}. For every existing key the type is reported through
+  * {@code setExistType} and the stored value through {@code setValue}, so after several hits
+  * the value of the last matching type is the one kept.
+  *
+  * <p>A plain loop is used so that a {@link RocksDBException} propagates directly instead of
+  * being tunnelled through a {@code RuntimeException} inside a stream lambda.
+  *
+  * @param levelKey level path of the node
+  * @param types node types to probe, in order
+  * @return the collected result; no type is marked when none of the keys exists
+  * @throws RocksDBException if any lookup fails
+  */
+ public CheckKeyResult keyExistByTypes(String levelKey, RMNodeType... types)
+     throws RocksDBException {
+   CheckKeyResult result = new CheckKeyResult();
+   byte[] levelKeyBytes = levelKey.getBytes();
+   for (RMNodeType type : types) {
+     byte[] key = Bytes.concat(new byte[] {(byte) type.getValue()}, levelKeyBytes);
+     Holder<byte[]> holder = new Holder<>();
+     if (keyExist(key, holder)) {
+       result.setExistType(type.getValue());
+       result.setValue(holder.getValue());
+     }
+   }
+   return result;
+ }
+
+ /**
+  * Tells whether {@code key} is stored, leaving its value in {@code holder} when it is.
+  *
+  * <p>{@code keyMayExist} is asked first. A negative answer is final; a positive answer that came
+  * without a value in the holder is confirmed with a real {@code get}.
+  *
+  * @throws RocksDBException if the confirming read fails
+  */
+ public boolean keyExist(byte[] key, Holder<byte[]> holder) throws RocksDBException {
+   if (!rocksDB.keyMayExist(key, holder)) {
+     return false;
+   }
+   if (holder.getValue() != null) {
+     return true;
+   }
+   byte[] confirmed = rocksDB.get(key);
+   if (confirmed == null) {
+     return false;
+   }
+   holder.setValue(confirmed);
+   return true;
+ }
+
+ /**
+  * Tells whether {@code key} is stored, discarding the value that may be fetched on the way.
+  *
+  * @throws RocksDBException if the lookup fails
+  */
+ public boolean keyExist(byte[] key) throws RocksDBException {
+ return keyExist(key, new Holder<>());
+ }
+
+ public void scanAllKeysRecursively(Set<String> seeds, int level, Function<String, Boolean> op) {
+ if (seeds == null || seeds.isEmpty()) {
+ return;
+ }
+ Set<String> children = ConcurrentHashMap.newKeySet();
+ seeds
+ .parallelStream()
+ .forEach(
+ x -> {
+ if (Boolean.TRUE.equals(op.apply(x))) {
+ // x is not leaf node
+ String childrenPrefix = RSchemaUtils.getNextLevelOfPath(x, level);
+ children.addAll(getAllByPrefix(childrenPrefix));
+ }
+ });
+ if (!children.isEmpty()) {
+ scanAllKeysRecursively(children, level + 1, op);
+ }
+ }
+
+ /**
+  * Collects, as strings, all keys that start with {@code prefix}.
+  *
+  * <p>The iterator is positioned with {@code seek} on the prefix and read forward until the
+  * first key that no longer starts with it.
+  */
+ public Set<String> getAllByPrefix(String prefix) {
+   Set<String> matches = new HashSet<>();
+   byte[] seekTarget = prefix.getBytes();
+   try (RocksIterator cursor = rocksDB.newIterator()) {
+     cursor.seek(seekTarget);
+     while (cursor.isValid()) {
+       String currentKey = new String(cursor.key());
+       if (!currentKey.startsWith(prefix)) {
+         break;
+       }
+       matches.add(currentKey);
+       cursor.next();
+     }
+   }
+   return matches;
+ }
+
+ /**
+  * Reads the value of {@code key} from the given column family, or through the handle-less
+  * {@code get} when {@code columnFamilyHandle} is null.
+  *
+  * @throws RocksDBException if the read fails
+  */
+ public byte[] get(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
+   return columnFamilyHandle == null
+       ? rocksDB.get(key)
+       : rocksDB.get(columnFamilyHandle, key);
+ }
+
+ /**
+  * Opens a new iterator over the given column family, or a handle-less iterator when
+  * {@code columnFamilyHandle} is null. The caller is responsible for closing it.
+  */
+ public RocksIterator iterator(ColumnFamilyHandle columnFamilyHandle) {
+   return columnFamilyHandle == null
+       ? rocksDB.newIterator()
+       : rocksDB.newIterator(columnFamilyHandle);
+ }
+
+ /**
+  * Tells whether at least one key of any node type starts with {@code siblingPrefix}.
+  *
+  * <p>For every node type the iterator is positioned on the typed prefix; only the first entry
+  * found there needs to be inspected to decide.
+  */
+ public boolean existAnySiblings(String siblingPrefix) {
+   for (char nodeType : ALL_NODE_TYPE_ARRAY) {
+     byte[] typedPrefix = RSchemaUtils.toRocksDBKey(siblingPrefix, nodeType);
+     try (RocksIterator cursor = rocksDB.newIterator()) {
+       cursor.seek(typedPrefix);
+       if (cursor.isValid() && RSchemaUtils.prefixMatch(cursor.key(), typedPrefix)) {
+         return true;
+       }
+     }
+   }
+   return false;
+ }
+
+ /**
+  * Feeds every key that starts with {@code innerName} to {@code function}, in iterator order.
+  * The value returned by {@code function} is ignored.
+  */
+ public void getKeyByPrefix(String innerName, Function<String, Boolean> function) {
+   try (RocksIterator cursor = rocksDB.newIterator()) {
+     cursor.seek(innerName.getBytes());
+     while (cursor.isValid()) {
+       String currentKey = new String(cursor.key());
+       if (!currentKey.startsWith(innerName)) {
+         break;
+       }
+       function.apply(currentKey);
+       cursor.next();
+     }
+   }
+ }
+
+ /**
+  * Collects the raw key/value pairs of all entries whose key starts with {@code innerName}.
+  *
+  * <p>The result is keyed by {@code byte[]}; arrays hash by identity, so it is meant to be
+  * iterated rather than queried with a freshly built key.
+  */
+ public Map<byte[], byte[]> getKeyValueByPrefix(String innerName) {
+   try (RocksIterator cursor = rocksDB.newIterator()) {
+     Map<byte[], byte[]> entries = new HashMap<>();
+     cursor.seek(innerName.getBytes());
+     while (cursor.isValid()) {
+       String currentKey = new String(cursor.key());
+       if (!currentKey.startsWith(innerName)) {
+         break;
+       }
+       entries.put(cursor.key(), cursor.value());
+       cursor.next();
+     }
+     return entries;
+   }
+ }
+
+ /**
+  * Searches the path from its full length towards the root for the longest prefix that is
+  * reported as present under {@code nodeType}.
+  *
+  * <p>NOTE(review): presence is decided by {@code keyMayExist} alone, whereas {@code keyExist}
+  * in this class confirms a positive answer with a real {@code get}. If {@code keyMayExist} can
+  * report keys that are not stored, this method can return a path that does not exist - confirm
+  * whether callers tolerate that.
+  *
+  * @param nodes nodes of the path to inspect
+  * @param nodeType node type used to build the inner key of each prefix
+  * @return the inner path name of the first prefix reported as present, or null if none is
+  */
+ public String findBelongToSpecifiedNodeType(String[] nodes, char nodeType) {
+ String innerPathName;
+ // try the longest prefix first, then drop one trailing node at a time
+ for (int level = nodes.length; level > 0; level--) {
+ String[] copy = Arrays.copyOf(nodes, level);
+ innerPathName = RSchemaUtils.convertPartialPathToInnerByNodes(copy, level, nodeType);
+ boolean isBelongToType = rocksDB.keyMayExist(innerPathName.getBytes(), new Holder<>());
+ if (isBelongToType) {
+ return innerPathName;
+ }
+ }
+ return null;
+ }
+
+ /**
+  * Writes all operations collected in {@code batch} with default write options.
+  *
+  * <p>{@code WriteOptions} is backed by native memory, so it is closed as soon as the write
+  * returns instead of being left for the garbage collector.
+  *
+  * @param batch operations to apply; the caller keeps ownership and must close it
+  * @throws RocksDBException if the write fails
+  */
+ public void executeBatch(WriteBatch batch) throws RocksDBException {
+   try (WriteOptions writeOptions = new WriteOptions()) {
+     rocksDB.write(writeOptions, batch);
+   }
+ }
+
+ public void deleteNode(String[] nodes, RMNodeType type) throws RocksDBException {
+ byte[] key =
+ RSchemaUtils.toRocksDBKey(
+ RSchemaUtils.getLevelPath(nodes, nodes.length - 1), type.getValue());
+ rocksDB.delete(key);
+ }
+
+ /**
+  * Deletes the record stored under an already encoded {@code key}.
+  *
+  * @throws RocksDBException if the delete fails
+  */
+ public void deleteByKey(byte[] key) throws RocksDBException {
+ rocksDB.delete(key);
+ }
+
+ /**
+  * Issues a {@code deleteRange} for the keys between {@code startKey} and {@code endKey}.
+  *
+  * @throws RocksDBException if the range delete fails
+  */
+ public void deleteNodeByPrefix(byte[] startKey, byte[] endKey) throws RocksDBException {
+ rocksDB.deleteRange(startKey, endKey);
+ }
+
+ /**
+  * Issues a {@code deleteRange} for the keys between {@code startKey} and {@code endKey} in the
+  * column family identified by {@code handle}.
+  *
+  * @throws RocksDBException if the range delete fails
+  */
+ public void deleteNodeByPrefix(ColumnFamilyHandle handle, byte[] startKey, byte[] endKey)
+ throws RocksDBException {
+ rocksDB.deleteRange(handle, startKey, endKey);
+ }
+
+ /**
+  * Dumps every entry visible through {@code rocksDB.newIterator()} to a text file, one
+  * {@code key -> decoded value blocks} line per entry. Test helper only.
+  *
+  * <p>An existing file at {@code filePath} is deleted first. Each value is decoded as a two-byte
+  * header followed by a sequence of typed data blocks (TTL, schema, alias, origin key, tags,
+  * attributes); unknown block types are skipped without consuming further bytes.
+  *
+  * @param filePath path of the output file
+  * @throws IOException if the output file cannot be written
+  */
+ @TestOnly
+ public void scanAllKeys(String filePath) throws IOException {
+ try (RocksIterator iterator = rocksDB.newIterator()) {
+ logger.info("\n-----------------scan rocksdb start----------------------");
+ iterator.seekToFirst();
+ File outputFile = new File(filePath);
+ if (outputFile.exists()) {
+ boolean deleted = outputFile.delete();
+ logger.info("delete output file: " + deleted);
+ }
+ try (BufferedOutputStream outputStream =
+ new BufferedOutputStream(new FileOutputStream(outputFile))) {
+ while (iterator.isValid()) {
+ byte[] key = iterator.key();
+ // shift the first key byte by '0' - presumably to print the node-type byte as a digit
+ key[0] = (byte) (key[0] + '0');
+ outputStream.write(key);
+ outputStream.write(" -> ".getBytes());
+
+ byte[] value = iterator.value();
+ ByteBuffer byteBuffer = ByteBuffer.wrap(value);
+ // skip the version flag and node type flag
+ ReadWriteIOUtils.readBytes(byteBuffer, 2);
+ // the remainder is a sequence of blocks, each introduced by a one-byte block type
+ while (byteBuffer.hasRemaining()) {
+ byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
+ switch (blockType) {
+ case RSchemaConstants.DATA_BLOCK_TYPE_TTL:
+ long l = ReadWriteIOUtils.readLong(byteBuffer);
+ outputStream.write(String.valueOf(l).getBytes());
+ outputStream.write(" ".getBytes());
+ break;
+ case DATA_BLOCK_TYPE_SCHEMA:
+ MeasurementSchema schema = MeasurementSchema.deserializeFrom(byteBuffer);
+ outputStream.write(schema.toString().getBytes());
+ outputStream.write(" ".getBytes());
+ break;
+ case RSchemaConstants.DATA_BLOCK_TYPE_ALIAS:
+ String str = ReadWriteIOUtils.readString(byteBuffer);
+ outputStream.write(Objects.requireNonNull(str).getBytes());
+ outputStream.write(" ".getBytes());
+ break;
+ case DATA_BLOCK_TYPE_ORIGIN_KEY:
+ byte[] originKey = RSchemaUtils.readOriginKey(byteBuffer);
+ outputStream.write(originKey);
+ outputStream.write(" ".getBytes());
+ break;
+ // tags and attributes share the same map encoding and output format
+ case RSchemaConstants.DATA_BLOCK_TYPE_TAGS:
+ case RSchemaConstants.DATA_BLOCK_TYPE_ATTRIBUTES:
+ Map<String, String> map = ReadWriteIOUtils.readMap(byteBuffer);
+ for (Map.Entry<String, String> entry : Objects.requireNonNull(map).entrySet()) {
+ outputStream.write(
+ ("<" + entry.getKey() + "," + entry.getValue() + ">").getBytes());
+ }
+ outputStream.write(" ".getBytes());
+ break;
+ default:
+ break;
+ }
+ }
+ outputStream.write("\n".getBytes());
+ iterator.next();
+ }
+ }
+ logger.info("\n-----------------scan rocksdb end----------------------");
+ }
+ }
+
+ /**
+  * Syncs the write-ahead log and shuts the database down.
+  *
+  * <p>Every column family handle obtained from {@code RocksDB.open} or created later is closed
+  * before the database itself, which is the release order RocksDB expects for the native
+  * objects behind them.
+  *
+  * @throws RocksDBException if syncing the WAL or closing the database fails
+  */
+ public void close() throws RocksDBException {
+   rocksDB.syncWal();
+   // release the handles first: they must not outlive the database they belong to
+   for (ColumnFamilyHandle handle : columnFamilyHandles) {
+     handle.close();
+   }
+   rocksDB.closeE();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
new file mode 100644
index 0000000000..0eb798fd61
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -0,0 +1,1962 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
+import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MNodeTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.SchemaDirCreationFailureException;
+import org.apache.iotdb.db.metadata.LocalSchemaProcessor.StorageGroupFilter;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.idtable.IDTable;
+import org.apache.iotdb.db.metadata.idtable.IDTableManager;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaRegionUtils;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.REntityMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeValueType;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMeasurementMNode;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
+import com.google.common.collect.MapMaker;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.rocksdb.Holder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ALL_NODE_TYPE_ARRAY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_NODE_VALUE;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ZERO;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+public class RSchemaRegion implements ISchemaRegion {
+ private static final Logger logger = LoggerFactory.getLogger(RSchemaRegion.class);
+
+ // process-wide IoTDB configuration (schema directory, id-table switches, ...)
+ protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ // TODO: make it configurable
+ // maximum number of nodes accepted in a timeseries path
+ public static final int MAX_PATH_DEPTH = 10;
+
+ // upper bound, in milliseconds, for every tryLock call in this class
+ private static final long MAX_LOCK_WAIT_TIME = 50;
+
+ // all reads and writes to RocksDB go through this handler
+ private final RSchemaReadWriteHandler readWriteHandler;
+
+ // create operations take the read side, deleteTimeseries takes the write side
+ private final ReadWriteLock deleteUpdateLock = new ReentrantReadWriteLock();
+
+ // one lock per level path; values are weakly referenced so unused locks can be collected
+ private final Map<String, ReentrantLock> locksPool =
+ new MapMaker().weakValues().initialCapacity(10000).makeMap();
+
+ // directory of this region: <schemaDir>/<storageGroupFullPath>/<schemaRegionId>
+ private String schemaRegionDirPath;
+ private String storageGroupFullPath;
+ private ConsensusGroupId schemaRegionId;
+ private IStorageGroupMNode storageGroupMNode;
+ // level of the storage group path, as computed by RSchemaUtils.getLevelByPartialPath
+ private int storageGroupPathLevel;
+
+ /**
+  * Creates a region backed by a handler built with its no-argument constructor; storage group
+  * and region id are left unset.
+  *
+  * @throws MetadataException if the RocksDB handler cannot be created
+  */
+ public RSchemaRegion() throws MetadataException {
+ try {
+ readWriteHandler = new RSchemaReadWriteHandler();
+ } catch (RocksDBException e) {
+ logger.error("create RocksDBReadWriteHandler fail", e);
+ throw new MetadataException(e);
+ }
+ }
+
+ /**
+  * Creates the region for {@code storageGroup} / {@code schemaRegionId}: prepares the region
+  * directory through {@link #init} and then opens the RocksDB handler on that directory.
+  *
+  * @param storageGroup path of the storage group this region belongs to
+  * @param schemaRegionId id of the region, used as the last directory component
+  * @param storageGroupMNode storage group node handed on to {@link #init}
+  * @throws MetadataException if the directory cannot be created or the handler cannot be opened
+  */
+ public RSchemaRegion(
+ PartialPath storageGroup,
+ ConsensusGroupId schemaRegionId,
+ IStorageGroupMNode storageGroupMNode)
+ throws MetadataException {
+ this.schemaRegionId = schemaRegionId;
+ storageGroupFullPath = storageGroup.getFullPath();
+ // init() reads the two fields set above to derive schemaRegionDirPath
+ init(storageGroupMNode);
+ try {
+ readWriteHandler = new RSchemaReadWriteHandler(schemaRegionDirPath);
+ } catch (RocksDBException e) {
+ logger.error("create RocksDBReadWriteHandler fail", e);
+ throw new MetadataException(e);
+ }
+ }
+
+ @Override
+ public void init(IStorageGroupMNode storageGroupMNode) throws MetadataException {
+ this.storageGroupMNode = storageGroupMNode;
+ schemaRegionDirPath =
+ config.getSchemaDir()
+ + File.separator
+ + storageGroupFullPath
+ + File.separator
+ + schemaRegionId.getId();
+ File schemaRegionFolder = SystemFileFactory.INSTANCE.getFile(schemaRegionDirPath);
+ if (!schemaRegionFolder.exists()) {
+ if (schemaRegionFolder.mkdirs()) {
+ logger.info("create schema region folder {}", schemaRegionDirPath);
+ } else {
+ if (!schemaRegionFolder.exists()) {
+ logger.error("create schema region folder {} failed.", schemaRegionDirPath);
+ throw new SchemaDirCreationFailureException(schemaRegionDirPath);
+ }
+ }
+ }
+ storageGroupPathLevel = RSchemaUtils.getLevelByPartialPath(storageGroupFullPath);
+ }
+
+ /** Intentionally empty: this implementation performs no action when asked to force the mlog. */
+ @Override
+ public void forceMlog() {
+ // do nothing
+ }
+
+ /**
+  * Applies a schema plan to this region by dispatching on its operator type.
+  *
+  * <p>Tag-offset and template plans are not supported and are only logged, as are plans with an
+  * operator type this method does not know.
+  */
+ @Override
+ public void operation(PhysicalPlan plan) throws IOException, MetadataException {
+   switch (plan.getOperatorType()) {
+     case CREATE_TIMESERIES:
+       CreateTimeSeriesPlan createPlan = (CreateTimeSeriesPlan) plan;
+       createTimeseries(createPlan, createPlan.getTagOffset());
+       break;
+     case CREATE_ALIGNED_TIMESERIES:
+       CreateAlignedTimeSeriesPlan alignedPlan = (CreateAlignedTimeSeriesPlan) plan;
+       createAlignedTimeSeries(alignedPlan);
+       break;
+     case DELETE_TIMESERIES:
+       DeleteTimeSeriesPlan deletePlan = (DeleteTimeSeriesPlan) plan;
+       // a DeleteTimeSeriesPlan handled here carries exactly one path
+       deleteTimeseries(deletePlan.getPaths().get(0));
+       break;
+     case CHANGE_ALIAS:
+       ChangeAliasPlan aliasPlan = (ChangeAliasPlan) plan;
+       changeAlias(aliasPlan.getPath(), aliasPlan.getAlias());
+       break;
+     case AUTO_CREATE_DEVICE_MNODE:
+       AutoCreateDeviceMNodePlan devicePlan = (AutoCreateDeviceMNodePlan) plan;
+       autoCreateDeviceMNode(devicePlan);
+       break;
+     case CHANGE_TAG_OFFSET:
+     case SET_TEMPLATE:
+     case ACTIVATE_TEMPLATE:
+     case UNSET_TEMPLATE:
+       logger.error("unsupported operations {}", plan);
+       break;
+     default:
+       logger.error("Unrecognizable command {}", plan.getOperatorType());
+   }
+ }
+
+ /**
+  * Clears this region and then removes its folder on disk.
+  *
+  * @throws MetadataException if clearing or deleting the folder fails
+  */
+ @Override
+ public void deleteSchemaRegion() throws MetadataException {
+ clear();
+ SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
+ }
+
+ @Override
+ public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
+ try {
+ if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+ createTimeseries(
+ plan.getPath(),
+ new MeasurementSchema(
+ plan.getPath().getMeasurement(),
+ plan.getDataType(),
+ plan.getEncoding(),
+ plan.getCompressor(),
+ plan.getProps()),
+ plan.getAlias(),
+ plan.getTags(),
+ plan.getAttributes());
+ // update id table if id table log file is disabled
+ if (config.isEnableIDTable() && !config.isEnableIDTableLogFile()) {
+ IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPath().getDevicePath());
+ idTable.createTimeseries(plan);
+ }
+ } else {
+ throw new AcquireLockTimeoutException(
+ "Acquire lock timeout when creating timeseries: " + plan.getPath().getFullPath());
+ }
+ } catch (InterruptedException e) {
+ logger.warn("Acquire lock interrupted", e);
+ Thread.currentThread().interrupt();
+ } finally {
+ deleteUpdateLock.readLock().unlock();
+ }
+ }
+
+ /** Test convenience overload: creates a timeseries without alias, tags or attributes. */
+ @TestOnly
+ protected void createTimeseries(
+ PartialPath path,
+ TSDataType dataType,
+ TSEncoding encoding,
+ CompressionType compressor,
+ Map<String, String> props)
+ throws MetadataException {
+ createTimeseries(path, dataType, encoding, compressor, props, null);
+ }
+
+ /**
+  * Test convenience overload: builds the measurement schema from the given type, encoding,
+  * compressor and props, and creates the timeseries with the given alias but no tags or
+  * attributes.
+  */
+ @TestOnly
+ protected void createTimeseries(
+ PartialPath path,
+ TSDataType dataType,
+ TSEncoding encoding,
+ CompressionType compressor,
+ Map<String, String> props,
+ String alias)
+ throws MetadataException {
+ createTimeseries(
+ path,
+ new MeasurementSchema(path.getMeasurement(), dataType, encoding, compressor, props),
+ alias,
+ null,
+ null);
+ }
+
+ /**
+  * Validates the path, props and type/encoding combination, then creates the measurement node
+  * together with any missing ancestor nodes below the storage group.
+  *
+  * <p>An interruption while waiting for a node lock is logged and the interrupt flag restored;
+  * the method then returns normally.
+  *
+  * @throws MetadataException if validation fails, the path already exists, a lock times out or
+  *     the underlying storage reports an error
+  */
+ protected void createTimeseries(
+     PartialPath path,
+     IMeasurementSchema schema,
+     String alias,
+     Map<String, String> tags,
+     Map<String, String> attributes)
+     throws MetadataException {
+   String[] nodes = path.getNodes();
+
+   // regular check
+   if (nodes.length > RSchemaRegion.MAX_PATH_DEPTH) {
+     String message =
+         String.format(
+             "path is too long, provide: %d, max: %d",
+             path.getNodeLength(), RSchemaRegion.MAX_PATH_DEPTH);
+     throw new IllegalPathException(message);
+   }
+   MetaFormatUtils.checkTimeseries(path);
+   MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), schema.getProps());
+   SchemaUtils.checkDataTypeWithEncoding(schema.getType(), schema.getEncodingType());
+
+   try {
+     // walks from the measurement up to the storage group level, creating what is missing
+     createTimeSeriesRecursively(
+         nodes, nodes.length, storageGroupPathLevel, schema, alias, tags, attributes);
+     // TODO: load tags to memory
+   } catch (InterruptedException e) {
+     logger.warn("Acquire lock interrupted", e);
+     Thread.currentThread().interrupt();
+   } catch (RocksDBException | IOException e) {
+     throw new MetadataException(e);
+   }
+ }
+
+ /**
+  * Ensures the node at level {@code start} of {@code nodes} exists, creating missing ancestors
+  * first, and creates the measurement node itself when {@code start} is the last level.
+  *
+  * <p>The node's level path is locked for the duration of the check-and-create. A missing node
+  * is created as measurement (last level), entity (second to last) or internal node. An existing
+  * node is validated: the measurement must not exist yet, the parent must be an internal node
+  * (then converted to an entity) or a non-aligned entity, and no ancestor may be a measurement
+  * or an alias.
+  *
+  * @param nodes nodes of the full timeseries path
+  * @param start level to handle, counted in nodes; recursion proceeds towards {@code end}
+  * @param end level at which recursion stops (levels up to it are assumed to exist)
+  * @throws AcquireLockTimeoutException if the node lock is not acquired within
+  *     {@code MAX_LOCK_WAIT_TIME} milliseconds
+  */
+ private void createTimeSeriesRecursively(
+     String[] nodes,
+     int start,
+     int end,
+     IMeasurementSchema schema,
+     String alias,
+     Map<String, String> tags,
+     Map<String, String> attributes)
+     throws InterruptedException, MetadataException, RocksDBException, IOException {
+   if (start <= end) {
+     // "ROOT" node must exist and don't need to check
+     return;
+   }
+   String levelPath = RSchemaUtils.getLevelPath(nodes, start - 1);
+   Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+   if (!lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+     // The lock is NOT held here, so it must not be unlocked: doing so would throw
+     // IllegalMonitorStateException instead of reporting the timeout.
+     throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+   }
+   try {
+     CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath);
+     if (!checkResult.existAnyKey()) {
+       createTimeSeriesRecursively(nodes, start - 1, end, schema, alias, tags, attributes);
+       if (start == nodes.length) {
+         createTimeSeriesNode(nodes, levelPath, schema, alias, tags, attributes);
+       } else if (start == nodes.length - 1) {
+         readWriteHandler.createNode(levelPath, RMNodeType.ENTITY, DEFAULT_NODE_VALUE);
+       } else {
+         readWriteHandler.createNode(levelPath, RMNodeType.INTERNAL, DEFAULT_NODE_VALUE);
+       }
+     } else {
+       if (start == nodes.length) {
+         throw new PathAlreadyExistException(RSchemaUtils.getPathByLevelPath(levelPath));
+       }
+
+       if (start == nodes.length - 1) {
+         if (checkResult.getResult(RMNodeType.INTERNAL)) {
+           // convert the parent node to entity if it is internal node
+           readWriteHandler.convertToEntityNode(levelPath, DEFAULT_NODE_VALUE);
+         } else if (checkResult.getResult(RMNodeType.ENTITY)) {
+           if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
+             throw new AlignedTimeseriesException(
+                 "Timeseries under this entity is aligned, please use createAlignedTimeseries"
+                     + " or change entity.",
+                 RSchemaUtils.getPathByLevelPath(levelPath));
+           }
+         } else {
+           throw new MNodeTypeMismatchException(
+               RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.ENTITY_MNODE_TYPE);
+         }
+       }
+
+       if (checkResult.getResult(RMNodeType.MEASUREMENT)
+           || checkResult.getResult(RMNodeType.ALISA)) {
+         throw new MNodeTypeMismatchException(
+             RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.INTERNAL_MNODE_TYPE);
+       }
+     }
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Writes the measurement node for {@code levelPath} and, in the same batch, its tag-table
+  * entry (when tags are given) and its alias node (when an alias is given).
+  *
+  * <p>When an alias is present, the alias level path is locked and checked for existing nodes
+  * of any type before the batch is written.
+  *
+  * @param nodes nodes of the full timeseries path; the last one is replaced by the alias to
+  *     build the alias path
+  * @param levelPath level path of the measurement node
+  * @throws AliasAlreadyExistException if a node already exists at the alias path
+  * @throws AcquireLockTimeoutException if the alias lock is not acquired within
+  *     {@code MAX_LOCK_WAIT_TIME} milliseconds
+  */
+ private void createTimeSeriesNode(
+     String[] nodes,
+     String levelPath,
+     IMeasurementSchema schema,
+     String alias,
+     Map<String, String> tags,
+     Map<String, String> attributes)
+     throws IOException, RocksDBException, MetadataException, InterruptedException {
+   // create time-series node
+   try (WriteBatch batch = new WriteBatch()) {
+     byte[] value = RSchemaUtils.buildMeasurementNodeValue(schema, alias, tags, attributes);
+     byte[] measurementKey = RSchemaUtils.toMeasurementNodeKey(levelPath);
+     batch.put(measurementKey, value);
+
+     // measurement with tags will save in a separate table at the same time
+     if (tags != null && !tags.isEmpty()) {
+       batch.put(
+           readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS),
+           measurementKey,
+           DEFAULT_NODE_VALUE);
+     }
+
+     if (StringUtils.isEmpty(alias)) {
+       readWriteHandler.executeBatch(batch);
+       return;
+     }
+
+     String[] aliasNodes = Arrays.copyOf(nodes, nodes.length);
+     aliasNodes[nodes.length - 1] = alias;
+     String aliasLevelPath = RSchemaUtils.getLevelPath(aliasNodes, aliasNodes.length - 1);
+     byte[] aliasNodeKey = RSchemaUtils.toAliasNodeKey(aliasLevelPath);
+     Lock lock = locksPool.computeIfAbsent(aliasLevelPath, x -> new ReentrantLock());
+     if (!lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+       // The lock is NOT held here, so it must not be unlocked; report the path that was
+       // actually being locked.
+       throw new AcquireLockTimeoutException("acquire lock timeout: " + aliasLevelPath);
+     }
+     try {
+       if (readWriteHandler.keyExistByAllTypes(aliasLevelPath).existAnyKey()) {
+         throw new AliasAlreadyExistException(RSchemaUtils.getPathByLevelPath(levelPath), alias);
+       }
+       batch.put(aliasNodeKey, RSchemaUtils.buildAliasNodeValue(measurementKey));
+       readWriteHandler.executeBatch(batch);
+     } finally {
+       lock.unlock();
+     }
+   }
+ }
+
+ /**
+  * Creates the aligned entity at {@code prefixPath} (with missing ancestors) and all of its
+  * measurements in one write batch.
+  *
+  * <p>Before the batch is written, each measurement path is locked in turn and checked for an
+  * existing node of any type. NOTE(review): every lock is released right after its check, i.e.
+  * before the batch is executed - confirm that this window is acceptable.
+  *
+  * <p>An interruption while waiting for a lock is logged and the interrupt flag restored; the
+  * method then returns normally.
+  *
+  * @param prefixPath path of the entity owning the measurements
+  * @param measurements measurement names; {@code dataTypes} and {@code encodings} are read at
+  *     the same indices
+  * @throws MetadataException if validation fails, a measurement already exists, a lock times
+  *     out or the underlying storage reports an error
+  */
+ private void createAlignedTimeSeries(
+     PartialPath prefixPath,
+     List<String> measurements,
+     List<TSDataType> dataTypes,
+     List<TSEncoding> encodings)
+     throws MetadataException {
+   if (prefixPath.getNodeLength() > MAX_PATH_DEPTH - 1) {
+     throw new IllegalPathException(
+         String.format(
+             "Prefix path is too long, provide: %d, max: %d",
+             prefixPath.getNodeLength(), RSchemaRegion.MAX_PATH_DEPTH - 1));
+   }
+
+   MetaFormatUtils.checkTimeseries(prefixPath);
+
+   for (int i = 0; i < measurements.size(); i++) {
+     SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
+     MetaFormatUtils.checkNodeName(measurements.get(i));
+   }
+
+   try (WriteBatch batch = new WriteBatch()) {
+     createEntityRecursively(
+         prefixPath.getNodes(), prefixPath.getNodeLength(), storageGroupPathLevel + 1, true);
+     String[] locks = new String[measurements.size()];
+     for (int i = 0; i < measurements.size(); i++) {
+       String measurement = measurements.get(i);
+       String levelPath = RSchemaUtils.getMeasurementLevelPath(prefixPath.getNodes(), measurement);
+       locks[i] = levelPath;
+       MeasurementSchema schema =
+           new MeasurementSchema(measurement, dataTypes.get(i), encodings.get(i));
+       byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
+       byte[] value = RSchemaUtils.buildMeasurementNodeValue(schema, null, null, null);
+       batch.put(key, value);
+     }
+
+     for (String lockKey : locks) {
+       Lock lock = locksPool.computeIfAbsent(lockKey, x -> new ReentrantLock());
+       if (!lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+         // The lock is NOT held here, so it must not be unlocked: doing so would throw
+         // IllegalMonitorStateException instead of reporting the timeout.
+         throw new AcquireLockTimeoutException("acquire lock timeout: " + lockKey);
+       }
+       try {
+         if (readWriteHandler.keyExistByAllTypes(lockKey).existAnyKey()) {
+           throw new PathAlreadyExistException(lockKey);
+         }
+       } finally {
+         lock.unlock();
+       }
+     }
+     readWriteHandler.executeBatch(batch);
+
+     // TODO: update cache if necessary
+   } catch (RocksDBException | IOException e) {
+     throw new MetadataException(e);
+   } catch (InterruptedException e) {
+     logger.warn("Acquire lock interrupted", e);
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ @Override
+ public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws MetadataException {
+ PartialPath prefixPath = plan.getPrefixPath();
+ List<String> measurements = plan.getMeasurements();
+ List<TSDataType> dataTypes = plan.getDataTypes();
+ List<TSEncoding> encodings = plan.getEncodings();
+
+ try {
+ if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+ createAlignedTimeSeries(prefixPath, measurements, dataTypes, encodings);
+ // update id table if not in recovering or disable id table log file
+ if (config.isEnableIDTable() && !config.isEnableIDTableLogFile()) {
+ IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPrefixPath());
+ idTable.createAlignedTimeseries(plan);
+ }
+ } else {
+ throw new AcquireLockTimeoutException(
+ "Acquire lock timeout when do createAlignedTimeSeries: " + prefixPath.getFullPath());
+ }
+ } catch (InterruptedException e) {
+ logger.warn("Acquire lock interrupted", e);
+ Thread.currentThread().interrupt();
+ } finally {
+ deleteUpdateLock.readLock().unlock();
+ }
+ }
+
+ /**
+  * Ensures the node at level {@code start} of {@code nodes} exists, creating missing ancestors
+  * as internal nodes and the last level as an entity node.
+  *
+  * <p>The node's level path is locked for the duration of the check-and-create. An existing
+  * last-level node must be an entity (not a storage group) whose aligned flag is set; an
+  * existing ancestor must not be a measurement or an alias.
+  *
+  * @param nodes nodes of the entity path
+  * @param start level to handle, counted in nodes; recursion proceeds towards {@code end}
+  * @param end level at which recursion stops (levels up to it are assumed to exist)
+  * @param aligned whether a newly created entity node is stored with the aligned default value
+  * @throws AcquireLockTimeoutException if the node lock is not acquired within
+  *     {@code MAX_LOCK_WAIT_TIME} milliseconds
+  */
+ private void createEntityRecursively(String[] nodes, int start, int end, boolean aligned)
+     throws RocksDBException, MetadataException, InterruptedException {
+   if (start <= end) {
+     // "ROOT" must exist
+     return;
+   }
+   String levelPath = RSchemaUtils.getLevelPath(nodes, start - 1);
+   Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+   if (!lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+     // The lock is NOT held here, so it must not be unlocked: doing so would throw
+     // IllegalMonitorStateException instead of reporting the timeout.
+     throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+   }
+   try {
+     CheckKeyResult checkResult = readWriteHandler.keyExistByAllTypes(levelPath);
+     if (!checkResult.existAnyKey()) {
+       createEntityRecursively(nodes, start - 1, end, aligned);
+       if (start == nodes.length) {
+         byte[] nodeKey = RSchemaUtils.toEntityNodeKey(levelPath);
+         byte[] value = aligned ? DEFAULT_ALIGNED_ENTITY_VALUE : DEFAULT_NODE_VALUE;
+         readWriteHandler.createNode(nodeKey, value);
+       } else {
+         readWriteHandler.createNode(levelPath, RMNodeType.INTERNAL, DEFAULT_NODE_VALUE);
+       }
+     } else {
+       if (start == nodes.length) {
+         // make sure sg node and entity node are different
+         // eg.,'root.a' is a storage group path, 'root.a.b' can not be a timeseries
+         if (checkResult.getResult(RMNodeType.STORAGE_GROUP)) {
+           throw new MetadataException("Storage Group Node and Entity Node could not be same!");
+         }
+
+         if (!checkResult.getResult(RMNodeType.ENTITY)) {
+           throw new MNodeTypeMismatchException(
+               RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.ENTITY_MNODE_TYPE);
+         }
+
+         if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) == 0) {
+           throw new MetadataException(
+               "Timeseries under this entity is not aligned, please use createTimeseries or change entity. (Path: "
+                   + RSchemaUtils.getPathByLevelPath(levelPath)
+                   + ")");
+         }
+       } else if (checkResult.getResult(RMNodeType.MEASUREMENT)
+           || checkResult.getResult(RMNodeType.ALISA)) {
+         throw new MNodeTypeMismatchException(
+             RSchemaUtils.getPathByLevelPath(levelPath), MetadataConstant.ENTITY_MNODE_TYPE);
+       }
+     }
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Deletes every measurement node matching {@code pathPattern}, together with its alias node
+  * and its entry in the tags column family, then repeatedly removes parent nodes that are left
+  * without siblings.
+  *
+  * <p>The whole operation runs under the write side of {@code deleteUpdateLock}.
+  * {@code isPrefixMatch} is not consulted by this implementation.
+  *
+  * @param pathPattern the pattern of timeseries to delete
+  * @param isPrefixMatch unused here
+  * @return the number of timeseries actually deleted and the paths whose deletion failed; when
+  *     the thread is interrupted while waiting for the lock, {@code (0, empty set)}
+  * @throws MetadataException if the lock cannot be acquired in time or the pattern is illegal
+  */
+ @Override
+ public Pair<Integer, Set<String>> deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch)
+     throws MetadataException {
+   // Only unlock in 'finally' when the lock was really obtained; otherwise
+   // IllegalMonitorStateException would replace the timeout/interrupt outcome.
+   boolean writeLocked = false;
+   try {
+     writeLocked =
+         deleteUpdateLock.writeLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+     if (!writeLocked) {
+       throw new AcquireLockTimeoutException(
+           "acquire lock timeout when delete timeseries: " + pathPattern);
+     }
+
+     Set<String> failedNames = ConcurrentHashMap.newKeySet();
+     Set<IMNode> parentNeedsToCheck = ConcurrentHashMap.newKeySet();
+     AtomicInteger deletedCount = new AtomicInteger(0);
+
+     traverseOutcomeBasins(
+         pathPattern.getNodes(),
+         MAX_PATH_DEPTH,
+         (key, value) -> {
+           String path = null;
+           try {
+             path = RSchemaUtils.getPathByInnerName(new String(key));
+             String[] nodes = MetaUtils.splitPathToDetachedPath(path);
+             RMeasurementMNode deletedNode = new RMeasurementMNode(path, value, readWriteHandler);
+             try (WriteBatch batch = new WriteBatch()) {
+               // delete the last node of path
+               batch.delete(key);
+               if (deletedNode.getAlias() != null) {
+                 String[] aliasNodes = Arrays.copyOf(nodes, nodes.length);
+                 aliasNodes[nodes.length - 1] = deletedNode.getAlias();
+                 String aliasLevelPath =
+                     RSchemaUtils.getLevelPath(aliasNodes, aliasNodes.length - 1);
+                 batch.delete(RSchemaUtils.toAliasNodeKey(aliasLevelPath));
+               }
+               if (deletedNode.getTags() != null && !deletedNode.getTags().isEmpty()) {
+                 batch.delete(readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS), key);
+                 // TODO: tags invert index update
+               }
+               readWriteHandler.executeBatch(batch);
+               // Count only after the batch has been applied, so failures are not reported
+               // as deleted.
+               deletedCount.incrementAndGet();
+               if (!deletedNode.getParent().isStorageGroup()) {
+                 parentNeedsToCheck.add(deletedNode.getParent());
+               }
+             }
+           } catch (Exception e) {
+             logger.error("delete timeseries [{}] fail", path, e);
+             // The concurrent key set rejects null; fall back to the raw key text when the
+             // path could not be decoded.
+             failedNames.add(path != null ? path : new String(key));
+             return false;
+           }
+           return true;
+         },
+         new Character[] {NODE_TYPE_MEASUREMENT});
+
+     // delete parent without child after timeseries deleted
+     while (!parentNeedsToCheck.isEmpty()) {
+       Set<IMNode> nextRound = ConcurrentHashMap.newKeySet();
+
+       parentNeedsToCheck
+           .parallelStream()
+           .forEach(
+               currentNode -> {
+                 if (currentNode.isStorageGroup()) {
+                   return;
+                 }
+                 PartialPath parentPath = currentNode.getPartialPath();
+                 int level = parentPath.getNodeLength();
+                 int end = parentPath.getNodeLength() - 1;
+                 if (readWriteHandler.existAnySiblings(
+                     RSchemaUtils.getLevelPathPrefix(parentPath.getNodes(), end, level))) {
+                   return;
+                 }
+                 try {
+                   readWriteHandler.deleteNode(
+                       parentPath.getNodes(), RSchemaUtils.typeOfMNode(currentNode));
+                   IMNode parentNode = currentNode.getParent();
+                   if (!parentNode.isStorageGroup()) {
+                     // The grandparent may now be empty as well; check it next round.
+                     nextRound.add(parentNode);
+                   }
+                 } catch (Exception e) {
+                   logger.warn("delete {} fail.", parentPath.getFullPath(), e);
+                 }
+               });
+       parentNeedsToCheck.clear();
+       parentNeedsToCheck.addAll(nextRound);
+     }
+     return new Pair<>(deletedCount.get(), failedNames);
+   } catch (InterruptedException e) {
+     logger.warn("Acquire lock interrupted", e);
+     Thread.currentThread().interrupt();
+     // Return an empty set rather than null so callers can inspect it safely.
+     return new Pair<>(0, Collections.emptySet());
+   } finally {
+     if (writeLocked) {
+       deleteUpdateLock.writeLock().unlock();
+     }
+   }
+ }
+
+ /**
+  * Rewrites {@code nodes} through {@code RSchemaUtils.replaceMultiWildcardToSingle} and runs
+  * {@link #traverseByPatternPath} on every resulting node array in parallel.
+  *
+  * @param nodes the path pattern split into nodes
+  * @param maxLevel passed through to {@code replaceMultiWildcardToSingle}
+  * @param function callback handed to {@code traverseByPatternPath}
+  * @param nodeTypeArray node types handed to {@code traverseByPatternPath}
+  * @throws IllegalPathException if the pattern cannot be expanded
+  */
+ private void traverseOutcomeBasins(
+     String[] nodes,
+     int maxLevel,
+     BiFunction<byte[], byte[], Boolean> function,
+     Character[] nodeTypeArray)
+     throws IllegalPathException {
+   RSchemaUtils.replaceMultiWildcardToSingle(nodes, maxLevel)
+       .parallelStream()
+       .forEach(expandedNodes -> traverseByPatternPath(expandedNodes, function, nodeTypeArray));
+ }
+
+ /**
+  * Visits every stored key that matches the pattern {@code nodes} and has one of the node types
+  * in {@code nodeTypeArray}, invoking {@code function} with the key and its value.
+  *
+  * <p>When the pattern contains no wildcard, one exact lookup is made per node type. Otherwise
+  * the pattern is processed segment by segment: each round scans by the prefixes found so far,
+  * keeps the keys whose suffix matches the next non-wildcard run, and either reports them (last
+  * round) or uses them as the prefixes of the next round.
+  *
+  * @param nodes the path pattern split into nodes
+  * @param function callback receiving (key, value) for each match; its result is ignored here
+  * @param nodeTypeArray node types to report in the final round
+  */
+ private void traverseByPatternPath(
+     String[] nodes, BiFunction<byte[], byte[], Boolean> function, Character[] nodeTypeArray) {
+
+   int startIndex = 0;
+   List<String[]> scanKeys = new ArrayList<>();
+
+   int indexOfPrefix = indexOfFirstWildcard(nodes, startIndex);
+   if (indexOfPrefix >= nodes.length) {
+     // No wildcard at all: a direct key lookup per node type is enough.
+     Arrays.stream(nodeTypeArray)
+         .parallel()
+         .forEach(
+             x -> {
+               String levelPrefix =
+                   RSchemaUtils.convertPartialPathToInnerByNodes(nodes, nodes.length - 1, x);
+               byte[] exactKey = levelPrefix.getBytes();
+               try {
+                 Holder<byte[]> holder = new Holder<>();
+                 readWriteHandler.keyExist(exactKey, holder);
+                 if (holder.getValue() != null) {
+                   function.apply(exactKey, holder.getValue());
+                 }
+               } catch (RocksDBException e) {
+                 // Keep the stack trace; the message alone rarely identifies the failing key.
+                 logger.error("failed to look up key [{}]", levelPrefix, e);
+               }
+             });
+     return;
+   }
+
+   startIndex = indexOfPrefix;
+   String[] seedPath = ArrayUtils.subarray(nodes, 0, indexOfPrefix);
+   scanKeys.add(seedPath);
+
+   while (!scanKeys.isEmpty()) {
+     int firstNonWildcardIndex = indexOfFirstNonWildcard(nodes, startIndex);
+     int nextFirstWildcardIndex = indexOfFirstWildcard(nodes, firstNonWildcardIndex);
+     startIndex = nextFirstWildcardIndex;
+     int level = nextFirstWildcardIndex - 1;
+
+     boolean lastIteration = nextFirstWildcardIndex >= nodes.length;
+     // Intermediate rounds must follow every node type; only the last round filters.
+     Character[] nodeType = lastIteration ? nodeTypeArray : ALL_NODE_TYPE_ARRAY;
+
+     Queue<String[]> tempNodes = new ConcurrentLinkedQueue<>();
+     byte[] suffixToMatch =
+         RSchemaUtils.getSuffixOfLevelPath(
+             ArrayUtils.subarray(nodes, firstNonWildcardIndex, nextFirstWildcardIndex), level);
+
+     scanKeys
+         .parallelStream()
+         .forEach(
+             prefixNodes -> {
+               String levelPrefix =
+                   RSchemaUtils.getLevelPathPrefix(prefixNodes, prefixNodes.length - 1, level);
+               Arrays.stream(nodeType)
+                   .parallel()
+                   .forEach(
+                       x -> {
+                         byte[] startKey = RSchemaUtils.toRocksDBKey(levelPrefix, x);
+                         // RocksIterator wraps a native handle; close it deterministically
+                         // instead of leaking one iterator per scanned prefix.
+                         try (RocksIterator iterator = readWriteHandler.iterator(null)) {
+                           iterator.seek(startKey);
+                           while (iterator.isValid()) {
+                             if (!RSchemaUtils.prefixMatch(iterator.key(), startKey)) {
+                               break;
+                             }
+                             if (RSchemaUtils.suffixMatch(iterator.key(), suffixToMatch)) {
+                               if (lastIteration) {
+                                 function.apply(iterator.key(), iterator.value());
+                               } else {
+                                 tempNodes.add(RSchemaUtils.toMetaNodes(iterator.key()));
+                               }
+                             }
+                             iterator.next();
+                           }
+                         }
+                       });
+             });
+     scanKeys.clear();
+     scanKeys.addAll(tempNodes);
+     tempNodes.clear();
+   }
+ }
+
+ /**
+  * Returns the index of the first node at or after {@code start} that equals the one-level or
+  * the multi-level path wildcard, or the position where scanning stopped when there is none.
+  */
+ private int indexOfFirstWildcard(String[] nodes, int start) {
+   int cursor = start;
+   while (cursor < nodes.length) {
+     String node = nodes[cursor];
+     boolean isWildcard =
+         ONE_LEVEL_PATH_WILDCARD.equals(node) || MULTI_LEVEL_PATH_WILDCARD.equals(node);
+     if (isWildcard) {
+       return cursor;
+     }
+     cursor++;
+   }
+   return cursor;
+ }
+
+ /**
+  * Returns the index of the first node at or after {@code start} that is neither the one-level
+  * nor the multi-level path wildcard, or the position where scanning stopped when every
+  * remaining node is a wildcard.
+  */
+ private int indexOfFirstNonWildcard(String[] nodes, int start) {
+   int cursor = start;
+   while (cursor < nodes.length) {
+     String node = nodes[cursor];
+     boolean isWildcard =
+         ONE_LEVEL_PATH_WILDCARD.equals(node) || MULTI_LEVEL_PATH_WILDCARD.equals(node);
+     if (!isWildcard) {
+       return cursor;
+     }
+     cursor++;
+   }
+   return cursor;
+ }
+
+ protected Pair<Integer, Set<String>> deleteTimeseries(PartialPath pathPattern)
+ throws MetadataException {
+ return deleteTimeseries(pathPattern, false);
+ }
+
+ /**
+  * Returns the device node at {@code devicePath}, creating the entity (and missing ancestors
+  * down to {@code storageGroupPathLevel}) when it does not exist and auto-creation is enabled
+  * in the configuration.
+  *
+  * <p>The {@code autoCreateSchema} argument is not consulted; the decision is taken from
+  * {@code config.isAutoCreateSchemaEnabled()}.
+  *
+  * @return the device node, or {@code null} if the thread was interrupted while creating it
+  * @throws MetadataException if the node is missing and auto-creation is disabled, or if the
+  *     store fails
+  */
+ private IMNode getDeviceNodeWithAutoCreate(PartialPath devicePath, boolean autoCreateSchema)
+     throws MetadataException {
+   try {
+     return getDeviceNode(devicePath);
+   } catch (PathNotExistException e) {
+     if (!config.isAutoCreateSchemaEnabled()) {
+       throw new PathNotExistException(devicePath.getFullPath());
+     }
+   }
+
+   // The device is missing and auto-creation is allowed: create it, then read it back.
+   try {
+     createEntityRecursively(
+         devicePath.getNodes(), devicePath.getNodeLength(), storageGroupPathLevel, false);
+     return getDeviceNode(devicePath);
+   } catch (RocksDBException ex) {
+     throw new MetadataException(ex);
+   } catch (InterruptedException ex) {
+     logger.warn("Acquire lock interrupted", ex);
+     Thread.currentThread().interrupt();
+     return null;
+   }
+ }
+
+ /**
+  * Not supported by this schema region.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException {
+   // Name the operation so the failure can be traced from the message alone, in line with
+   // showTimeseriesWithIndex.
+   throw new UnsupportedOperationException("temporarily unsupported : autoCreateDeviceMNode");
+ }
+
+ /**
+  * Tells whether {@code path} exists: the root path always does; any other path exists when a
+  * key of any {@code RMNodeType} is stored for its level path.
+  *
+  * @throws MetadataException wrapping any {@code RocksDBException} raised by the lookup
+  */
+ @Override
+ public boolean isPathExist(PartialPath path) throws MetadataException {
+   if (PATH_ROOT.equals(path.getFullPath())) {
+     return true;
+   }
+
+   String innerPathName = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+   try {
+     return readWriteHandler.keyExistByTypes(innerPathName, RMNodeType.values()).existAnyKey();
+   } catch (RocksDBException e) {
+     throw new MetadataException(e);
+   }
+ }
+
+ @Override
+ public int getAllTimeseriesCount(PartialPath pathPattern, boolean isPrefixMatch)
+ throws MetadataException {
+ return getCountByNodeType(new Character[] {NODE_TYPE_MEASUREMENT}, pathPattern.getNodes());
+ }
+
+ /** Test helper: counts matching timeseries with {@code isPrefixMatch} set to {@code false}. */
+ @TestOnly
+ public int getAllTimeseriesCount(PartialPath pathPattern) throws MetadataException {
+   final boolean isPrefixMatch = false;
+   return getAllTimeseriesCount(pathPattern, isPrefixMatch);
+ }
+
+ @Override
+ public int getDevicesNum(PartialPath pathPattern, boolean isPrefixMatch)
+ throws MetadataException {
+ return getCountByNodeType(new Character[] {NODE_TYPE_ENTITY}, pathPattern.getNodes());
+ }
+
+ /**
+  * Traverses the keys matching {@code nodes} for the given node types and returns how many
+  * times the traversal callback was invoked.
+  */
+ private int getCountByNodeType(Character[] nodetype, String[] nodes) throws IllegalPathException {
+   AtomicInteger matched = new AtomicInteger(0);
+   traverseOutcomeBasins(
+       nodes,
+       MAX_PATH_DEPTH,
+       (key, value) -> {
+         // Atomic because the traversal invokes the callback from parallel streams.
+         matched.incrementAndGet();
+         return true;
+       },
+       nodetype);
+   return matched.get();
+ }
+
+ /**
+  * Counts nodes under {@code pathPattern} at {@code level} by issuing one prefix query per node
+  * type and counting the callback invocations (presumably one per matching key).
+  *
+  * <p>Patterns containing the one-level wildcard are rejected. {@code isPrefixMatch} is not
+  * consulted by this implementation.
+  *
+  * @throws MetadataException if the pattern contains a wildcard
+  */
+ @Override
+ public int getNodesCountInGivenLevel(PartialPath pathPattern, int level, boolean isPrefixMatch)
+     throws MetadataException {
+   // todo support wildcard
+   if (pathPattern.getFullPath().contains(ONE_LEVEL_PATH_WILDCARD)) {
+     throw new MetadataException(
+         "Wildcards are not currently supported for this operation"
+             + " [COUNT NODES pathPattern].");
+   }
+   // Everything after the node-type character is the same for all types; build it once.
+   String keySuffix =
+       RSchemaUtils.getLevelPath(pathPattern.getNodes(), pathPattern.getNodeLength() - 1, level)
+           + RSchemaConstants.PATH_SEPARATOR
+           + level;
+   AtomicInteger counter = new AtomicInteger(0);
+   Function<String, Boolean> countEach =
+       ignoredKey -> {
+         counter.incrementAndGet();
+         return true;
+       };
+   Arrays.stream(ALL_NODE_TYPE_ARRAY)
+       .parallel()
+       .forEach(nodeType -> readWriteHandler.getKeyByPrefix(nodeType + keySuffix, countEach));
+
+   return counter.get();
+ }
+
+ /**
+  * Not supported by this schema region.
+  *
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public Map<PartialPath, Integer> getMeasurementCountGroupByLevel(
+     PartialPath pathPattern, int level, boolean isPrefixMatch) throws MetadataException {
+   // Name the operation so the failure can be traced from the message alone, in line with
+   // showTimeseriesWithIndex.
+   throw new UnsupportedOperationException(
+       "temporarily unsupported : getMeasurementCountGroupByLevel");
+ }
+
+ /**
+  * Lists the nodes at {@code nodeLevel} by delegating to the two-argument overload.
+  * {@code isPrefixMatch} and {@code filter} are not consulted by this implementation.
+  */
+ @Override
+ public List<PartialPath> getNodesListInGivenLevel(
+     PartialPath pathPattern, int nodeLevel, boolean isPrefixMatch, StorageGroupFilter filter)
+     throws MetadataException {
+   List<PartialPath> nodesAtLevel = getNodesListInGivenLevel(pathPattern, nodeLevel);
+   return nodesAtLevel;
+ }
+
+ /**
+  * Lists the nodes stored at {@code nodeLevel}.
+  *
+  * <p>Level 0 yields only the root. Levels up to 5 are fetched with a single prefix query; for
+  * deeper levels the parents at {@code nodeLevel - 1} are fetched first and their children are
+  * then queried in parallel. {@code pathPattern} is not consulted (see the TODO below).
+  */
+ public List<PartialPath> getNodesListInGivenLevel(PartialPath pathPattern, int nodeLevel)
+     throws MetadataException {
+   // TODO: ignore pathPattern with *, all nodeLevel are start from "root.*"
+   if (nodeLevel == 0) {
+     List<PartialPath> rootOnly = new ArrayList<>();
+     rootOnly.add(new PartialPath(RSchemaConstants.ROOT));
+     return rootOnly;
+   }
+
+   // TODO: level one usually only contains small numbers, query in serialize
+   boolean singleQuery = nodeLevel <= 5;
+   // Level encoded as a character relative to ZERO; for deep levels start from the parents.
+   char queriedLevel = singleQuery ? (char) (ZERO + nodeLevel) : (char) (ZERO + nodeLevel - 1);
+   StringBuilder prefixBuilder = new StringBuilder();
+   prefixBuilder.append(RSchemaConstants.ROOT);
+   prefixBuilder.append(PATH_SEPARATOR);
+   prefixBuilder.append(queriedLevel);
+   String prefix = prefixBuilder.toString();
+
+   Set<String> paths;
+   if (singleQuery) {
+     paths = readWriteHandler.getAllByPrefix(prefix);
+   } else {
+     Set<String> children = ConcurrentHashMap.newKeySet();
+     readWriteHandler
+         .getAllByPrefix(prefix)
+         .parallelStream()
+         .forEach(
+             parentPath ->
+                 children.addAll(
+                     readWriteHandler.getAllByPrefix(
+                         RSchemaUtils.getNextLevelOfPath(parentPath, queriedLevel))));
+     paths = children;
+   }
+   return RSchemaUtils.convertToPartialPath(paths, nodeLevel);
+ }
+
+ @Override
+ public Set<String> getChildNodePathInNextLevel(PartialPath pathPattern) throws MetadataException {
+ // todo support wildcard
+ if (pathPattern.getFullPath().contains(ONE_LEVEL_PATH_WILDCARD)) {
+ throw new MetadataException(
+ "Wildcards are not currently supported for this operation"
+ + " [SHOW CHILD PATHS pathPattern].");
+ }
+ Set<String> result = Collections.synchronizedSet(new HashSet<>());
+ String innerNameByLevel =
+ RSchemaUtils.getLevelPath(
+ pathPattern.getNodes(),
+ pathPattern.getNodeLength() - 1,
+ pathPattern.getNodeLength())
+ + RSchemaConstants.PATH_SEPARATOR
+ + pathPattern.getNodeLength();
+ Function<String, Boolean> function =
+ s -> {
+ result.add(RSchemaUtils.getPathByInnerName(s));
+ return true;
+ };
+
+ Arrays.stream(ALL_NODE_TYPE_ARRAY)
+ .parallel()
+ .forEach(x -> readWriteHandler.getKeyByPrefix(x + innerNameByLevel, function));
+ return result;
+ }
+
+ @Override
+ public Set<String> getChildNodeNameInNextLevel(PartialPath pathPattern) throws MetadataException {
+ Set<String> childPath = getChildNodePathInNextLevel(pathPattern);
+ Set<String> childName = new HashSet<>();
+ for (String str : childPath) {
+ childName.add(str.substring(str.lastIndexOf(RSchemaConstants.PATH_SEPARATOR) + 1));
+ }
+ return childName;
+ }
+
+ /**
+  * Traverses the entity-typed keys matching {@code timeseries} and, for each one, records the
+  * key text truncated at its last path separator as a {@code PartialPath}.
+  *
+  * <p>NOTE(review): unlike the sibling traversals, the raw key is not converted with
+  * {@code RSchemaUtils.getPathByInnerName}, and entity rather than measurement keys are
+  * traversed - confirm both are intended.
+  *
+  * @param timeseries the timeseries path pattern
+  * @return the collected device paths
+  * @throws MetadataException if the pattern is illegal
+  */
+ @Override
+ public Set<PartialPath> getBelongedDevices(PartialPath timeseries) throws MetadataException {
+   // Synchronized because the traversal invokes the callback from parallel streams.
+   Set<PartialPath> result = Collections.synchronizedSet(new HashSet<>());
+   BiFunction<byte[], byte[], Boolean> function =
+       (a, b) -> {
+         String path = new String(a);
+         PartialPath partialPath;
+         try {
+           partialPath =
+               new PartialPath(path.substring(0, path.lastIndexOf(IoTDBConstant.PATH_SEPARATOR)));
+         } catch (IllegalPathException e) {
+           return false;
+         }
+         result.add(partialPath);
+         return true;
+       };
+   // An array initializer is required here: 'new Character[NODE_TYPE_ENTITY]' would allocate an
+   // array whose length is the char's numeric value, filled with nulls.
+   traverseOutcomeBasins(
+       timeseries.getNodes(), MAX_PATH_DEPTH, function, new Character[] {NODE_TYPE_ENTITY});
+   return result;
+ }
+
+ @Override
+ public Set<PartialPath> getMatchedDevices(PartialPath pathPattern, boolean isPrefixMatch)
+ throws MetadataException {
+ Set<PartialPath> allPath = new HashSet<>();
+ getMatchedPathByNodeType(pathPattern.getNodes(), new Character[] {NODE_TYPE_ENTITY}, allPath);
+ return allPath;
+ }
+
+ /**
+  * Traverses the keys matching {@code nodes} for the given node types and appends the path of
+  * each visited key to {@code collection}.
+  *
+  * @throws IllegalPathException if the pattern or one of the resulting paths is illegal
+  */
+ private void getMatchedPathByNodeType(
+     String[] nodes, Character[] nodetype, Collection<PartialPath> collection)
+     throws IllegalPathException {
+   // Paths are gathered as strings first because PartialPath construction throws a checked
+   // exception that the traversal callback cannot propagate.
+   List<String> matchedPaths = Collections.synchronizedList(new ArrayList<>());
+   traverseOutcomeBasins(
+       nodes,
+       MAX_PATH_DEPTH,
+       (key, value) -> {
+         matchedPaths.add(RSchemaUtils.getPathByInnerName(new String(key)));
+         return true;
+       },
+       nodetype);
+
+   Iterator<String> it = matchedPaths.iterator();
+   while (it.hasNext()) {
+     collection.add(new PartialPath(it.next()));
+   }
+ }
+
+ /**
+  * Builds a {@code ShowDevicesResult} for every entity node matching the plan's path.
+  *
+  * @return the results paired with the constant {@code 1} (paging is not implemented yet)
+  */
+ @Override
+ public Pair<List<ShowDevicesResult>, Integer> getMatchedDevices(ShowDevicesPlan plan)
+     throws MetadataException {
+   List<ShowDevicesResult> devices = Collections.synchronizedList(new ArrayList<>());
+   Character[] entityOnly = {NODE_TYPE_ENTITY};
+   traverseOutcomeBasins(
+       plan.getPath().getNodes(),
+       MAX_PATH_DEPTH,
+       (key, value) -> {
+         String fullPath = RSchemaUtils.getPathByInnerName(new String(key));
+         boolean aligned = RSchemaUtils.isAligned(value);
+         devices.add(new ShowDevicesResult(fullPath, aligned, storageGroupFullPath));
+         return true;
+       },
+       entityOnly);
+
+   // todo Page query, record offset
+   return new Pair<>(devices, 1);
+ }
+
+ /**
+  * Returns the {@code MeasurementPath} of every measurement node matching {@code pathPattern}.
+  * {@code isPrefixMath} is not consulted by this implementation.
+  */
+ @Override
+ public List<MeasurementPath> getMeasurementPaths(PartialPath pathPattern, boolean isPrefixMath)
+     throws MetadataException {
+   List<MeasurementPath> collected = Collections.synchronizedList(new ArrayList<>());
+   Character[] measurementOnly = {NODE_TYPE_MEASUREMENT};
+   traverseOutcomeBasins(
+       pathPattern.getNodes(),
+       MAX_PATH_DEPTH,
+       (key, value) -> {
+         String fullPath = RSchemaUtils.getPathByInnerName(new String(key));
+         RMeasurementMNode node = new RMeasurementMNode(fullPath, value, readWriteHandler);
+         collected.add(node.getMeasurementPath());
+         return true;
+       },
+       measurementOnly);
+
+   return collected;
+ }
+
+ /**
+  * Returns all measurement paths matching {@code pathPattern} paired with
+  * {@code offset + limit}. Paging is not implemented: {@code limit} and {@code offset} do not
+  * restrict the list, and {@code isPrefixMatch} is not consulted.
+  */
+ @Override
+ public Pair<List<MeasurementPath>, Integer> getMeasurementPathsWithAlias(
+     PartialPath pathPattern, int limit, int offset, boolean isPrefixMatch)
+     throws MetadataException {
+   // todo page query
+   List<MeasurementPath> allMatches = getMeasurementPaths(pathPattern, false);
+   int nextOffset = offset + limit;
+   return new Pair<>(allMatches, nextOffset);
+ }
+
+ @Override
+ public Pair<List<ShowTimeSeriesResult>, Integer> showTimeseries(
+ ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
+ if (plan.getKey() != null && plan.getValue() != null) {
+ return showTimeseriesWithIndex(plan, context);
+ } else {
+ return showTimeseriesWithoutIndex(plan, context);
+ }
+ }
+
+ private Pair<List<ShowTimeSeriesResult>, Integer> showTimeseriesWithIndex(
+ ShowTimeSeriesPlan plan, QueryContext context) {
+ // temporarily unsupported
+ throw new UnsupportedOperationException("temporarily unsupported : showTimeseriesWithIndex");
+ }
+
+ /**
+  * Builds a {@code ShowTimeSeriesResult} for every measurement matching the plan's path, using
+  * the tags and attributes parsed from each node value.
+  *
+  * @return the results paired with the constant {@code 1} (paging is not implemented yet)
+  */
+ private Pair<List<ShowTimeSeriesResult>, Integer> showTimeseriesWithoutIndex(
+     ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
+
+   Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> matched =
+       getMatchedMeasurementPathWithTags(plan.getPath().getNodes());
+   List<ShowTimeSeriesResult> rows = new LinkedList<>();
+   for (Entry<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> entry :
+       matched.entrySet()) {
+     MeasurementPath measurementPath = entry.getKey();
+     // left = tags, right = attributes (as assembled by getMatchedMeasurementPathWithTags)
+     Pair<Map<String, String>, Map<String, String>> tagsAndAttributes = entry.getValue();
+     ShowTimeSeriesResult row =
+         new ShowTimeSeriesResult(
+             measurementPath.getFullPath(),
+             measurementPath.getMeasurementAlias(),
+             storageGroupFullPath,
+             measurementPath.getMeasurementSchema().getType(),
+             measurementPath.getMeasurementSchema().getEncodingType(),
+             measurementPath.getMeasurementSchema().getCompressor(),
+             0,
+             tagsAndAttributes.left,
+             tagsAndAttributes.right);
+     rows.add(row);
+   }
+   // todo Page query, record offset
+   return new Pair<>(rows, 1);
+ }
+
+ /**
+  * Traverses the measurement nodes matching {@code nodes} and maps each measurement path to a
+  * pair of (tags, attributes) parsed from the node value. A parsed value that is not a
+  * {@code Map} is replaced by an empty map.
+  */
+ private Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>>
+     getMatchedMeasurementPathWithTags(String[] nodes) throws IllegalPathException {
+   Map<MeasurementPath, Pair<Map<String, String>, Map<String, String>>> matched =
+       new ConcurrentHashMap<>();
+   BiFunction<byte[], byte[], Boolean> collect =
+       (key, value) -> {
+         String fullPath = RSchemaUtils.getPathByInnerName(new String(key));
+         MeasurementPath measurementPath =
+             new RMeasurementMNode(fullPath, value, readWriteHandler).getMeasurementPath();
+
+         Object parsedTags = RSchemaUtils.parseNodeValue(value, RMNodeValueType.TAGS);
+         Object tags = parsedTags instanceof Map ? parsedTags : Collections.emptyMap();
+
+         Object parsedAttributes = RSchemaUtils.parseNodeValue(value, RMNodeValueType.ATTRIBUTES);
+         Object attributes =
+             parsedAttributes instanceof Map ? parsedAttributes : Collections.emptyMap();
+
+         @SuppressWarnings("unchecked")
+         Pair<Map<String, String>, Map<String, String>> tagsAndAttributes =
+             new Pair<>((Map<String, String>) tags, (Map<String, String>) attributes);
+         matched.put(measurementPath, tagsAndAttributes);
+         return true;
+       };
+   Character[] measurementOnly = {NODE_TYPE_MEASUREMENT};
+   traverseOutcomeBasins(nodes, MAX_PATH_DEPTH, collect, measurementOnly);
+
+   return matched;
+ }
+
+ /**
+  * Returns a {@code MeasurementPath} for every measurement key stored one level below
+  * {@code devicePath}, with the schema parsed from the stored value.
+  *
+  * @throws PathNotExistException if a stored key cannot be turned into a {@code PartialPath}
+  */
+ @Override
+ public List<MeasurementPath> getAllMeasurementByDevicePath(PartialPath devicePath)
+     throws PathNotExistException {
+   String childKeyPrefix =
+       RSchemaUtils.convertPartialPathToInner(
+           devicePath.getFullPath(), devicePath.getNodeLength() + 1, NODE_TYPE_MEASUREMENT);
+   Map<byte[], byte[]> storedMeasurements = readWriteHandler.getKeyValueByPrefix(childKeyPrefix);
+
+   List<MeasurementPath> measurementPaths = new ArrayList<>();
+   for (Map.Entry<byte[], byte[]> stored : storedMeasurements.entrySet()) {
+     PartialPath pathName;
+     try {
+       pathName = new PartialPath(new String(stored.getKey()));
+     } catch (IllegalPathException e) {
+       throw new PathNotExistException(e.getMessage());
+     }
+     Object parsedSchema = RSchemaUtils.parseNodeValue(stored.getValue(), RMNodeValueType.SCHEMA);
+     measurementPaths.add(new MeasurementPath(pathName, (MeasurementSchema) parsedSchema));
+   }
+   return measurementPaths;
+ }
+
+ /**
+  * Loads the entity node stored for {@code path}.
+  *
+  * @throws MetadataException {@code PathNotExistException} when no entity key exists for the
+  *     path, or a wrapper around any {@code RocksDBException}
+  */
+ @Override
+ public IMNode getDeviceNode(PartialPath path) throws MetadataException {
+   String[] nodes = path.getNodes();
+   String levelPath = RSchemaUtils.getLevelPath(nodes, nodes.length - 1);
+   Holder<byte[]> entityValue = new Holder<>();
+   boolean found;
+   try {
+     found = readWriteHandler.keyExistByType(levelPath, RMNodeType.ENTITY, entityValue);
+   } catch (RocksDBException e) {
+     throw new MetadataException(e);
+   }
+   if (!found) {
+     throw new PathNotExistException(path.getFullPath());
+   }
+   return new REntityMNode(path.getFullPath(), entityValue.getValue(), readWriteHandler);
+ }
+
+ /**
+  * Resolves each name in {@code measurements} under {@code deviceId} to its measurement node.
+  *
+  * <p>A measurement that cannot be resolved leaves a {@code null} slot when partial insert is
+  * enabled in the configuration; otherwise the method fails.
+  *
+  * @return an array parallel to {@code measurements}
+  * @throws MetadataException if a measurement is missing and partial insert is disabled
+  */
+ @Override
+ public IMeasurementMNode[] getMeasurementMNodes(PartialPath deviceId, String[] measurements)
+     throws MetadataException {
+   IMeasurementMNode[] resolved = new IMeasurementMNode[measurements.length];
+   for (int i = 0; i < resolved.length; i++) {
+     String measurement = measurements[i];
+     IMeasurementMNode node = null;
+     try {
+       node = getMeasurementMNode(deviceId.concatNode(measurement));
+     } catch (PathNotExistException | MNodeTypeMismatchException ignored) {
+       logger.warn("MeasurementMNode {} does not exist in {}", measurement, deviceId);
+     }
+     resolved[i] = node;
+     if (node == null && !IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
+       throw new MetadataException(measurement + " does not exist in " + deviceId);
+     }
+   }
+   return resolved;
+ }
+
+ /**
+  * Loads the measurement node for {@code fullPath}. The path is first looked up as a
+  * measurement key; failing that, as an alias key whose value carries the key of the original
+  * measurement.
+  *
+  * @return the node, or {@code null} when neither key exists or the alias value is {@code null}
+  * @throws MetadataException wrapping any {@code RocksDBException}
+  */
+ @Override
+ public IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws MetadataException {
+   String[] nodes = fullPath.getNodes();
+   String key = RSchemaUtils.getLevelPath(nodes, nodes.length - 1);
+   try {
+     Holder<byte[]> holder = new Holder<>();
+     if (readWriteHandler.keyExistByType(key, RMNodeType.MEASUREMENT, holder)) {
+       return new RMeasurementMNode(fullPath.getFullPath(), holder.getValue(), readWriteHandler);
+     }
+     if (!readWriteHandler.keyExistByType(key, RMNodeType.ALISA, holder)) {
+       return null;
+     }
+     byte[] aliasValue = holder.getValue();
+     if (aliasValue == null) {
+       return null;
+     }
+     ByteBuffer aliasBuffer = ByteBuffer.wrap(aliasValue);
+     // Skip the first three bytes of the alias value before the origin key.
+     ReadWriteIOUtils.readBytes(aliasBuffer, 3);
+     byte[] originKey = RSchemaUtils.readOriginKey(aliasBuffer);
+     return new RMeasurementMNode(
+         fullPath.getFullPath(), readWriteHandler.get(null, originKey), readWriteHandler);
+   } catch (RocksDBException e) {
+     throw new MetadataException(e);
+   }
+ }
+
+ /**
+  * Changes the alias of the timeseries at {@code path} by delegating to
+  * {@link #upsertTagsAndAttributes} with no tags and no attributes.
+  */
+ @Override
+ public void changeAlias(PartialPath path, String alias) throws MetadataException, IOException {
+   final Map<String, String> noTags = null;
+   final Map<String, String> noAttributes = null;
+   upsertTagsAndAttributes(alias, noTags, noAttributes, path);
+ }
+
+ /**
+  * Updates the alias, tags and attributes of the timeseries at {@code path}.
+  *
+  * <p>A non-empty {@code alias} that differs from the current one replaces it: the new alias
+  * key is written and the old alias key (if any) deleted in one batch. Non-empty tag and
+  * attribute maps are merged into the node. The measurement value is rewritten when anything
+  * changed.
+  *
+  * <p>Locks are taken in this order: read side of {@code deleteUpdateLock}, the per-path lock
+  * of the measurement, then the locks of the new and old alias paths. Each lock is released
+  * only if it was actually acquired.
+  *
+  * @param alias the new alias, or {@code null}/empty to leave the alias untouched
+  * @param tagsMap tags to upsert, may be {@code null}
+  * @param attributesMap attributes to upsert, may be {@code null}
+  * @param path the timeseries path
+  * @throws MetadataException on lock timeout, when the timeseries does not exist, when the new
+  *     alias collides with an existing node, or when the store fails
+  * @throws IOException declared by the interface
+  */
+ @Override
+ public void upsertTagsAndAttributes(
+     String alias,
+     Map<String, String> tagsMap,
+     Map<String, String> attributesMap,
+     PartialPath path)
+     throws MetadataException, IOException {
+   // Unlocking a lock that was never obtained throws IllegalMonitorStateException and hides
+   // the real failure, so every acquisition below is tracked explicitly.
+   boolean readLocked = false;
+   try {
+     readLocked = deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+     if (!readLocked) {
+       throw new AcquireLockTimeoutException(
+           "acquire lock timeout when do upsertTagsAndAttributes: " + path.getFullPath());
+     }
+     String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+     byte[] originKey = RSchemaUtils.toMeasurementNodeKey(levelPath);
+     Lock rawKeyLock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+     if (!rawKeyLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+       throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+     }
+     try {
+       boolean hasUpdate = false;
+       String[] nodes = path.getNodes();
+       RMeasurementMNode mNode = (RMeasurementMNode) getMeasurementMNode(path);
+       if (mNode == null) {
+         // getMeasurementMNode returns null for an unknown path; fail clearly instead of
+         // with a NullPointerException further down.
+         throw new PathNotExistException(path.getFullPath());
+       }
+
+       // upsert alias
+       if (StringUtils.isNotEmpty(alias) && !alias.equals(mNode.getAlias())) {
+         String oldAliasStr = mNode.getAlias();
+         mNode.setAlias(alias);
+         hasUpdate = true;
+         String[] newAlias = Arrays.copyOf(nodes, nodes.length);
+         newAlias[nodes.length - 1] = alias;
+         String newAliasLevel = RSchemaUtils.getLevelPath(newAlias, newAlias.length - 1);
+         byte[] newAliasKey = RSchemaUtils.toAliasNodeKey(newAliasLevel);
+         Lock newAliasLock = locksPool.computeIfAbsent(newAliasLevel, x -> new ReentrantLock());
+         Lock oldAliasLock = null;
+         boolean newAliasLocked = false;
+         boolean oldAliasLocked = false;
+         try (WriteBatch batch = new WriteBatch()) {
+           newAliasLocked = newAliasLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+           if (!newAliasLocked) {
+             throw new AcquireLockTimeoutException("acquire lock timeout: " + newAliasLevel);
+           }
+           if (readWriteHandler.keyExistByAllTypes(newAliasLevel).existAnyKey()) {
+             throw new PathAlreadyExistException("Alias node has exist: " + newAliasLevel);
+           }
+           batch.put(newAliasKey, RSchemaUtils.buildAliasNodeValue(originKey));
+           if (StringUtils.isNotEmpty(oldAliasStr)) {
+             String[] oldAliasNodes = Arrays.copyOf(nodes, nodes.length);
+             oldAliasNodes[nodes.length - 1] = oldAliasStr;
+             String oldAliasLevel =
+                 RSchemaUtils.getLevelPath(oldAliasNodes, oldAliasNodes.length - 1);
+             byte[] oldAliasKey = RSchemaUtils.toAliasNodeKey(oldAliasLevel);
+             oldAliasLock = locksPool.computeIfAbsent(oldAliasLevel, x -> new ReentrantLock());
+             oldAliasLocked = oldAliasLock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+             if (!oldAliasLocked) {
+               throw new AcquireLockTimeoutException("acquire lock timeout: " + oldAliasLevel);
+             }
+             if (!readWriteHandler.keyExist(oldAliasKey)) {
+               logger.error(
+                   "origin node [{}] has alias but alias node [{}] doesn't exist ",
+                   levelPath,
+                   oldAliasLevel);
+             }
+             batch.delete(oldAliasKey);
+           }
+           // TODO: need application lock
+           readWriteHandler.executeBatch(batch);
+         } finally {
+           if (oldAliasLocked) {
+             oldAliasLock.unlock();
+           }
+           if (newAliasLocked) {
+             newAliasLock.unlock();
+           }
+         }
+       }
+
+       try (WriteBatch batch = new WriteBatch()) {
+         if (tagsMap != null && !tagsMap.isEmpty()) {
+           if (mNode.getTags() == null) {
+             mNode.setTags(tagsMap);
+           } else {
+             mNode.getTags().putAll(tagsMap);
+           }
+           // Mark the measurement in the tags column family.
+           batch.put(
+               readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS),
+               originKey,
+               DEFAULT_NODE_VALUE);
+           hasUpdate = true;
+         }
+         if (attributesMap != null && !attributesMap.isEmpty()) {
+           if (mNode.getAttributes() == null) {
+             mNode.setAttributes(attributesMap);
+           } else {
+             mNode.getAttributes().putAll(attributesMap);
+           }
+           hasUpdate = true;
+         }
+         if (hasUpdate) {
+           batch.put(originKey, mNode.getRocksDBValue());
+           readWriteHandler.executeBatch(batch);
+         }
+       }
+     } finally {
+       rawKeyLock.unlock();
+     }
+   } catch (RocksDBException e) {
+     throw new MetadataException(e);
+   } catch (InterruptedException e) {
+     logger.warn("Acquire lock interrupted", e);
+     Thread.currentThread().interrupt();
+   } finally {
+     if (readLocked) {
+       deleteUpdateLock.readLock().unlock();
+     }
+   }
+ }
+
+ /**
+  * Adds new attributes to the timeseries at {@code path}. Fails if any of the given attribute
+  * names is already present on the node.
+  *
+  * <p>As in the original implementation, the node's existing attributes are merged into
+  * {@code attributesMap}, which is then stored on the node - the caller's map is modified.
+  *
+  * @param attributesMap attributes to add; a {@code null} or empty map is a no-op
+  * @param path the timeseries path
+  * @throws MetadataException on lock timeout, unknown path, duplicate attribute or store failure
+  * @throws IOException declared by the interface
+  */
+ @Override
+ public void addAttributes(Map<String, String> attributesMap, PartialPath path)
+     throws MetadataException, IOException {
+   if (attributesMap == null || attributesMap.isEmpty()) {
+     return;
+   }
+
+   // Release locks only when they were actually acquired; unlocking an unheld lock throws
+   // IllegalMonitorStateException and would mask the timeout/interrupt outcome.
+   boolean readLocked = false;
+   try {
+     readLocked = deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+     if (!readLocked) {
+       throw new AcquireLockTimeoutException(
+           "acquire lock timeout when do addAttributes: " + path.getFullPath());
+     }
+     String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+     byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
+     Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+     if (!lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+       throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+     }
+     try {
+       Holder<byte[]> holder = new Holder<>();
+       if (!readWriteHandler.keyExist(key, holder)) {
+         throw new PathNotExistException(path.getFullPath());
+       }
+
+       RMeasurementMNode mNode =
+           new RMeasurementMNode(path.getFullPath(), holder.getValue(), readWriteHandler);
+       Map<String, String> existing = mNode.getAttributes();
+       if (existing != null) {
+         for (String attributeName : attributesMap.keySet()) {
+           if (existing.containsKey(attributeName)) {
+             // Report the conflicting attribute name, not the RocksDB key of the node.
+             throw new MetadataException(
+                 String.format(
+                     "TimeSeries [%s] already has the attribute [%s].", path, attributeName));
+           }
+         }
+         attributesMap.putAll(existing);
+       }
+       mNode.setAttributes(attributesMap);
+       readWriteHandler.updateNode(key, mNode.getRocksDBValue());
+     } finally {
+       lock.unlock();
+     }
+   } catch (RocksDBException e) {
+     throw new MetadataException(e);
+   } catch (InterruptedException e) {
+     logger.warn("Acquire lock interrupted", e);
+     Thread.currentThread().interrupt();
+   } finally {
+     if (readLocked) {
+       deleteUpdateLock.readLock().unlock();
+     }
+   }
+ }
+
+ @Override
+ public void addTags(Map<String, String> tagsMap, PartialPath path)
+ throws MetadataException, IOException {
+ if (tagsMap == null || tagsMap.isEmpty()) {
+ return;
+ }
+
+ try {
+ if (deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+ String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+ byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
+ Holder<byte[]> holder = new Holder<>();
+ try {
+ Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+ if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+ try {
+ if (!readWriteHandler.keyExist(key, holder)) {
+ throw new PathNotExistException(path.getFullPath());
+ }
+ byte[] originValue = holder.getValue();
+ RMeasurementMNode mNode =
+ new RMeasurementMNode(path.getFullPath(), originValue, readWriteHandler);
+ boolean hasTags = false;
+ if (mNode.getTags() != null && mNode.getTags().size() > 0) {
+ hasTags = true;
+ for (Map.Entry<String, String> entry : tagsMap.entrySet()) {
+ if (mNode.getTags().containsKey(entry.getKey())) {
+ throw new MetadataException(
+ String.format(
+ "TimeSeries [%s] already has the tag [%s].", path, new String(key)));
+ }
+ tagsMap.putAll(mNode.getTags());
+ }
+ }
+ mNode.setTags(tagsMap);
+ try (WriteBatch batch = new WriteBatch()) {
+ if (!hasTags) {
+ batch.put(
+ readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS),
+ key,
+ DEFAULT_NODE_VALUE);
+ }
+ batch.put(key, mNode.getRocksDBValue());
+ // TODO: need application lock
+ readWriteHandler.executeBatch(batch);
+ }
+ } finally {
+ lock.unlock();
+ }
+ } else {
+ lock.unlock();
+ throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+ }
+ } catch (RocksDBException e) {
+ throw new MetadataException(e);
+ }
+ } else {
+ throw new AcquireLockTimeoutException(
+ "acquire lock timeout when do addTags: " + path.getFullPath());
+ }
+ } catch (InterruptedException e) {
+ logger.warn("Acquire lock interrupted", e);
+ Thread.currentThread().interrupt();
+ } finally {
+ deleteUpdateLock.readLock().unlock();
+ }
+ }
+
+ /**
+  * Removes the given names from the tags and attributes of the timeseries at {@code path}.
+  *
+  * <p>Names that are neither a tag nor an attribute are logged and skipped. The node is
+  * rewritten if at least one name was removed; when the last tag disappears, the node's entry
+  * in the tags column family is deleted in the same batch.
+  *
+  * @param keySet names to drop; a {@code null} or empty set is a no-op
+  * @param path the timeseries path
+  * @throws MetadataException on lock timeout, unknown path or store failure
+  * @throws IOException declared by the interface
+  */
+ @Override
+ public void dropTagsOrAttributes(Set<String> keySet, PartialPath path)
+     throws MetadataException, IOException {
+   if (keySet == null || keySet.isEmpty()) {
+     return;
+   }
+   // Release locks only when they were actually acquired; unlocking an unheld lock throws
+   // IllegalMonitorStateException and would mask the timeout/interrupt outcome.
+   boolean readLocked = false;
+   try {
+     readLocked = deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+     if (!readLocked) {
+       throw new AcquireLockTimeoutException(
+           "acquire lock timeout when do dropTagsOrAttributes: " + path.getFullPath());
+     }
+     String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+     byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
+     Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+     if (!lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+       throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+     }
+     try {
+       Holder<byte[]> holder = new Holder<>();
+       if (!readWriteHandler.keyExist(key, holder)) {
+         throw new PathNotExistException(path.getFullPath());
+       }
+
+       RMeasurementMNode mNode =
+           new RMeasurementMNode(path.getFullPath(), holder.getValue(), readWriteHandler);
+       int tagLen = mNode.getTags() == null ? 0 : mNode.getTags().size();
+
+       // Accumulated over all names: a miss on a later name must not discard removals made
+       // for earlier ones.
+       boolean didAnyUpdate = false;
+       for (String toDelete : keySet) {
+         boolean removed = false;
+         if (mNode.getTags() != null && mNode.getTags().containsKey(toDelete)) {
+           mNode.getTags().remove(toDelete);
+           removed = true;
+         }
+         if (mNode.getAttributes() != null && mNode.getAttributes().containsKey(toDelete)) {
+           mNode.getAttributes().remove(toDelete);
+           removed = true;
+         }
+         if (removed) {
+           didAnyUpdate = true;
+         } else {
+           logger.warn("TimeSeries [{}] does not have tag/attribute [{}]", path, toDelete);
+         }
+       }
+       if (didAnyUpdate) {
+         try (WriteBatch batch = new WriteBatch()) {
+           if (tagLen > 0 && mNode.getTags().size() <= 0) {
+             // The node no longer has any tag; drop its marker from the tags column family.
+             batch.delete(readWriteHandler.getColumnFamilyHandleByName(TABLE_NAME_TAGS), key);
+           }
+           batch.put(key, mNode.getRocksDBValue());
+           readWriteHandler.executeBatch(batch);
+         }
+       }
+     } finally {
+       lock.unlock();
+     }
+   } catch (RocksDBException e) {
+     throw new MetadataException(e);
+   } catch (InterruptedException e) {
+     logger.warn("Acquire lock interrupted", e);
+     Thread.currentThread().interrupt();
+   } finally {
+     if (readLocked) {
+       deleteUpdateLock.readLock().unlock();
+     }
+   }
+ }
+
+  /**
+   * Overwrites the values of existing tags/attributes of the timeseries at {@code path}.
+   *
+   * <p>Every key of {@code alterMap} must already exist as a tag or an attribute; otherwise a
+   * {@link MetadataException} naming the missing key is thrown and nothing is persisted.
+   *
+   * @param alterMap key/value pairs to apply; {@code null} or empty is a no-op
+   * @param path full path of the timeseries
+   * @throws MetadataException if the timeseries or one of the keys does not exist, a lock cannot
+   *     be acquired within {@code MAX_LOCK_WAIT_TIME} milliseconds, or RocksDB reports an error
+   * @throws IOException declared by the interface
+   */
+  @Override
+  public void setTagsOrAttributesValue(Map<String, String> alterMap, PartialPath path)
+      throws MetadataException, IOException {
+    if (alterMap == null || alterMap.isEmpty()) {
+      return;
+    }
+
+    // Track acquisition so the finally block never unlocks a lock this thread does not hold.
+    boolean readLocked = false;
+    try {
+      readLocked = deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+      if (!readLocked) {
+        throw new AcquireLockTimeoutException(
+            "acquire lock timeout when do setTagsOrAttributesValue: " + path.getFullPath());
+      }
+      String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+      byte[] key = RSchemaUtils.toMeasurementNodeKey(levelPath);
+      Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+      // A failed tryLock means the lock is NOT held, so it must not be unlocked here.
+      if (!lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+        throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+      }
+      try {
+        Holder<byte[]> holder = new Holder<>();
+        if (!readWriteHandler.keyExist(key, holder)) {
+          throw new PathNotExistException(path.getFullPath());
+        }
+        RMeasurementMNode mNode =
+            new RMeasurementMNode(path.getFullPath(), holder.getValue(), readWriteHandler);
+        Map<String, String> tags = mNode.getTags();
+        Map<String, String> attributes = mNode.getAttributes();
+
+        for (Map.Entry<String, String> entry : alterMap.entrySet()) {
+          String name = entry.getKey();
+          boolean updated = false;
+          if (tags != null && tags.containsKey(name)) {
+            tags.put(name, entry.getValue());
+            updated = true;
+          }
+          if (attributes != null && attributes.containsKey(name)) {
+            attributes.put(name, entry.getValue());
+            updated = true;
+          }
+          if (!updated) {
+            // Report the tag/attribute name that is missing, not the RocksDB node key.
+            throw new MetadataException(
+                String.format("TimeSeries [%s] does not have tag/attribute [%s].", path, name),
+                true);
+          }
+        }
+        // alterMap is non-empty and every entry matched, so there is always something to write.
+        readWriteHandler.updateNode(key, mNode.getRocksDBValue());
+      } catch (RocksDBException e) {
+        throw new MetadataException(e);
+      } finally {
+        lock.unlock();
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      if (readLocked) {
+        deleteUpdateLock.readLock().unlock();
+      }
+    }
+  }
+
+  /**
+   * Renames a tag or attribute key of the timeseries at {@code path}, keeping its value.
+   *
+   * <p>If {@code oldKey} exists both as a tag and as an attribute, both entries are renamed.
+   *
+   * @param oldKey existing key; empty or {@code null} is a no-op
+   * @param newKey replacement key; empty or {@code null} is a no-op
+   * @param path full path of the timeseries
+   * @throws MetadataException if the timeseries does not exist, {@code oldKey} is neither a tag
+   *     nor an attribute, {@code newKey} is already in use, a lock cannot be acquired within
+   *     {@code MAX_LOCK_WAIT_TIME} milliseconds, or RocksDB reports an error
+   * @throws IOException declared by the interface
+   */
+  @Override
+  public void renameTagOrAttributeKey(String oldKey, String newKey, PartialPath path)
+      throws MetadataException, IOException {
+    if (StringUtils.isEmpty(oldKey) || StringUtils.isEmpty(newKey)) {
+      return;
+    }
+
+    // Track acquisition so the finally block never unlocks a lock this thread does not hold.
+    boolean readLocked = false;
+    try {
+      readLocked = deleteUpdateLock.readLock().tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS);
+      if (!readLocked) {
+        throw new AcquireLockTimeoutException(
+            "acquire lock timeout when do renameTagOrAttributeKey: " + path.getFullPath());
+      }
+      String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+      byte[] nodeKey = RSchemaUtils.toMeasurementNodeKey(levelPath);
+      Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+      // A failed tryLock means the lock is NOT held, so it must not be unlocked here.
+      if (!lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+        throw new AcquireLockTimeoutException("acquire lock timeout: " + levelPath);
+      }
+      try {
+        Holder<byte[]> holder = new Holder<>();
+        if (!readWriteHandler.keyExist(nodeKey, holder)) {
+          throw new PathNotExistException(path.getFullPath());
+        }
+        RMeasurementMNode mNode =
+            new RMeasurementMNode(path.getFullPath(), holder.getValue(), readWriteHandler);
+        Map<String, String> tags = mNode.getTags();
+        Map<String, String> attributes = mNode.getAttributes();
+
+        // Refuse to clobber a different, already existing key; renaming a key to itself stays
+        // allowed and behaves as before.
+        boolean newKeyInUse =
+            (tags != null && tags.containsKey(newKey))
+                || (attributes != null && attributes.containsKey(newKey));
+        if (!oldKey.equals(newKey) && newKeyInUse) {
+          throw new MetadataException(
+              String.format(
+                  "TimeSeries [%s] already has a tag/attribute named [%s].", path, newKey),
+              true);
+        }
+
+        boolean didAnyUpdate = false;
+        if (tags != null && tags.containsKey(oldKey)) {
+          tags.put(newKey, tags.remove(oldKey));
+          didAnyUpdate = true;
+        }
+        if (attributes != null && attributes.containsKey(oldKey)) {
+          attributes.put(newKey, attributes.remove(oldKey));
+          didAnyUpdate = true;
+        }
+
+        if (!didAnyUpdate) {
+          throw new MetadataException(
+              String.format("TimeSeries [%s] does not have tag/attribute [%s].", path, oldKey),
+              true);
+        }
+        readWriteHandler.updateNode(nodeKey, mNode.getRocksDBValue());
+      } catch (RocksDBException e) {
+        throw new MetadataException(e);
+      } finally {
+        lock.unlock();
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Acquire lock interrupted", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      if (readLocked) {
+        deleteUpdateLock.readLock().unlock();
+      }
+    }
+  }
+
+  /**
+   * Not supported by this schema region implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void collectMeasurementSchema(
+      PartialPath prefixPath, List<IMeasurementSchema> measurementSchemas) {
+    throw new UnsupportedOperationException(
+        "collectMeasurementSchema is not supported by RSchemaRegion");
+  }
+
+  /**
+   * Not supported by this schema region implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void collectTimeseriesSchema(
+      PartialPath prefixPath, Collection<TimeseriesSchema> timeseriesSchemas) {
+    throw new UnsupportedOperationException(
+        "collectTimeseriesSchema is not supported by RSchemaRegion");
+  }
+
+ /**
+ * Resolves the measurement nodes referenced by an insert plan, creating missing timeseries when
+ * auto-creation is enabled, and fills the plan's measurement node array.
+ *
+ * <p>Steps, in order: obtain (or auto-create) the device node, verify that the plan's alignment
+ * matches the device, look up every measurement, create the missing ones, then check that each
+ * inserted data type matches the stored schema. With partial insert enabled, a failing
+ * measurement is marked as failed on the plan instead of aborting the whole call.
+ *
+ * <p>NOTE(review): despite the method name, no device lock is taken in this method body.
+ *
+ * @param plan insert plan providing the device path, measurements, data types and alignment
+ * @return the device node returned by {@code getDeviceNodeWithAutoCreate}
+ * @throws MetadataException if the device node cannot be obtained, the alignment does not match,
+ *     a measurement is missing while auto-creation is disabled, the plan type is unsupported for
+ *     auto-creation, or a check fails while partial insert is disabled
+ * @throws IOException declared by the interface
+ */
+ @Override
+ public IMNode getSeriesSchemasAndReadLockDevice(InsertPlan plan)
+ throws MetadataException, IOException {
+ // devicePath is a logical path which is parent of measurement, whether in template or not
+ PartialPath devicePath = plan.getDevicePath();
+ String[] measurementList = plan.getMeasurements();
+ IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
+
+ IMNode deviceMNode = getDeviceNodeWithAutoCreate(devicePath, plan.isAligned());
+
+ if (deviceMNode == null) {
+ throw new MetadataException(
+ String.format("Failed to create deviceMNode,device path:[%s]", plan.getDevicePath()));
+ }
+ // check insert non-aligned InsertPlan for aligned timeseries
+ if (deviceMNode.isEntity()) {
+ if (plan.isAligned() && !deviceMNode.getAsEntityMNode().isAligned()) {
+ throw new MetadataException(
+ String.format(
+ "Timeseries under path [%s] is not aligned , please set"
+ + " InsertPlan.isAligned() = false",
+ plan.getDevicePath()));
+ }
+
+ if (!plan.isAligned() && deviceMNode.getAsEntityMNode().isAligned()) {
+ throw new MetadataException(
+ String.format(
+ "Timeseries under path [%s] is aligned , please set"
+ + " InsertPlan.isAligned() = true",
+ plan.getDevicePath()));
+ }
+ }
+
+ // get node for each measurement
+ // both maps are keyed by the measurement's index in measurementList
+ Map<Integer, IMeasurementMNode> nodeMap = new HashMap<>();
+ Map<Integer, PartialPath> missingNodeIndex = new HashMap<>();
+ for (int i = 0; i < measurementList.length; i++) {
+ PartialPath path = new PartialPath(devicePath.getFullPath(), measurementList[i]);
+ IMeasurementMNode node = getMeasurementMNode(path);
+ if (node == null) {
+ if (!config.isAutoCreateSchemaEnabled()) {
+ throw new PathNotExistException(path.getFullPath());
+ }
+ missingNodeIndex.put(i, path);
+ } else {
+ nodeMap.put(i, node);
+ }
+ }
+
+ // create missing nodes
+ if (!missingNodeIndex.isEmpty()) {
+ if (!(plan instanceof InsertRowPlan) && !(plan instanceof InsertTabletPlan)) {
+ throw new MetadataException(
+ String.format(
+ "Only support insertRow and insertTablet, plan is [%s]", plan.getOperatorType()));
+ }
+
+ if (plan.isAligned()) {
+ // aligned: all missing measurements are created with a single call
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ List<TSEncoding> encodings = new ArrayList<>();
+ for (Integer index : missingNodeIndex.keySet()) {
+ measurements.add(measurementList[index]);
+ TSDataType type = plan.getDataTypes()[index];
+ dataTypes.add(type);
+ encodings.add(getDefaultEncoding(type));
+ }
+ createAlignedTimeSeries(devicePath, measurements, dataTypes, encodings);
+ } else {
+ // non-aligned: one timeseries is created per missing measurement
+ for (Map.Entry<Integer, PartialPath> entry : missingNodeIndex.entrySet()) {
+ IMeasurementSchema schema =
+ new MeasurementSchema(
+ entry.getValue().getMeasurement(), plan.getDataTypes()[entry.getKey()]);
+ createTimeseries(entry.getValue(), schema, null, null, null);
+ }
+ }
+
+ // get the latest node
+ for (Entry<Integer, PartialPath> entry : missingNodeIndex.entrySet()) {
+ nodeMap.put(entry.getKey(), getMeasurementMNode(entry.getValue()));
+ }
+ }
+
+ // check datatype
+ for (int i = 0; i < measurementList.length; i++) {
+ try {
+ // check type is match
+ if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
+ try {
+ checkDataTypeMatch(plan, i, nodeMap.get(i).getSchema().getType());
+ } catch (DataTypeMismatchException mismatchException) {
+ logger.warn(
+ "DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
+ measurementList[i],
+ plan.getDataTypes()[i],
+ nodeMap.get(i).getSchema().getType());
+ if (!config.isEnablePartialInsert()) {
+ throw mismatchException;
+ } else {
+ // mark failed measurement
+ plan.markFailedMeasurementInsertion(i, mismatchException);
+ // skip the assignments below for this measurement
+ continue;
+ }
+ }
+ measurementMNodes[i] = nodeMap.get(i);
+ // set measurementName instead of alias
+ measurementList[i] = nodeMap.get(i).getName();
+ }
+ } catch (MetadataException e) {
+ // same message at debug level in cluster mode, at warn level otherwise
+ if (IoTDB.isClusterMode()) {
+ logger.debug(
+ "meet error when check {}.{}, message: {}",
+ devicePath,
+ measurementList[i],
+ e.getMessage());
+ } else {
+ logger.warn(
+ "meet error when check {}.{}, message: {}",
+ devicePath,
+ measurementList[i],
+ e.getMessage());
+ }
+ if (config.isEnablePartialInsert()) {
+ // mark failed measurement
+ plan.markFailedMeasurementInsertion(i, e);
+ } else {
+ throw e;
+ }
+ }
+ }
+ return deviceMNode;
+ }
+
+  /**
+   * Verifies that the data type inserted at position {@code loc} of the plan equals the data type
+   * stored in the schema.
+   *
+   * <p>An {@link InsertRowPlan} that still needs type inference is accepted as is; for every
+   * other plan the type comes from {@link #getTypeInLoc(InsertPlan, int)}.
+   *
+   * @param plan the insert plan being checked
+   * @param loc index of the measurement inside the plan
+   * @param dataType data type recorded in the schema
+   * @throws MetadataException a {@link DataTypeMismatchException} when the types differ, or
+   *     whatever {@code getTypeInLoc} throws
+   */
+  private void checkDataTypeMatch(InsertPlan plan, int loc, TSDataType dataType)
+      throws MetadataException {
+    // only when InsertRowPlan's values is object[], we should check type
+    boolean inferredLater =
+        plan instanceof InsertRowPlan && ((InsertRowPlan) plan).isNeedInferType();
+    TSDataType insertDataType = inferredLater ? dataType : getTypeInLoc(plan, loc);
+    if (dataType == insertDataType) {
+      return;
+    }
+
+    String measurement = plan.getMeasurements()[loc];
+    logger.warn(
+        "DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
+        measurement,
+        insertDataType,
+        dataType);
+    throw new DataTypeMismatchException(measurement, insertDataType, dataType);
+  }
+  /**
+   * Returns the data type the plan carries for the measurement at position {@code loc}.
+   *
+   * <p>Only {@link InsertRowPlan} (type predicted from the value) and {@link InsertTabletPlan}
+   * (type read from the plan's data type array) are supported.
+   *
+   * @param plan the insert plan
+   * @param loc index of the measurement inside the plan
+   * @return the data type at {@code loc}
+   * @throws MetadataException if the plan is of any other type
+   */
+  private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws MetadataException {
+    if (plan instanceof InsertRowPlan) {
+      InsertRowPlan rowPlan = (InsertRowPlan) plan;
+      Object value = rowPlan.getValues()[loc];
+      return TypeInferenceUtils.getPredictedDataType(value, rowPlan.isNeedInferType());
+    }
+    if (plan instanceof InsertTabletPlan) {
+      return plan.getDataTypes()[loc];
+    }
+    throw new MetadataException(
+        String.format(
+            "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
+  }
+
+  /**
+   * Closes the underlying read/write handler.
+   *
+   * <p>If the first close attempt fails, the failure is logged and a single retry is made after a
+   * short pause. A failed retry is logged and otherwise ignored; an interruption during the pause
+   * restores the thread's interrupt flag.
+   */
+  @Override
+  public void clear() {
+    try {
+      readWriteHandler.close();
+    } catch (RocksDBException e) {
+      logger.error("Failed to close readWriteHandler,try again.", e);
+      try {
+        Thread.sleep(5);
+        readWriteHandler.close();
+      } catch (RocksDBException e1) {
+        // Log the failure of the retry itself; the first failure was already logged above.
+        logger.error("This schemaRegion [{}] closed failed.", this, e1);
+      } catch (InterruptedException e1) {
+        logger.warn("Close RocksdDB instance interrupted", e1);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Not supported by this schema region implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Set<String> getPathsSetTemplate(String templateName) throws MetadataException {
+    throw new UnsupportedOperationException(
+        "getPathsSetTemplate is not supported by RSchemaRegion");
+  }
+
+  /**
+   * Not supported by this schema region implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Set<String> getPathsUsingTemplate(String templateName) throws MetadataException {
+    throw new UnsupportedOperationException(
+        "getPathsUsingTemplate is not supported by RSchemaRegion");
+  }
+
+  /**
+   * Not supported by this schema region implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public boolean isTemplateAppendable(Template template, List<String> measurements) {
+    throw new UnsupportedOperationException(
+        "isTemplateAppendable is not supported by RSchemaRegion");
+  }
+
+  /**
+   * Not supported by this schema region implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void setSchemaTemplate(SetTemplatePlan plan) throws MetadataException {
+    throw new UnsupportedOperationException("setSchemaTemplate is not supported by RSchemaRegion");
+  }
+
+  /**
+   * Not supported by this schema region implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void unsetSchemaTemplate(UnsetTemplatePlan plan) throws MetadataException {
+    throw new UnsupportedOperationException(
+        "unsetSchemaTemplate is not supported by RSchemaRegion");
+  }
+
+  /**
+   * Not supported by this schema region implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void setUsingSchemaTemplate(ActivateTemplatePlan plan) throws MetadataException {
+    throw new UnsupportedOperationException(
+        "setUsingSchemaTemplate is not supported by RSchemaRegion");
+  }
+
+  /**
+   * Not supported by this schema region implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public IMeasurementMNode getMeasurementMNodeForTrigger(PartialPath fullPath)
+      throws MetadataException {
+    throw new UnsupportedOperationException(
+        "getMeasurementMNodeForTrigger is not supported by RSchemaRegion");
+  }
+
+  /**
+   * Not supported by this schema region implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void releaseMeasurementMNodeAfterDropTrigger(IMeasurementMNode measurementMNode)
+      throws MetadataException {
+    throw new UnsupportedOperationException(
+        "releaseMeasurementMNodeAfterDropTrigger is not supported by RSchemaRegion");
+  }
+
+ @Override
+ public String toString() {
+ return String.format("storage group:[%s]", storageGroupFullPath);
+ }
+
+ @TestOnly
+ public void printScanAllKeys() throws IOException {
+ readWriteHandler.scanAllKeys(schemaRegionDirPath + File.separator + "scanAllKeys.txt");
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaUtils.java
new file mode 100644
index 0000000000..1ad26f3aa8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaUtils.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.REntityMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeValueType;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMeasurementMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RStorageGroupMNode;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_ALIAS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_ATTRIBUTES;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_ORIGIN_KEY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_TAGS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_BLOCK_TYPE_TTL;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DATA_VERSION;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.DEFAULT_FLAG;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ESCAPE_PATH_SEPARATOR;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_HAS_ALIAS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_HAS_ATTRIBUTES;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_HAS_SCHEMA;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_HAS_TAGS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_IS_ALIGNED;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.FLAG_SET_TTL;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ALIAS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_INTERNAL;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_SG;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.PATH_SEPARATOR;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ROOT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ROOT_CHAR;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ROOT_STRING;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.ZERO;
+
+public class RSchemaUtils {
+ public static final RMNodeType[] NODE_TYPE_ARRAY = new RMNodeType[NODE_TYPE_ALIAS + 1];
+
+ static {
+ NODE_TYPE_ARRAY[NODE_TYPE_INTERNAL] = RMNodeType.INTERNAL;
+ NODE_TYPE_ARRAY[NODE_TYPE_SG] = RMNodeType.STORAGE_GROUP;
+ NODE_TYPE_ARRAY[NODE_TYPE_ENTITY] = RMNodeType.ENTITY;
+ NODE_TYPE_ARRAY[NODE_TYPE_MEASUREMENT] = RMNodeType.MEASUREMENT;
+ NODE_TYPE_ARRAY[NODE_TYPE_ALIAS] = RMNodeType.ALISA;
+ }
+
+ protected static byte[] toInternalNodeKey(String levelPath) {
+ return toRocksDBKey(levelPath, NODE_TYPE_INTERNAL);
+ }
+
+ protected static byte[] toStorageNodeKey(String levelPath) {
+ return toRocksDBKey(levelPath, NODE_TYPE_SG);
+ }
+
+ protected static byte[] toEntityNodeKey(String levelPath) {
+ return toRocksDBKey(levelPath, NODE_TYPE_ENTITY);
+ }
+
+ protected static byte[] toMeasurementNodeKey(String levelPath) {
+ return toRocksDBKey(levelPath, NODE_TYPE_MEASUREMENT);
+ }
+
+ protected static byte[] toAliasNodeKey(String levelPath) {
+ return toRocksDBKey(levelPath, NODE_TYPE_ALIAS);
+ }
+
+ protected static byte[] toRocksDBKey(String levelPath, char type) {
+ return (type + levelPath).getBytes();
+ }
+
+ public static String getLevelPathPrefix(String[] nodes, int end, int level) {
+ StringBuilder builder = new StringBuilder();
+ char depth = (char) (ZERO + level);
+ builder.append(ROOT).append(PATH_SEPARATOR).append(depth);
+ for (int i = 1; i <= end; i++) {
+ builder.append(nodes[i]).append(PATH_SEPARATOR).append(depth);
+ }
+ return builder.toString();
+ }
+
+ public static String getLevelPath(String[] nodes, int end) {
+ return getLevelPath(nodes, end, end);
+ }
+
+ public static String getLevelPath(String[] nodes, int end, int level) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(ROOT);
+ char depth = (char) (ZERO + level);
+ for (int i = 1; i <= end; i++) {
+ builder.append(PATH_SEPARATOR).append(depth).append(nodes[i]);
+ }
+ return builder.toString();
+ }
+
+ public static String getMeasurementLevelPath(String[] prefixPath, String measurement) {
+ String[] nodes = ArrayUtils.add(prefixPath, measurement);
+ return getLevelPath(nodes, nodes.length - 1);
+ }
+
+ public static List<PartialPath> convertToPartialPath(Collection<String> paths, int level) {
+ return paths
+ .parallelStream()
+ .map(x -> getPartialPathFromInnerPath(x, level))
+ .collect(Collectors.toList());
+ }
+
+ public static String getNextLevelOfPath(String innerPath, int currentLevel) {
+ char levelChar = (char) (ZERO + currentLevel);
+ String old = PATH_SEPARATOR + levelChar;
+ String target = PATH_SEPARATOR + (char) (levelChar + 1);
+ return innerPath.replace(old, target);
+ }
+
+ public static String getNextLevelOfPath(String innerPath, char currentLevel) {
+ String old = PATH_SEPARATOR + currentLevel;
+ String target = PATH_SEPARATOR + (char) (currentLevel + 1);
+ return innerPath.replace(old, target);
+ }
+
+ public static PartialPath getPartialPathFromInnerPath(String path, int level) {
+ char charLevel = (char) (ZERO + level);
+ return getPartialPathFromInnerPath(path, charLevel);
+ }
+
+ public static PartialPath getPartialPathFromInnerPath(String path, char level) {
+ String pathWithoutLevel = path.replace(PATH_SEPARATOR + level, PATH_SEPARATOR);
+ String[] nodes = pathWithoutLevel.split(ESCAPE_PATH_SEPARATOR);
+ nodes[0] = PATH_ROOT;
+ return new PartialPath(nodes);
+ }
+
+ public static RMNodeType typeOfMNode(IMNode mNode) {
+ // order sensitive
+ if (mNode instanceof REntityMNode) {
+ return RMNodeType.ENTITY;
+ }
+
+ if (mNode instanceof RStorageGroupMNode) {
+ return RMNodeType.STORAGE_GROUP;
+ }
+
+ if (mNode instanceof RMeasurementMNode) {
+ return RMNodeType.MEASUREMENT;
+ }
+
+ return RMNodeType.INTERNAL;
+ }
+
+ public static byte[] buildMeasurementNodeValue(
+ IMeasurementSchema schema,
+ String alias,
+ Map<String, String> tags,
+ Map<String, String> attributes)
+ throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ReadWriteIOUtils.write(DATA_VERSION, outputStream);
+
+ byte flag = DEFAULT_FLAG;
+ if (alias != null) {
+ flag = (byte) (flag | FLAG_HAS_ALIAS);
+ }
+
+ if (tags != null && tags.size() > 0) {
+ flag = (byte) (flag | FLAG_HAS_TAGS);
+ }
+
+ if (attributes != null && attributes.size() > 0) {
+ flag = (byte) (flag | FLAG_HAS_ATTRIBUTES);
+ }
+
+ if (schema != null) {
+ flag = (byte) (flag | FLAG_HAS_SCHEMA);
+ }
+
+ ReadWriteIOUtils.write(flag, outputStream);
+
+ if (schema != null) {
+ ReadWriteIOUtils.write(DATA_BLOCK_TYPE_SCHEMA, outputStream);
+ schema.serializeTo(outputStream);
+ }
+
+ if (alias != null) {
+ ReadWriteIOUtils.write(DATA_BLOCK_TYPE_ALIAS, outputStream);
+ ReadWriteIOUtils.write(alias, outputStream);
+ }
+
+ if (tags != null && tags.size() > 0) {
+ ReadWriteIOUtils.write(DATA_BLOCK_TYPE_TAGS, outputStream);
+ ReadWriteIOUtils.write(tags, outputStream);
+ }
+
+ if (attributes != null && attributes.size() > 0) {
+ ReadWriteIOUtils.write(DATA_BLOCK_TYPE_TAGS, outputStream);
+ ReadWriteIOUtils.write(tags, outputStream);
+ }
+ return outputStream.toByteArray();
+ }
+
+ public static byte[] buildAliasNodeValue(byte[] originKey) {
+ byte[] prefix = new byte[] {DATA_VERSION, DEFAULT_FLAG, DATA_BLOCK_TYPE_ORIGIN_KEY};
+ byte[] len = BytesUtils.intToBytes(originKey.length);
+ return BytesUtils.concatByteArray(BytesUtils.concatByteArray(prefix, len), originKey);
+ }
+
+ public static byte[] readOriginKey(ByteBuffer buffer) {
+ int len = ReadWriteIOUtils.readInt(buffer);
+ return ReadWriteIOUtils.readBytes(buffer, len);
+ }
+
+ public static int indexOfDataBlockType(byte[] data, RMNodeValueType valueType) {
+ if (valueType.getFlag() != null && (data[1] & valueType.getFlag()) == 0) {
+ return -1;
+ }
+
+ int index = -1;
+ boolean typeExist = false;
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+ // skip the data version and filter byte
+ ReadWriteIOUtils.readBytes(byteBuffer, 2);
+
+ while (byteBuffer.hasRemaining()) {
+ byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
+ index = byteBuffer.position();
+ switch (blockType) {
+ case DATA_BLOCK_TYPE_TTL:
+ ReadWriteIOUtils.readLong(byteBuffer);
+ break;
+ case DATA_BLOCK_TYPE_ALIAS:
+ ReadWriteIOUtils.readString(byteBuffer);
+ break;
+ case DATA_BLOCK_TYPE_ORIGIN_KEY:
+ readOriginKey(byteBuffer);
+ break;
+ case DATA_BLOCK_TYPE_SCHEMA:
+ MeasurementSchema.deserializeFrom(byteBuffer);
+ break;
+ case DATA_BLOCK_TYPE_TAGS:
+ case DATA_BLOCK_TYPE_ATTRIBUTES:
+ ReadWriteIOUtils.readMap(byteBuffer);
+ break;
+ default:
+ break;
+ }
+ // got the data we need,don't need to read any more
+ if (valueType.getType() == blockType) {
+ typeExist = true;
+ break;
+ }
+ }
+ return typeExist ? index : -1;
+ }
+
+ public static byte[] updateTTL(byte[] origin, long ttl) {
+ int index = indexOfDataBlockType(origin, RMNodeValueType.TTL);
+ if (index < 1) {
+ byte[] ttlBlock = new byte[Long.BYTES + 1];
+ ttlBlock[0] = DATA_BLOCK_TYPE_TTL;
+ BytesUtils.longToBytes(ttl, ttlBlock, 1);
+ origin[1] = (byte) (origin[1] | FLAG_SET_TTL);
+ return BytesUtils.concatByteArray(origin, ttlBlock);
+ } else {
+ BytesUtils.longToBytes(ttl, origin, index);
+ return origin;
+ }
+ }
+
+ private static final char START_FLAG = '\u0019';
+ private static final char SPLIT_FLAG = '.';
+
+ /**
+ * parse value and return a specified type. if no data is required, null is returned.
+ *
+ * @param value value written in default table
+ * @param valueType the type of value to obtain
+ */
+ public static Object parseNodeValue(byte[] value, RMNodeValueType valueType) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(value);
+ // skip the version flag and node type flag
+ ReadWriteIOUtils.readByte(byteBuffer);
+ // get block type
+ byte filter = ReadWriteIOUtils.readByte(byteBuffer);
+ Object obj = null;
+ if (valueType.getFlag() != null && (filter & valueType.getFlag()) == 0) {
+ return obj;
+ }
+
+ // this means that the following data contains the information we need
+ while (byteBuffer.hasRemaining()) {
+ byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
+ switch (blockType) {
+ case DATA_BLOCK_TYPE_TTL:
+ obj = ReadWriteIOUtils.readLong(byteBuffer);
+ break;
+ case DATA_BLOCK_TYPE_ALIAS:
+ obj = ReadWriteIOUtils.readString(byteBuffer);
+ break;
+ case DATA_BLOCK_TYPE_ORIGIN_KEY:
+ obj = readOriginKey(byteBuffer);
+ break;
+ case DATA_BLOCK_TYPE_SCHEMA:
+ obj = MeasurementSchema.deserializeFrom(byteBuffer);
+ break;
+ case DATA_BLOCK_TYPE_TAGS:
+ case DATA_BLOCK_TYPE_ATTRIBUTES:
+ obj = ReadWriteIOUtils.readMap(byteBuffer);
+ break;
+ default:
+ break;
+ }
+ // got the data we need,don't need to read any more
+ if (valueType.getType() == blockType) {
+ break;
+ }
+ }
+ return obj;
+ }
+
+ public static boolean isAligned(byte[] value) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(value);
+ // skip the version flag and node type flag
+ ReadWriteIOUtils.readByte(byteBuffer);
+ // get block type
+ byte flag = ReadWriteIOUtils.readByte(byteBuffer);
+ return (flag & FLAG_IS_ALIGNED) > 0;
+ }
+ /**
+ * get inner name by converting partial path.
+ *
+ * @param partialPath the path needed to be converted.
+ * @param level the level needed to be added.
+ * @param nodeType specified type
+ * @return inner name
+ */
+ public static String convertPartialPathToInner(String partialPath, int level, char nodeType) {
+ StringBuilder stringBuilder = new StringBuilder(nodeType + ROOT);
+ for (int i = partialPath.indexOf(PATH_SEPARATOR); i < partialPath.length() && i >= 0; i++) {
+ char currentChar = partialPath.charAt(i);
+ stringBuilder.append(currentChar);
+ if (currentChar == SPLIT_FLAG) {
+ stringBuilder.append(level);
+ }
+ }
+ return stringBuilder.toString();
+ }
+
+ public static String convertPartialPathToInnerByNodes(String[] nodes, int level, char nodeType) {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(nodeType).append(ROOT);
+ for (int i = 0; i < nodes.length; i++) {
+ if (i == 0 && nodes[i].equals(ROOT_STRING)) {
+ continue;
+ }
+ stringBuilder.append(SPLIT_FLAG).append(level).append(nodes[i]);
+ }
+ return stringBuilder.toString();
+ }
+
+ public static int getLevelByPartialPath(String partialPath) {
+ int levelCount = 0;
+ for (char c : partialPath.toCharArray()) {
+ if (SPLIT_FLAG == c) {
+ levelCount++;
+ }
+ }
+ return levelCount;
+ }
+
+ public static byte[] getSuffixOfLevelPath(String[] nodes, int level) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (String str : nodes) {
+ stringBuilder.append(PATH_SEPARATOR).append(level).append(str);
+ }
+ return stringBuilder.toString().getBytes();
+ }
+
+ public static boolean suffixMatch(byte[] key, byte[] suffix) {
+ if (key.length < suffix.length) {
+ return false;
+ }
+
+ for (int i = key.length - 1, j = suffix.length - 1; i >= 0 && j >= 0; i--, j--) {
+ if ((key[i] ^ suffix[j]) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean prefixMatch(byte[] key, byte[] prefix) {
+ if (key.length < prefix.length) {
+ return false;
+ }
+
+ for (int i = 0, j = 0; i < key.length && j < prefix.length; i++, j++) {
+ if ((key[i] ^ prefix[j]) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static String[] toMetaNodes(byte[] rocksdbKey) {
+ String rawKey =
+ new String(
+ Objects.requireNonNull(BytesUtils.subBytes(rocksdbKey, 1, rocksdbKey.length - 1)));
+ String[] nodes = rawKey.split(ESCAPE_PATH_SEPARATOR);
+ nodes[0] = ROOT_STRING;
+ for (int i = 1; i < nodes.length; i++) {
+ nodes[i] = nodes[i].substring(1);
+ }
+ return nodes;
+ }
+
+ public static String getPathByInnerName(String innerName) {
+ char[] keyConvertToCharArray = innerName.toCharArray();
+ StringBuilder stringBuilder = new StringBuilder();
+ char lastChar = START_FLAG;
+ boolean replaceFlag = true;
+ for (char c : keyConvertToCharArray) {
+ if (SPLIT_FLAG == lastChar || START_FLAG == lastChar) {
+ lastChar = c;
+ continue;
+ }
+ if (ROOT_CHAR == c && replaceFlag) {
+ lastChar = c;
+ stringBuilder.append(ROOT_STRING);
+ replaceFlag = false;
+ continue;
+ }
+ stringBuilder.append(c);
+ lastChar = c;
+ }
+ return stringBuilder.toString();
+ }
+
+ public static String getPathByLevelPath(String innerName) {
+ char[] keyConvertToCharArray = innerName.toCharArray();
+ StringBuilder stringBuilder = new StringBuilder();
+ char lastChar = START_FLAG;
+ boolean replaceFlag = true;
+ for (char c : keyConvertToCharArray) {
+ if (SPLIT_FLAG == lastChar) {
+ lastChar = c;
+ continue;
+ }
+ if (ROOT_CHAR == c && replaceFlag) {
+ lastChar = c;
+ stringBuilder.append(ROOT_STRING);
+ replaceFlag = false;
+ continue;
+ }
+ stringBuilder.append(c);
+ lastChar = c;
+ }
+ return stringBuilder.toString();
+ }
+
+ public static String[] newStringArray(String[] oldArray) throws IllegalPathException {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (String str : oldArray) {
+ stringBuilder.append(PATH_SEPARATOR).append(str);
+ }
+ return MetaUtils.splitPathToDetachedPath(stringBuilder.substring(1));
+ }
+
+ public static String replaceWildcard(int num) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int i = 0; i < num; i++) {
+ stringBuilder.append(RSchemaConstants.PATH_SEPARATOR).append(ONE_LEVEL_PATH_WILDCARD);
+ }
+ return stringBuilder.substring(1);
+ }
+
+ public static List<int[]> getAllCompoundMode(int sum, int n) {
+ if (n <= 2) {
+ List<int[]> result = new ArrayList<>();
+ for (int i = 1; i < sum; i++) {
+ result.add(new int[] {i, sum - i});
+ }
+ return result;
+ }
+ List<int[]> allResult = new ArrayList<>();
+ for (int i = 1; i <= sum - n + 1; i++) {
+ List<int[]> temp = getAllCompoundMode(sum - i, n - 1);
+ for (int[] value : temp) {
+ int[] result = new int[value.length + 1];
+ result[0] = i;
+ System.arraycopy(value, 0, result, 1, value.length);
+ allResult.add(result);
+ }
+ }
+ return allResult;
+ }
+
+ // eg. root.a.*.**.b.**.c
+ public static List<String[]> replaceMultiWildcardToSingle(String[] nodes, int maxLevel)
+ throws IllegalPathException {
+ List<String[]> allNodesArray = new ArrayList<>();
+ List<Integer> multiWildcardPosition = new ArrayList<>();
+ for (int i = 0; i < nodes.length; i++) {
+ if (MULTI_LEVEL_PATH_WILDCARD.equals(nodes[i])) {
+ multiWildcardPosition.add(i);
+ }
+ }
+ if (multiWildcardPosition.isEmpty()) {
+ allNodesArray.add(nodes);
+ } else if (multiWildcardPosition.size() == 1) {
+ for (int i = 1; i <= maxLevel - nodes.length + 2; i++) {
+ String[] clone = nodes.clone();
+ clone[multiWildcardPosition.get(0)] = replaceWildcard(i);
+ allNodesArray.add(RSchemaUtils.newStringArray(clone));
+ }
+ } else {
+ for (int sum = multiWildcardPosition.size();
+ sum <= maxLevel - (nodes.length - multiWildcardPosition.size() - 1);
+ sum++) {
+ List<int[]> result = getAllCompoundMode(sum, multiWildcardPosition.size());
+ for (int[] value : result) {
+ String[] clone = nodes.clone();
+ for (int i = 0; i < value.length; i++) {
+ clone[multiWildcardPosition.get(i)] = replaceWildcard(value[i]);
+ }
+ allNodesArray.add(RSchemaUtils.newStringArray(clone));
+ }
+ }
+ }
+ return allNodesArray;
+ }
+
+ public static String concatNodesName(String[] nodes, int startIdx, int endIdx) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int i = startIdx; i <= endIdx; i++) {
+ stringBuilder.append(PATH_SEPARATOR).append(nodes[i]);
+ }
+ return stringBuilder.substring(1);
+ }
+
+ public static boolean startWith(byte[] a, byte[] b) {
+ if (a.length < b.length) {
+ return false;
+ }
+
+ for (int i = 0; i < b.length; i++) {
+ if (a[i] != b[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/REntityMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/REntityMNode.java
new file mode 100644
index 0000000000..97768c9e88
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/REntityMNode.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+ /**
+  * Entity node of the RocksDB-based schema region. On top of {@link RInternalMNode} it only adds
+  * the {@code isAligned} flag, which can be restored from a stored RocksDB value.
+  */
+ public class REntityMNode extends RInternalMNode implements IEntityMNode {
+
+   private volatile boolean isAligned = false;
+
+   /**
+    * Creates an entity node whose aligned flag keeps its default ({@code false}).
+    *
+    * @param fullPath full path of this node
+    * @param readWriteHandler handler used to read from and write to RocksDB
+    */
+   public REntityMNode(String fullPath, RSchemaReadWriteHandler readWriteHandler) {
+     super(fullPath, readWriteHandler);
+   }
+
+   /**
+    * Creates an entity node and restores the aligned flag from {@code value}.
+    *
+    * @param fullPath full path of this node
+    * @param value stored value of the node; its second byte is read as the flag byte
+    * @param readWriteHandler handler used to read from and write to RocksDB
+    */
+   public REntityMNode(String fullPath, byte[] value, RSchemaReadWriteHandler readWriteHandler) {
+     super(fullPath, readWriteHandler);
+     deserialize(value);
+   }
+
+   /**
+    * Writes the key of {@code childName}, encoded as an entity node, with an empty value.
+    *
+    * @throws MetadataException wrapping any {@link RocksDBException} raised by the write
+    */
+   @Override
+   void updateChildNode(String childName, int childNameMaxLevel) throws MetadataException {
+     String entityKey =
+         RSchemaUtils.convertPartialPathToInner(
+             childName, childNameMaxLevel, RMNodeType.ENTITY.getValue());
+     try {
+       byte[] emptyValue = new byte[] {};
+       readWriteHandler.updateNode(entityKey.getBytes(), emptyValue);
+     } catch (RocksDBException e) {
+       throw new MetadataException(e);
+     }
+   }
+
+   /** Reads the first two bytes of {@code value} and derives the aligned flag from the second. */
+   private void deserialize(byte[] value) {
+     byte[] header = ReadWriteIOUtils.readBytes(ByteBuffer.wrap(value), 2);
+     byte flag = header[1];
+     isAligned = (flag & RSchemaConstants.FLAG_IS_ALIGNED) > 0;
+   }
+
+   @Override
+   public boolean isEntity() {
+     return true;
+   }
+
+   @Override
+   public boolean isAligned() {
+     return isAligned;
+   }
+
+   @Override
+   public void setAligned(boolean isAligned) {
+     this.isAligned = isAligned;
+   }
+
+   /** Nothing is kept in memory in RocksDB-based mode, so this always reports success. */
+   @Override
+   public boolean addAlias(String alias, IMeasurementMNode child) {
+     return true;
+   }
+
+   /** Not supported: MNode classes perform no alias update, neither in memory nor on disk. */
+   @Override
+   public void deleteAliasChild(String alias) {
+     throw new UnsupportedOperationException();
+   }
+
+   /** Not supported by this node type. */
+   @Override
+   public Map<String, IMeasurementMNode> getAliasChildren() {
+     throw new UnsupportedOperationException();
+   }
+
+   /** Not supported by this node type. */
+   @Override
+   public void setAliasChildren(Map<String, IMeasurementMNode> aliasChildren) {
+     throw new UnsupportedOperationException();
+   }
+
+   /** Always returns {@code null}. */
+   @Override
+   public ILastCacheContainer getLastCacheContainer(String measurementId) {
+     return null;
+   }
+
+   /** Always returns {@code null}. */
+   @Override
+   public Map<String, ILastCacheContainer> getTemplateLastCaches() {
+     return null;
+   }
+
+   /** Intentionally a no-op. */
+   @Override
+   public void serializeTo(MLogWriter logWriter) throws IOException {}
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RInternalMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RInternalMNode.java
new file mode 100644
index 0000000000..b9113f6737
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RInternalMNode.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.container.IMNodeContainer;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
+import org.apache.iotdb.db.metadata.template.Template;
+
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+
+ /**
+  * Internal node of the RocksDB-based schema region. Child lookups, additions and deletions go
+  * through the {@code readWriteHandler}; only the template reference and the use-template flag
+  * are held by the object itself.
+  */
+ public class RInternalMNode extends RMNode {
+
+   // schema template
+   protected Template schemaTemplate = null;
+
+   private volatile boolean useTemplate = false;
+
+   /**
+    * Constructor of MNode.
+    *
+    * @param fullPath full path of this node
+    * @param readWriteHandler handler used to read from and write to RocksDB
+    */
+   public RInternalMNode(String fullPath, RSchemaReadWriteHandler readWriteHandler) {
+     super(fullPath, readWriteHandler);
+   }
+
+   /**
+    * Writes the key of {@code childName}, encoded as an internal node, with an empty value.
+    *
+    * @throws MetadataException wrapping any {@link RocksDBException} raised by the write
+    */
+   @Override
+   void updateChildNode(String childName, int childNameMaxLevel) throws MetadataException {
+     String innerName =
+         RSchemaUtils.convertPartialPathToInner(
+             childName, childNameMaxLevel, RMNodeType.INTERNAL.getValue());
+     try {
+       readWriteHandler.updateNode(innerName.getBytes(), new byte[] {});
+     } catch (RocksDBException e) {
+       throw new MetadataException(e);
+     }
+   }
+
+   /** Builds the full path of the child called {@code name}. */
+   private String childPathOf(String name) {
+     return fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+   }
+
+   /** check whether the MNode has a child with the name */
+   @Override
+   public boolean hasChild(String name) {
+     return getNodeBySpecifiedPath(childPathOf(name)) != null;
+   }
+
+   /** get the child with the name, or {@code null} if no such node is stored */
+   @Override
+   public IMNode getChild(String name) {
+     return getNodeBySpecifiedPath(childPathOf(name));
+   }
+
+   /**
+    * Adds a child to the current mnode by persisting its key. The child's parent is set to this
+    * node first.
+    *
+    * <p>Failures are logged rather than thrown: a child that is not an {@link RMNode} is not
+    * persisted, and a {@link MetadataException} raised while writing is logged together with its
+    * stack trace.
+    *
+    * @param name child's name
+    * @param child child's node
+    * @return the given {@code child}
+    */
+   @Override
+   public IMNode addChild(String name, IMNode child) {
+     child.setParent(this);
+     String childPath = childPathOf(name);
+     int childLevel = RSchemaUtils.getLevelByPartialPath(childPath);
+     if (!(child instanceof RMNode)) {
+       logger.error("Rocksdb-based is currently used, but the node type received is not RMNode!");
+       return child;
+     }
+     try {
+       ((RMNode) child).updateChildNode(childPath, childLevel);
+     } catch (MetadataException e) {
+       // keep the exception itself so the cause and stack trace are not lost
+       logger.error("Failed to add child node {}", childPath, e);
+     }
+     return child;
+   }
+
+   /**
+    * Add a child to the current mnode.
+    *
+    * <p>The child's own name is used as the key, and this node becomes the child's parent. The
+    * child is returned so that calls can be chained.
+    *
+    * @param child child's node
+    * @return the given {@code child}
+    */
+   @Override
+   public IMNode addChild(IMNode child) {
+     addChild(child.getName(), child);
+     return child;
+   }
+
+   /**
+    * Deletes the stored key of the child called {@code name}. Each node type is tried in turn and
+    * the first existing key is removed; a {@link RocksDBException} for one type is logged and the
+    * remaining types are still tried.
+    *
+    * @return always {@code null}; elsewhere the return value is used to estimate memory size,
+    *     which is meaningless here because nodes are not held in memory for long periods
+    */
+   @Override
+   public IMNode deleteChild(String name) {
+     String childPath = childPathOf(name);
+     int childLevel = RSchemaUtils.getLevelByPartialPath(childPath);
+     for (RMNodeType type : RMNodeType.values()) {
+       byte[] childKey =
+           RSchemaUtils.convertPartialPathToInner(childPath, childLevel, type.getValue())
+               .getBytes();
+       try {
+         if (readWriteHandler.keyExist(childKey)) {
+           readWriteHandler.deleteByKey(childKey);
+           break;
+         }
+       } catch (RocksDBException e) {
+         // keep the exception itself so the cause and stack trace are not lost
+         logger.error("Failed to delete child node {}", childPath, e);
+       }
+     }
+     return null;
+   }
+
+   /**
+    * replace a child of this mnode: the old key is deleted, then the new node is added
+    *
+    * @param oldChildName measurement name
+    * @param newChildNode new child node; its name must equal {@code oldChildName}
+    */
+   @Override
+   public void replaceChild(String oldChildName, IMNode newChildNode) {
+     if (!oldChildName.equals(newChildNode.getName())) {
+       throw new RuntimeException("New child's name must be the same as old child's name!");
+     }
+     deleteChild(oldChildName);
+     addChild(newChildNode);
+   }
+
+   /** Not supported by this node type. */
+   @Override
+   public IMNodeContainer getChildren() {
+     throw new UnsupportedOperationException();
+   }
+
+   /** Not supported by this node type. */
+   @Override
+   public Template getUpperTemplate() {
+     throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public Template getSchemaTemplate() {
+     return schemaTemplate;
+   }
+
+   @Override
+   public void setSchemaTemplate(Template schemaTemplate) {
+     this.schemaTemplate = schemaTemplate;
+   }
+
+   @Override
+   public boolean isUseTemplate() {
+     return useTemplate;
+   }
+
+   @Override
+   public void setUseTemplate(boolean useTemplate) {
+     this.useTemplate = useTemplate;
+   }
+
+   /** Not supported by this node type. */
+   @Override
+   public void serializeTo(MLogWriter logWriter) throws IOException {
+     throw new UnsupportedOperationException();
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNode.java
new file mode 100644
index 0000000000..ee9d9983c3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNode.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.mnode.container.IMNodeContainer;
+import org.apache.iotdb.db.metadata.mtree.store.disk.cache.CacheEntry;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
+
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+ /**
+  * Base class of the MNode implementations used by the RocksDB-based schema region. A node is
+  * identified by its full path; related nodes are looked up in RocksDB through the {@code
+  * readWriteHandler} when they are asked for.
+  */
+ public abstract class RMNode implements IMNode {
+   /** full path of this node, from root */
+   protected String fullPath;
+
+   protected RSchemaReadWriteHandler readWriteHandler;
+
+   /** parent node; resolved on the first call of {@link #getParent()} unless set explicitly */
+   protected IMNode parent;
+
+   /** last segment of {@code fullPath} */
+   protected String name;
+
+   protected static final Logger logger = LoggerFactory.getLogger(RMNode.class);
+
+   /**
+    * Constructor of MNode.
+    *
+    * @param fullPath full path of this node; the text after its last separator becomes the name
+    * @param readWriteHandler handler used to read from and write to RocksDB
+    */
+   public RMNode(String fullPath, RSchemaReadWriteHandler readWriteHandler) {
+     this.fullPath = fullPath.intern();
+     this.name = fullPath.substring(fullPath.lastIndexOf(RSchemaConstants.PATH_SEPARATOR) + 1);
+     this.readWriteHandler = readWriteHandler;
+   }
+
+   /**
+    * Persists the key of a child node using the node type of the implementing class.
+    *
+    * @param childName full path of the child
+    * @param childNameMaxLevel level of {@code childName}
+    * @throws MetadataException if the write fails
+    */
+   abstract void updateChildNode(String childName, int childNameMaxLevel) throws MetadataException;
+
+   @Override
+   public String getName() {
+     return name;
+   }
+
+   @Override
+   public void setName(String name) {
+     this.name = name;
+   }
+
+   /**
+    * Returns the parent node, looking it up by the prefix of {@code fullPath} on first use.
+    *
+    * @return the parent, or {@code null} when {@code fullPath} contains no separator (there is no
+    *     parent path to look up) or when no node is stored for the parent path
+    */
+   @Override
+   public IMNode getParent() {
+     if (parent != null) {
+       return parent;
+     }
+     int separatorIndex = fullPath.lastIndexOf(RSchemaConstants.PATH_SEPARATOR);
+     if (separatorIndex < 0) {
+       // a path without separator has no parent; substring(0, -1) would throw here
+       return null;
+     }
+     parent = getNodeBySpecifiedPath(fullPath.substring(0, separatorIndex));
+     return parent;
+   }
+
+   /**
+    * Loads the node stored under {@code keyName}. The key is probed once per {@link RMNodeType},
+    * in declaration order, and the first stored value of a storage group, internal, entity or
+    * measurement key is turned into the matching node object.
+    *
+    * @param keyName full path of the wanted node
+    * @return the node, or {@code null} if none of the probed keys holds a usable value; read
+    *     failures are logged and treated as "not found" for that type
+    */
+   protected IMNode getNodeBySpecifiedPath(String keyName) {
+     int nodeNameMaxLevel = RSchemaUtils.getLevelByPartialPath(keyName);
+     for (RMNodeType type : RMNodeType.values()) {
+       String innerName =
+           RSchemaUtils.convertPartialPathToInner(keyName, nodeNameMaxLevel, type.getValue());
+       // scoped per iteration so a failed read never reuses bytes from an earlier type
+       byte[] value = null;
+       try {
+         value = readWriteHandler.get(null, innerName.getBytes());
+       } catch (RocksDBException e) {
+         logger.error("Failed to get parent node.", e);
+       }
+       if (value == null) {
+         continue;
+       }
+       switch (type.getValue()) {
+         case RSchemaConstants.NODE_TYPE_SG:
+           return new RStorageGroupMNode(keyName, value, readWriteHandler);
+         case RSchemaConstants.NODE_TYPE_INTERNAL:
+           return new RInternalMNode(keyName, readWriteHandler);
+         case RSchemaConstants.NODE_TYPE_ENTITY:
+           return new REntityMNode(keyName, value, readWriteHandler);
+         case RSchemaConstants.NODE_TYPE_MEASUREMENT:
+           return new RMeasurementMNode(keyName, value, readWriteHandler);
+         default:
+           // other key types do not map to a node object; keep probing
+           break;
+       }
+     }
+     return null;
+   }
+
+   @Override
+   public void setParent(IMNode parent) {
+     this.parent = parent;
+   }
+
+   /**
+    * get partial path of this node
+    *
+    * @return partial path, or {@code null} if {@code fullPath} is not a legal path
+    */
+   @Override
+   public PartialPath getPartialPath() {
+     try {
+       return new PartialPath(fullPath);
+     } catch (IllegalPathException ignored) {
+       return null;
+     }
+   }
+
+   /** get full path */
+   @Override
+   public String getFullPath() {
+     return fullPath;
+   }
+
+   /** Replaces the full path; the name derived by the constructor is left unchanged. */
+   @Override
+   public void setFullPath(String fullPath) {
+     this.fullPath = fullPath;
+   }
+
+   @Override
+   public boolean isUseTemplate() {
+     return false;
+   }
+
+   @Override
+   public boolean isStorageGroup() {
+     return false;
+   }
+
+   @Override
+   public boolean isEntity() {
+     return false;
+   }
+
+   @Override
+   public boolean isMeasurement() {
+     return false;
+   }
+
+   /** @throws UnsupportedOperationException if this node is not a storage group node */
+   @Override
+   public IStorageGroupMNode getAsStorageGroupMNode() {
+     if (!isStorageGroup()) {
+       throw new UnsupportedOperationException("Wrong MNode Type");
+     }
+     return (IStorageGroupMNode) this;
+   }
+
+   /** @throws UnsupportedOperationException if this node is not an entity node */
+   @Override
+   public IEntityMNode getAsEntityMNode() {
+     if (!isEntity()) {
+       throw new UnsupportedOperationException("Wrong MNode Type");
+     }
+     return (IEntityMNode) this;
+   }
+
+   /** @throws UnsupportedOperationException if this node is not a measurement node */
+   @Override
+   public IMeasurementMNode getAsMeasurementMNode() {
+     if (!isMeasurement()) {
+       throw new UnsupportedOperationException("Wrong MNode Type");
+     }
+     return (IMeasurementMNode) this;
+   }
+
+   /** Two nodes are equal when they have the same class and the same full path. */
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+       return false;
+     }
+     RMNode other = (RMNode) o;
+     return Objects.equals(fullPath, other.getFullPath());
+   }
+
+   @Override
+   public int hashCode() {
+     return Objects.hash(fullPath);
+   }
+
+   @Override
+   public String toString() {
+     return this.getName();
+   }
+
+   @Override
+   public void moveDataToNewMNode(IMNode newMNode) {
+     throw new UnsupportedOperationException("Temporarily unsupported");
+   }
+
+   @Override
+   public CacheEntry getCacheEntry() {
+     throw new UnsupportedOperationException("Temporarily unsupported");
+   }
+
+   @Override
+   public void setCacheEntry(CacheEntry cacheEntry) {
+     throw new UnsupportedOperationException("Temporarily unsupported");
+   }
+
+   @Override
+   public void setChildren(IMNodeContainer children) {
+     throw new UnsupportedOperationException("Temporarily unsupported");
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeType.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeType.java
new file mode 100644
index 0000000000..9eb1f8ad2c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ALIAS;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_ENTITY;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_INTERNAL;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_MEASUREMENT;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants.NODE_TYPE_SG;
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils.NODE_TYPE_ARRAY;
+
+ /** Key types of the RocksDB-based schema storage; each one is identified by a single char. */
+ public enum RMNodeType {
+   INTERNAL(NODE_TYPE_INTERNAL),
+   STORAGE_GROUP(NODE_TYPE_SG),
+   ENTITY(NODE_TYPE_ENTITY),
+   MEASUREMENT(NODE_TYPE_MEASUREMENT),
+   // NOTE(review): the name is a misspelling of ALIAS; kept because renaming a public constant
+   // would break existing references
+   ALISA(NODE_TYPE_ALIAS);
+
+   private final char value;
+
+   RMNodeType(char value) {
+     this.value = value;
+   }
+
+   /**
+    * Looks up the node type for {@code type} by indexing {@code RSchemaUtils.NODE_TYPE_ARRAY}.
+    *
+    * <p>Declared static because the lookup does not depend on any particular constant. The
+    * result for a char that has no entry depends on how that array is sized and filled, which is
+    * not visible from this class.
+    *
+    * @param type char value of the wanted node type
+    * @return the array entry stored at index {@code type}
+    */
+   public static RMNodeType of(char type) {
+     return NODE_TYPE_ARRAY[type];
+   }
+
+   /** @return the char identifying this node type */
+   public char getValue() {
+     return value;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeValueType.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeValueType.java
new file mode 100644
index 0000000000..e333f5e03a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMNodeValueType.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+
+ /**
+  * Kinds of data that can be stored in a node value. Each kind pairs a data block type with the
+  * flag constant associated with it.
+  */
+ public enum RMNodeValueType {
+   TTL(RSchemaConstants.FLAG_SET_TTL, RSchemaConstants.DATA_BLOCK_TYPE_TTL),
+   SCHEMA(RSchemaConstants.FLAG_HAS_SCHEMA, RSchemaConstants.DATA_BLOCK_TYPE_SCHEMA),
+   ALIAS(RSchemaConstants.FLAG_HAS_ALIAS, RSchemaConstants.DATA_BLOCK_TYPE_ALIAS),
+   TAGS(RSchemaConstants.FLAG_HAS_TAGS, RSchemaConstants.DATA_BLOCK_TYPE_TAGS),
+   ATTRIBUTES(RSchemaConstants.FLAG_HAS_ATTRIBUTES, RSchemaConstants.DATA_BLOCK_TYPE_ATTRIBUTES),
+   // the only kind without a flag
+   ORIGIN_KEY(null, RSchemaConstants.DATA_BLOCK_TYPE_ORIGIN_KEY);
+
+   /** flag constant of this kind; {@code null} for {@link #ORIGIN_KEY} */
+   private final Byte flag;
+
+   /** data block type constant of this kind */
+   private final byte type;
+
+   RMNodeValueType(Byte flag, byte type) {
+     this.flag = flag;
+     this.type = type;
+   }
+
+   /** @return the flag constant, or {@code null} if this kind has none */
+   public Byte getFlag() {
+     return flag;
+   }
+
+   /** @return the data block type constant */
+   public byte getType() {
+     return type;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMeasurementMNode.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
copy to server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMeasurementMNode.java
index 46341679cc..37a05773c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMeasurementMNode.java
@@ -16,74 +16,86 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.metadata.mnode;
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
import org.apache.iotdb.db.engine.trigger.executor.TriggerExecutor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
-import org.apache.iotdb.db.metadata.lastCache.container.LastCacheContainer;
import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.container.IMNodeContainer;
-import org.apache.iotdb.db.metadata.mnode.container.MNodeContainers;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
import org.apache.iotdb.db.metadata.template.Template;
-import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.rocksdb.RocksDBException;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
-public class MeasurementMNode extends MNode implements IMeasurementMNode {
-
- private static final Logger logger = LoggerFactory.getLogger(MeasurementMNode.class);
+public class RMeasurementMNode extends RMNode implements IMeasurementMNode {
- /** alias name of this measurement */
protected String alias;
- /** tag/attribute's start offset in tag file */
- private long offset = -1;
- /** measurement's Schema for one timeseries represented by current leaf node */
+
private IMeasurementSchema schema;
- /** last value cache */
- private volatile ILastCacheContainer lastCacheContainer = null;
- /** registered trigger */
- private TriggerExecutor triggerExecutor = null;
+
+ private Map<String, String> tags;
+
+ private Map<String, String> attributes;
/**
- * MeasurementMNode factory method. The type of returned MeasurementMNode is according to the
- * schema type. The default type is UnaryMeasurementMNode, which means if schema == null, an
- * UnaryMeasurementMNode will return.
+ * Constructor of MNode.
+ *
+ * @param fullPath
*/
- public static IMeasurementMNode getMeasurementMNode(
- IEntityMNode parent, String measurementName, IMeasurementSchema schema, String alias) {
- return new MeasurementMNode(parent, measurementName, schema, alias);
+ public RMeasurementMNode(String fullPath, RSchemaReadWriteHandler readWriteHandler) {
+ super(fullPath, readWriteHandler);
}
- /** @param alias alias of measurementName */
- MeasurementMNode(IMNode parent, String name, IMeasurementSchema schema, String alias) {
- super(parent, name);
- this.schema = schema;
- this.alias = alias;
+ @Override
+ void updateChildNode(String childName, int childNameMaxLevel) throws MetadataException {
+ String innerName =
+ RSchemaUtils.convertPartialPathToInner(
+ childName, childNameMaxLevel, RMNodeType.MEASUREMENT.getValue());
+ // todo all existing attributes of the measurementNode need to be written
+ try {
+ readWriteHandler.updateNode(innerName.getBytes(), new byte[] {});
+ } catch (RocksDBException e) {
+ throw new MetadataException(e);
+ }
+ }
+
+ public RMeasurementMNode(
+ String fullPath, byte[] value, RSchemaReadWriteHandler readWriteHandler) {
+ super(fullPath, readWriteHandler);
+ deserialize(value);
}
@Override
public IEntityMNode getParent() {
- if (parent == null) {
+ if (super.getParent() == null) {
return null;
}
return parent.getAsEntityMNode();
}
- /**
- * get MeasurementPath of this node
- *
- * @return MeasurementPath
- */
@Override
public MeasurementPath getMeasurementPath() {
MeasurementPath result = new MeasurementPath(super.getPartialPath(), schema);
result.setUnderAlignedEntity(getParent().isAligned());
+ if (alias != null && !alias.isEmpty()) {
+ result.setMeasurementAlias(alias);
+ }
return result;
}
@@ -92,25 +104,20 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
return schema;
}
- /**
- * get data type
- *
- * @param measurementId if it's a vector schema, we need sensor name of it
- * @return measurement data type
- */
@Override
public TSDataType getDataType(String measurementId) {
return schema.getType();
}
+ // unsupported exceptions
@Override
public long getOffset() {
- return offset;
+ throw new UnsupportedOperationException();
}
@Override
public void setOffset(long offset) {
- this.offset = offset;
+ throw new UnsupportedOperationException();
}
@Override
@@ -125,51 +132,53 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
@Override
public TriggerExecutor getTriggerExecutor() {
- return triggerExecutor;
+ return null;
}
@Override
public void setTriggerExecutor(TriggerExecutor triggerExecutor) {
- this.triggerExecutor = triggerExecutor;
+ throw new UnsupportedOperationException();
}
@Override
public ILastCacheContainer getLastCacheContainer() {
- if (lastCacheContainer == null) {
- synchronized (this) {
- if (lastCacheContainer == null) {
- lastCacheContainer = new LastCacheContainer();
- }
- }
- }
- return lastCacheContainer;
+ throw new UnsupportedOperationException();
}
@Override
public void setLastCacheContainer(ILastCacheContainer lastCacheContainer) {
- this.lastCacheContainer = lastCacheContainer;
+ throw new UnsupportedOperationException();
}
@Override
public void serializeTo(MLogWriter logWriter) throws IOException {
- logWriter.serializeMeasurementMNode(this);
- }
-
- /** deserialize MeasurementMNode from MeasurementNodePlan */
- public static IMeasurementMNode deserializeFrom(MeasurementMNodePlan plan) {
- IMeasurementMNode node =
- MeasurementMNode.getMeasurementMNode(
- null, plan.getName(), plan.getSchema(), plan.getAlias());
- node.setOffset(plan.getOffset());
- return node;
+ throw new UnsupportedOperationException();
}
- @Override
- public String getFullPath() {
- if (fullPath != null) {
- return fullPath;
+ private void deserialize(byte[] value) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(value);
+ // skip the version flag and node type flag
+ ReadWriteIOUtils.readBytes(byteBuffer, 2);
+
+ while (byteBuffer.hasRemaining()) {
+ byte blockType = ReadWriteIOUtils.readByte(byteBuffer);
+ switch (blockType) {
+ case RSchemaConstants.DATA_BLOCK_TYPE_ALIAS:
+ alias = ReadWriteIOUtils.readString(byteBuffer);
+ break;
+ case RSchemaConstants.DATA_BLOCK_TYPE_SCHEMA:
+ schema = MeasurementSchema.deserializeFrom(byteBuffer);
+ break;
+ case RSchemaConstants.DATA_BLOCK_TYPE_TAGS:
+ tags = ReadWriteIOUtils.readMap(byteBuffer);
+ break;
+ case RSchemaConstants.DATA_BLOCK_TYPE_ATTRIBUTES:
+ attributes = ReadWriteIOUtils.readMap(byteBuffer);
+ break;
+ default:
+ break;
+ }
}
- return concatFullPath();
}
@Override
@@ -179,8 +188,6 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
@Override
public IMNode getChild(String name) {
- MeasurementMNode.logger.warn(
- "current node {} is a MeasurementMNode, can not get child {}", this.name, name);
throw new RuntimeException(
String.format(
"current node %s is a MeasurementMNode, can not get child %s", super.name, name));
@@ -189,7 +196,7 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
@Override
public IMNode addChild(String name, IMNode child) {
// Do nothing
- return null;
+ return child;
}
@Override
@@ -199,6 +206,7 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
@Override
public IMNode deleteChild(String name) {
+ // Do nothing
return null;
}
@@ -207,30 +215,23 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
@Override
public IMNodeContainer getChildren() {
- return MNodeContainers.emptyMNodeContainer();
+ throw new UnsupportedOperationException();
}
@Override
- public void setChildren(IMNodeContainer children) {
- // Do nothing
+ public Template getUpperTemplate() {
+ throw new UnsupportedOperationException();
}
@Override
- public Template getUpperTemplate() {
- return parent.getUpperTemplate();
- }
+ public void setSchemaTemplate(Template schemaTemplate) {}
@Override
public Template getSchemaTemplate() {
- MeasurementMNode.logger.warn(
- "current node {} is a MeasurementMNode, can not get Device Template", name);
throw new RuntimeException(
String.format("current node %s is a MeasurementMNode, can not get Device Template", name));
}
- @Override
- public void setSchemaTemplate(Template schemaTemplate) {}
-
@Override
public void setUseTemplate(boolean useTemplate) {}
@@ -238,4 +239,24 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode {
public boolean isMeasurement() {
return true;
}
+
+ public Map<String, String> getTags() {
+ return tags;
+ }
+
+ public Map<String, String> getAttributes() {
+ return attributes;
+ }
+
+ public void setTags(Map<String, String> tags) {
+ this.tags = tags;
+ }
+
+ public void setAttributes(Map<String, String> attributes) {
+ this.attributes = attributes;
+ }
+
+ public byte[] getRocksDBValue() throws IOException {
+ return RSchemaUtils.buildMeasurementNodeValue(schema, alias, tags, attributes);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RStorageGroupMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RStorageGroupMNode.java
new file mode 100644
index 0000000000..799149c674
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RStorageGroupMNode.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
+
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+
+public class RStorageGroupMNode extends RInternalMNode implements IStorageGroupMNode {
+
+ private long dataTTL;
+
+ /**
+ * Constructs a storage group node with the given full path and data TTL.
+ *
+ * @param fullPath full path of this storage group node
+ */
+ public RStorageGroupMNode(
+ String fullPath, long dataTTL, RSchemaReadWriteHandler readWriteHandler) {
+ super(fullPath, readWriteHandler);
+ this.dataTTL = dataTTL;
+ }
+
+ public RStorageGroupMNode(
+ String fullPath, byte[] value, RSchemaReadWriteHandler readWriteHandler) {
+ super(fullPath, readWriteHandler);
+ Object ttl = RSchemaUtils.parseNodeValue(value, RMNodeValueType.TTL);
+ if (ttl == null) {
+ ttl = IoTDBDescriptor.getInstance().getConfig().getDefaultTTL();
+ }
+ this.dataTTL = (long) ttl;
+ }
+
+ @Override
+ void updateChildNode(String childName, int childNameMaxLevel) throws MetadataException {
+ String innerName =
+ RSchemaUtils.convertPartialPathToInner(
+ childName, childNameMaxLevel, RMNodeType.STORAGE_GROUP.getValue());
+ long ttl = getDataTTL();
+ try {
+ readWriteHandler.updateNode(
+ innerName.getBytes(), RSchemaUtils.updateTTL(RSchemaConstants.DEFAULT_NODE_VALUE, ttl));
+ } catch (RocksDBException e) {
+ throw new MetadataException(e);
+ }
+ }
+
+ @Override
+ public boolean isStorageGroup() {
+ return true;
+ }
+
+ @Override
+ public boolean isEntity() {
+ return false;
+ }
+
+ @Override
+ public boolean isMeasurement() {
+ return false;
+ }
+
+ @Override
+ public void serializeTo(MLogWriter logWriter) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getDataTTL() {
+ return dataTTL;
+ }
+
+ @Override
+ public void setDataTTL(long dataTTL) {
+ this.dataTTL = dataTTL;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index 403b714d35..dd7554a3cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.service;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.utils.MemUtils;
import org.slf4j.Logger;
@@ -29,6 +31,10 @@ public class IoTDBShutdownHook extends Thread {
@Override
public void run() {
+ // close rocksdb if possible to avoid losing data
+ for (ISchemaRegion schemaRegion : SchemaEngine.getInstance().getAllSchemaRegions()) {
+ schemaRegion.clear();
+ }
if (logger.isInfoEnabled()) {
logger.info(
"IoTDB exits. Jvm memory usage: {}",
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBBenchmark.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBBenchmark.java
new file mode 100644
index 0000000000..54441c30cf
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBBenchmark.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+@Ignore
+public class MRocksDBBenchmark {
+ protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private final RSchemaRegion rocksDBManager;
+
+ private static final Logger logger = LoggerFactory.getLogger(MRocksDBBenchmark.class);
+
+ public MRocksDBBenchmark(RSchemaRegion rocksDBManager) {
+ this.rocksDBManager = rocksDBManager;
+ }
+
+ public List<RocksDBBenchmarkTask.BenchmarkResult> benchmarkResults = new ArrayList<>();
+
+ public void testTimeSeriesCreation(List<List<CreateTimeSeriesPlan>> timeSeriesSet)
+ throws IOException {
+ RocksDBBenchmarkTask<List<CreateTimeSeriesPlan>> task =
+ new RocksDBBenchmarkTask<>(timeSeriesSet, RocksDBTestUtils.WRITE_CLIENT_NUM, 100);
+ RocksDBBenchmarkTask.BenchmarkResult result =
+ task.runBatchWork(
+ createTimeSeriesPlans -> {
+ RocksDBBenchmarkTask.TaskResult taskResult = new RocksDBBenchmarkTask.TaskResult();
+ createTimeSeriesPlans.forEach(
+ ts -> {
+ try {
+ rocksDBManager.createTimeseries(
+ ts.getPath(),
+ ts.getDataType(),
+ ts.getEncoding(),
+ ts.getCompressor(),
+ ts.getProps(),
+ ts.getAlias());
+ taskResult.success++;
+ } catch (Exception e) {
+ e.printStackTrace();
+ taskResult.failure++;
+ }
+ });
+ return taskResult;
+ },
+ "CreateTimeSeries");
+ benchmarkResults.add(result);
+ }
+
+ public void testMeasurementNodeQuery(Collection<String> queryTsSet) {
+ RocksDBBenchmarkTask<String> task =
+ new RocksDBBenchmarkTask<>(queryTsSet, RocksDBTestUtils.WRITE_CLIENT_NUM, 10000);
+ RocksDBBenchmarkTask.BenchmarkResult result =
+ task.runWork(
+ s -> {
+ try {
+ IMNode node = rocksDBManager.getMeasurementMNode(new PartialPath(s));
+ if (node != null) {
+ logger.warn(node.toString());
+ }
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ },
+ "MeasurementNodeQuery");
+ benchmarkResults.add(result);
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBUnitTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBUnitTest.java
new file mode 100644
index 0000000000..ed8364cb53
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/MRocksDBUnitTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.mnode.InternalMNode;
+import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Ignore
+public class MRocksDBUnitTest {
+
+ private RSchemaRegion rSchemaRegion;
+
+ @Before
+ public void setUp() throws MetadataException {
+
+ PartialPath storageGroup = new PartialPath("root.test.sg");
+ SchemaRegionId schemaRegionId = new SchemaRegionId((int) (Math.random() * 10));
+ MNode root = new InternalMNode(null, IoTDBConstant.PATH_ROOT);
+ IStorageGroupMNode storageGroupMNode = new StorageGroupMNode(root, "test", -1);
+ rSchemaRegion = new RSchemaRegion(storageGroup, schemaRegionId, storageGroupMNode);
+ }
+
+ @Test
+ public void testCreateTimeSeries() throws MetadataException, IOException {
+ PartialPath path = new PartialPath("root.test.sg.dd.m1");
+ rSchemaRegion.createTimeseries(
+ path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
+
+ IMeasurementMNode m1 = rSchemaRegion.getMeasurementMNode(path);
+ Assert.assertNull(m1.getAlias());
+ Assert.assertEquals(m1.getSchema().getCompressor(), CompressionType.UNCOMPRESSED);
+ Assert.assertEquals(m1.getSchema().getEncodingType(), TSEncoding.PLAIN);
+ Assert.assertEquals(m1.getSchema().getType(), TSDataType.TEXT);
+ Assert.assertNull(m1.getSchema().getProps());
+
+ PartialPath path2 = new PartialPath("root.tt.sg.dd.m2");
+ rSchemaRegion.createTimeseries(
+ path2, TSDataType.DOUBLE, TSEncoding.PLAIN, CompressionType.GZIP, null, "ma");
+
+ rSchemaRegion.printScanAllKeys();
+
+ IMeasurementMNode m2 = rSchemaRegion.getMeasurementMNode(path2);
+ Assert.assertEquals(m2.getAlias(), "ma");
+ Assert.assertEquals(m2.getSchema().getCompressor(), CompressionType.GZIP);
+ Assert.assertEquals(m2.getSchema().getEncodingType(), TSEncoding.PLAIN);
+ Assert.assertEquals(m2.getSchema().getType(), TSDataType.DOUBLE);
+ Assert.assertNull(m2.getSchema().getProps());
+ }
+
+ @Test
+ public void testCreateAlignedTimeSeries() throws MetadataException, IOException {
+ PartialPath prefixPath = new PartialPath("root.tt.sg.dd");
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> dataTypes = new ArrayList<>();
+ List<TSEncoding> encodings = new ArrayList<>();
+ List<CompressionType> compressions = new ArrayList<>();
+
+ for (int i = 0; i < 6; i++) {
+ measurements.add("mm" + i);
+ dataTypes.add(TSDataType.INT32);
+ encodings.add(TSEncoding.PLAIN);
+ compressions.add(CompressionType.UNCOMPRESSED);
+ }
+
+ rSchemaRegion.createAlignedTimeSeries(
+ new CreateAlignedTimeSeriesPlan(
+ prefixPath, measurements, dataTypes, encodings, compressions, null, null, null));
+
+ try {
+ PartialPath path = new PartialPath("root.tt.sg.dd.mn");
+ rSchemaRegion.createTimeseries(
+ path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
+ assert false;
+ } catch (MetadataException e) {
+ assert true;
+ }
+ rSchemaRegion.printScanAllKeys();
+ }
+
+ @Test
+ public void testNodeTypeCount() throws MetadataException, IOException {
+ List<PartialPath> storageGroups = new ArrayList<>();
+ storageGroups.add(new PartialPath("root.sg1"));
+ storageGroups.add(new PartialPath("root.inner.sg1"));
+ storageGroups.add(new PartialPath("root.inner.sg2"));
+ storageGroups.add(new PartialPath("root.inner1.inner2.inner3.sg"));
+ storageGroups.add(new PartialPath("root.inner1.inner2.sg"));
+
+ PartialPath path = new PartialPath("root.tt.sg.dd.m1");
+ rSchemaRegion.createTimeseries(
+ path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
+
+ PartialPath path2 = new PartialPath("root.tt.sg.ddd.m2");
+ rSchemaRegion.createTimeseries(
+ path2, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, "ma");
+
+ rSchemaRegion.printScanAllKeys();
+
+ // test all timeseries number
+ Assert.assertEquals(
+ 1, rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.tt.sg.dd.m1"), false));
+ Assert.assertEquals(2, rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.**"), false));
+
+ // test device number
+ Assert.assertEquals(
+ 0, rSchemaRegion.getDevicesNum(new PartialPath("root.inner1.inner2"), false));
+ Assert.assertEquals(
+ 0, rSchemaRegion.getDevicesNum(new PartialPath("root.inner1.inner2.**"), false));
+ Assert.assertEquals(2, rSchemaRegion.getDevicesNum(new PartialPath("root.tt.sg.**"), false));
+ Assert.assertEquals(1, rSchemaRegion.getDevicesNum(new PartialPath("root.tt.sg.dd"), false));
+
+ // todo wildcard
+
+ // test nodes count in given level
+ Assert.assertEquals(
+ 2, rSchemaRegion.getNodesCountInGivenLevel(new PartialPath("root.tt.sg"), 3, false));
+ }
+
+ @Test
+ public void testPathPatternMatch() throws MetadataException, IOException {
+ List<PartialPath> timeseries = new ArrayList<>();
+ timeseries.add(new PartialPath("root.sg.d1.m1"));
+ timeseries.add(new PartialPath("root.sg.d1.m2"));
+ timeseries.add(new PartialPath("root.sg.d2.m1"));
+ timeseries.add(new PartialPath("root.sg.d2.m2"));
+ timeseries.add(new PartialPath("root.sg1.d1.m1"));
+ timeseries.add(new PartialPath("root.sg1.d1.m2"));
+ timeseries.add(new PartialPath("root.sg1.d2.m1"));
+ timeseries.add(new PartialPath("root.sg1.d2.m2"));
+
+ for (PartialPath path : timeseries) {
+ rSchemaRegion.createTimeseries(
+ path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
+ }
+
+ // mRocksDBManager.traverseByPatternPath(new PartialPath("root.sg.d1.*"));
+ }
+
+ @Test
+ public void testDeleteTimeseries() throws MetadataException, IOException {
+ List<PartialPath> timeseries = new ArrayList<>();
+ timeseries.add(new PartialPath("root.sg.d1.m1"));
+ timeseries.add(new PartialPath("root.sg.d1.m2"));
+ timeseries.add(new PartialPath("root.sg.d2.m1"));
+ timeseries.add(new PartialPath("root.sg.d2.m2"));
+ timeseries.add(new PartialPath("root.sg.d3.m1"));
+ timeseries.add(new PartialPath("root.sg.d3.m2"));
+ timeseries.add(new PartialPath("root.sg1.d1.m1"));
+ timeseries.add(new PartialPath("root.sg1.d1.m2"));
+ timeseries.add(new PartialPath("root.sg1.d2.m1"));
+ timeseries.add(new PartialPath("root.sg1.d2.m2"));
+
+ for (PartialPath path : timeseries) {
+ rSchemaRegion.createTimeseries(
+ path, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, null);
+ }
+
+ Assert.assertEquals(
+ rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.**"), false), timeseries.size());
+
+ int count = timeseries.size();
+ rSchemaRegion.deleteTimeseries(new PartialPath("root.sg.d1.*"));
+ Assert.assertEquals(
+ rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.**"), false), count - 2);
+
+ count = count - 2;
+ rSchemaRegion.deleteTimeseries(new PartialPath("root.sg1.**"));
+ Assert.assertEquals(
+ rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.**"), false), count - 4);
+
+ count = count - 4;
+ rSchemaRegion.deleteTimeseries(new PartialPath("root.sg.*.m1"));
+ Assert.assertEquals(
+ rSchemaRegion.getAllTimeseriesCount(new PartialPath("root.**"), false), count - 2);
+
+ rSchemaRegion.printScanAllKeys();
+ }
+
+ @Test
+ public void testUpsert() throws MetadataException, IOException {
+ PartialPath path2 = new PartialPath("root.tt.sg.dd.m2");
+ rSchemaRegion.createTimeseries(
+ path2, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null, "ma");
+
+ IMeasurementMNode m1 = rSchemaRegion.getMeasurementMNode(new PartialPath("root.tt.sg.dd.m2"));
+ Assert.assertEquals(m1.getAlias(), "ma");
+
+ rSchemaRegion.changeAlias(new PartialPath("root.tt.sg.dd.m2"), "test");
+
+ IMeasurementMNode m2 = rSchemaRegion.getMeasurementMNode(new PartialPath("root.tt.sg.dd.m2"));
+ Assert.assertEquals(m2.getAlias(), "test");
+
+ IMeasurementMNode m3 = rSchemaRegion.getMeasurementMNode(new PartialPath("root.tt.sg.dd.test"));
+ Assert.assertEquals(m3.getAlias(), "test");
+ }
+
+ @After
+ public void clean() throws MetadataException {
+ rSchemaRegion.deleteSchemaRegion();
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandlerTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandlerTest.java
new file mode 100644
index 0000000000..b0663e45eb
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaReadWriteHandlerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RMNodeType;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.rocksdb.RocksDBException;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler.ROCKSDB_PATH;
+
+@Ignore
+public class RSchemaReadWriteHandlerTest {
+
+ private RSchemaReadWriteHandler readWriteHandler;
+
+ @Before
+ public void setUp() throws MetadataException, RocksDBException {
+ File file = new File(ROCKSDB_PATH);
+ if (!file.exists()) {
+ file.mkdirs();
+ }
+ readWriteHandler = new RSchemaReadWriteHandler();
+ }
+
+ @Test
+ public void testKeyExistByTypes() throws IllegalPathException, RocksDBException {
+ List<PartialPath> timeseries = new ArrayList<>();
+ timeseries.add(new PartialPath("root.sg.d1.m1"));
+ timeseries.add(new PartialPath("root.sg.d1.m2"));
+ timeseries.add(new PartialPath("root.sg.d2.m1"));
+ timeseries.add(new PartialPath("root.sg.d2.m2"));
+ timeseries.add(new PartialPath("root.sg1.d1.m1"));
+ timeseries.add(new PartialPath("root.sg1.d1.m2"));
+ timeseries.add(new PartialPath("root.sg1.d2.m1"));
+ timeseries.add(new PartialPath("root.sg1.d2.m2"));
+
+ for (PartialPath path : timeseries) {
+ String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+ readWriteHandler.createNode(levelPath, RMNodeType.MEASUREMENT, path.getFullPath().getBytes());
+ }
+
+ for (PartialPath path : timeseries) {
+ String levelPath = RSchemaUtils.getLevelPath(path.getNodes(), path.getNodeLength() - 1);
+ CheckKeyResult result = readWriteHandler.keyExistByAllTypes(levelPath);
+ Assert.assertTrue(result.existAnyKey());
+ Assert.assertNotNull(result.getValue());
+ Assert.assertEquals(path.getFullPath(), new String(result.getValue()));
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegionAdvancedTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegionAdvancedTest.java
new file mode 100644
index 0000000000..f388f8099a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegionAdvancedTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode.RStorageGroupMNode;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Collections;
+
+@Ignore
+public class RSchemaRegionAdvancedTest {
+
+ private static RSchemaRegion schemaRegion = null;
+
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.envSetUp();
+ PartialPath storageGroupPath = new PartialPath("root.vehicle.s0");
+ SchemaRegionId schemaRegionId = new SchemaRegionId(1);
+ RSchemaReadWriteHandler readWriteHandler = new RSchemaReadWriteHandler();
+ RStorageGroupMNode storageGroupMNode =
+ new RStorageGroupMNode(storageGroupPath.getFullPath(), -1, readWriteHandler);
+ schemaRegion = new RSchemaRegion(storageGroupPath, schemaRegionId, storageGroupMNode);
+
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s0.d0.s0"),
+ TSDataType.INT32,
+ TSEncoding.RLE,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s0.d0.s1"),
+ TSDataType.INT64,
+ TSEncoding.RLE,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s0.d0.s2"),
+ TSDataType.FLOAT,
+ TSEncoding.RLE,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s0.d0.s3"),
+ TSDataType.DOUBLE,
+ TSEncoding.RLE,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s0.d0.s4"),
+ TSDataType.BOOLEAN,
+ TSEncoding.PLAIN,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s0.d0.s5"),
+ TSDataType.TEXT,
+ TSEncoding.PLAIN,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s1.d1.s0"),
+ TSDataType.INT32,
+ TSEncoding.RLE,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s1.d1.s1"),
+ TSDataType.INT64,
+ TSEncoding.RLE,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s1.d1.s2"),
+ TSDataType.FLOAT,
+ TSEncoding.RLE,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s1.d1.s3"),
+ TSDataType.DOUBLE,
+ TSEncoding.RLE,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s1.d1.s4"),
+ TSDataType.BOOLEAN,
+ TSEncoding.PLAIN,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s1.d1.s5"),
+ TSDataType.TEXT,
+ TSEncoding.PLAIN,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testCache() throws MetadataException {
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s2.d2.s0"),
+ TSDataType.DOUBLE,
+ TSEncoding.RLE,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s2.d2.s1"),
+ TSDataType.BOOLEAN,
+ TSEncoding.PLAIN,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s2.d2.s2.g0"),
+ TSDataType.TEXT,
+ TSEncoding.PLAIN,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ schemaRegion.createTimeseries(
+ new PartialPath("root.vehicle.s2.d2.s3"),
+ TSDataType.TEXT,
+ TSEncoding.PLAIN,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+
+ IMNode node = schemaRegion.getDeviceNode(new PartialPath("root.vehicle.s0.d0"));
+ Assert.assertEquals(
+ TSDataType.INT32, node.getChild("s0").getAsMeasurementMNode().getSchema().getType());
+
+ Assert.assertFalse(schemaRegion.isPathExist(new PartialPath("root.vehicle.d100")));
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkEngine.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkEngine.java
new file mode 100644
index 0000000000..55d0fb355b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkEngine.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.logfile.MLogReader;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.utils.FileUtils;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler.ROCKSDB_PATH;
+
+public class RocksDBBenchmarkEngine {
+ private static final Logger logger = LoggerFactory.getLogger(RocksDBBenchmarkEngine.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ private static final int BIN_CAPACITY = 100 * 1000;
+
+ private File logFile;
+ public static List<List<CreateTimeSeriesPlan>> timeSeriesSet = new ArrayList<>();
+ public static Set<String> measurementPathSet = new HashSet<>();
+ public static Set<String> innerPathSet = new HashSet<>();
+ public static List<SetStorageGroupPlan> storageGroups = new ArrayList<>();
+
+ public RocksDBBenchmarkEngine() {
+ String schemaDir = config.getSchemaDir();
+ String logFilePath = schemaDir + File.separator + MetadataConstant.METADATA_LOG;
+ logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+ System.out.println(logFile.getAbsolutePath());
+ }
+
+ public static void main(String[] args) {
+ RocksDBBenchmarkEngine engine = new RocksDBBenchmarkEngine();
+ engine.startTest();
+ }
+
+ @Test
+ public void startTest() {
+ RocksDBTestUtils.printMemInfo("Benchmark rocksdb start");
+ try {
+ prepareBenchmark();
+ RocksDBTestUtils.printBenchmarkBaseline(
+ storageGroups, timeSeriesSet, measurementPathSet, innerPathSet);
+ /** rocksdb benchmark * */
+ RSchemaRegion rocksDBManager = new RSchemaRegion();
+ MRocksDBBenchmark mRocksDBBenchmark = new MRocksDBBenchmark(rocksDBManager);
+ mRocksDBBenchmark.testTimeSeriesCreation(timeSeriesSet);
+ mRocksDBBenchmark.testMeasurementNodeQuery(measurementPathSet);
+ RocksDBTestUtils.printReport(mRocksDBBenchmark.benchmarkResults, "rocksDB");
+ RocksDBTestUtils.printMemInfo("Benchmark finished");
+ } catch (IOException | MetadataException e) {
+ logger.error("Error happened when run benchmark", e);
+ }
+ }
+
+ public void prepareBenchmark() throws IOException {
+ long time = System.currentTimeMillis();
+ if (!logFile.exists()) {
+ throw new FileNotFoundException("we need a mlog.bin to init the benchmark test");
+ }
+ try (MLogReader mLogReader =
+ new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG)) {
+ parseForTestSet(mLogReader);
+ System.out.println("spend " + (System.currentTimeMillis() - time) + "ms to prepare dataset");
+ } catch (Exception e) {
+ throw new IOException("Failed to parser mlog.bin for err:" + e);
+ }
+ }
+
+ private static void parseForTestSet(MLogReader mLogReader) throws IllegalPathException {
+ List<CreateTimeSeriesPlan> currentList = null;
+ SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan();
+ setStorageGroupPlan.setPath(new PartialPath("root.iotcloud"));
+ storageGroups.add(setStorageGroupPlan);
+ while (mLogReader.hasNext()) {
+ PhysicalPlan plan = null;
+ try {
+ plan = mLogReader.next();
+ if (plan == null) {
+ continue;
+ }
+ switch (plan.getOperatorType()) {
+ case CREATE_TIMESERIES:
+ CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
+ PartialPath path = createTimeSeriesPlan.getPath();
+ if (currentList == null) {
+ currentList = new ArrayList<>(BIN_CAPACITY);
+ timeSeriesSet.add(currentList);
+ }
+ measurementPathSet.add(path.getFullPath());
+
+ innerPathSet.add(path.getDevice());
+ String[] subNodes = ArrayUtils.subarray(path.getNodes(), 0, path.getNodes().length - 2);
+ innerPathSet.add(String.join(".", subNodes));
+
+ currentList.add(createTimeSeriesPlan);
+ if (currentList.size() >= BIN_CAPACITY) {
+ currentList = null;
+ }
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ logger.error(
+ "Can not operate cmd {} for err:", plan == null ? "" : plan.getOperatorType(), e);
+ }
+ }
+ }
+
+ private void resetEnv() throws IOException {
+ File rockdDbFile = new File(ROCKSDB_PATH);
+ if (rockdDbFile.exists() && rockdDbFile.isDirectory()) {
+ FileUtils.deleteDirectory(rockdDbFile);
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkTask.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkTask.java
new file mode 100644
index 0000000000..f57008975a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBBenchmarkTask.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.commons.lang3.time.StopWatch;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+public class RocksDBBenchmarkTask<T> {
+ private Collection<T> dataSet;
+ private int workCount;
+ private int timeoutInMin;
+
+ RocksDBBenchmarkTask(Collection<T> dataSet, int workCount, int timeoutInMin) {
+ this.dataSet = dataSet;
+ this.workCount = workCount;
+ this.timeoutInMin = timeoutInMin;
+ }
+
+ public BenchmarkResult runBatchWork(Function<T, TaskResult> work, String name) {
+ ExecutorService executor = Executors.newFixedThreadPool(workCount);
+ AtomicInteger sucCounter = new AtomicInteger(0);
+ AtomicInteger failCounter = new AtomicInteger(0);
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ for (T input : dataSet) {
+ executor.submit(
+ () -> {
+ TaskResult result = work.apply(input);
+ sucCounter.addAndGet(result.success);
+ failCounter.addAndGet(result.failure);
+ });
+ }
+ try {
+ executor.shutdown();
+ executor.awaitTermination(timeoutInMin, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ stopWatch.stop();
+ return new BenchmarkResult(name, sucCounter.get(), failCounter.get(), stopWatch.getTime());
+ }
+
+ public BenchmarkResult runWork(Function<T, Boolean> work, String name) {
+ ExecutorService executor = Executors.newFixedThreadPool(workCount);
+ AtomicInteger sucCounter = new AtomicInteger(0);
+ AtomicInteger failCounter = new AtomicInteger(0);
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ for (T input : dataSet) {
+ executor.submit(
+ () -> {
+ if (work.apply(input)) {
+ sucCounter.incrementAndGet();
+ } else {
+ failCounter.incrementAndGet();
+ }
+ });
+ }
+ try {
+ executor.shutdown();
+ executor.awaitTermination(timeoutInMin, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ stopWatch.stop();
+ return new BenchmarkResult(name, sucCounter.get(), failCounter.get(), stopWatch.getTime());
+ }
+
+ public static class TaskResult {
+ public int success = 0;
+ public int failure = 0;
+ }
+
+ public static class BenchmarkResult {
+ public String name;
+ public long successCount;
+ public long failCount;
+ public long costInMs;
+
+ BenchmarkResult(String name, long successCount, long failCount, long cost) {
+ this.name = name;
+ this.successCount = successCount;
+ this.failCount = failCount;
+ this.costInMs = cost;
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBTestUtils.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBTestUtils.java
new file mode 100644
index 0000000000..85fd8da08f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RocksDBTestUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+
+import java.util.Collection;
+import java.util.List;
+
+ /** Console-reporting helpers shared by the RocksDB schema benchmarks. */
+ public class RocksDBTestUtils {
+
+   public static int WRITE_CLIENT_NUM = 200;
+   public static int READ_CLIENT_NUM = 50;
+
+   /**
+    * Prints the client counts and the sizes of the benchmark data sets to standard output.
+    *
+    * @param storageGroups storage groups of the data set; only the size is used
+    * @param timeSeriesSet time series plans grouped into bins; the reported count is the sum of
+    *     all bin sizes
+    * @param queryTsSet entries reported as "MeasurementNodeQuery"; only the size is used
+    * @param innerPathSet entries reported as "ChildrenNodeQuery"; only the size is used
+    */
+   public static void printBenchmarkBaseline(
+       List<?> storageGroups,
+       List<List<CreateTimeSeriesPlan>> timeSeriesSet,
+       Collection<?> queryTsSet,
+       Collection<?> innerPathSet) {
+     System.out.println(
+         "#################################Benchmark configuration#################################");
+     System.out.println("-----------Configuration---------");
+     System.out.println("Write client num: " + WRITE_CLIENT_NUM);
+     System.out.println("Query client num: " + READ_CLIENT_NUM);
+     System.out.println("-----------Benchmark Data Set Size---------");
+     System.out.println("StorageGroup: " + storageGroups.size());
+     int count = 0;
+     for (List<CreateTimeSeriesPlan> bin : timeSeriesSet) {
+       count += bin.size();
+     }
+     System.out.println("TimeSeries: " + count);
+     System.out.println("MeasurementNodeQuery: " + queryTsSet.size());
+     System.out.println("ChildrenNodeQuery: " + innerPathSet.size());
+   }
+
+   /**
+    * Prints the JVM heap currently in use (total memory minus free memory, as reported by
+    * {@link Runtime}) tagged with the given stage label.
+    *
+    * @param stageInfo label identifying the point in the benchmark being measured
+    */
+   public static void printMemInfo(String stageInfo) {
+     System.out.printf(
+         "[%s] Memory used: %d%n",
+         stageInfo, Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
+   }
+
+   /**
+    * Prints a table with one row per benchmark result: name, success count, failure count and
+    * cost in milliseconds.
+    *
+    * @param results results to list, in iteration order
+    * @param category label shown in the banner above the table
+    */
+   public static void printReport(
+       List<RocksDBBenchmarkTask.BenchmarkResult> results, String category) {
+     System.out.printf(
+         "\n\n#################################%s benchmark statistics#################################%n",
+         category);
+     System.out.printf("%25s %15s %10s %15s%n", "", "success", "fail", "cost-in-ms");
+     for (RocksDBBenchmarkTask.BenchmarkResult result : results) {
+       System.out.printf(
+           "%25s %15d %10d %15d%n",
+           result.name, result.successCount, result.failCount, result.costInMs);
+     }
+   }
+ }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 76feee6bcf..ef8ce93535 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -113,14 +113,13 @@ public class ReadWriteIOUtils {
return readBool(buffer);
}
- public static int write(Map<String, String> map, DataOutputStream stream) throws IOException {
+ public static int write(Map<String, String> map, OutputStream stream) throws IOException {
if (map == null) {
return write(NO_BYTE_TO_READ, stream);
}
int length = 0;
- stream.writeInt(map.size());
- length += 4;
+ length += write(map.size(), stream);
for (Entry<String, String> entry : map.entrySet()) {
length += write(entry.getKey(), stream);
length += write(entry.getValue(), stream);
@@ -128,8 +127,7 @@ public class ReadWriteIOUtils {
return length;
}
- public static void write(List<Map<String, String>> maps, DataOutputStream stream)
- throws IOException {
+ public static void write(List<Map<String, String>> maps, OutputStream stream) throws IOException {
for (Map<String, String> map : maps) {
write(map, stream);
}