You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/11/27 01:04:44 UTC
[skywalking] branch master updated: Support Apache IoTDB as a storage option (#7766)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 842b5d9 Support Apache IoTDB as a storage option (#7766)
842b5d9 is described below
commit 842b5d927e9c196e14a4dc3f73812d2c4adfebf4
Author: 刘威 <51...@users.noreply.github.com>
AuthorDate: Sat Nov 27 09:04:23 2021 +0800
Support Apache IoTDB as a storage option (#7766)
Support Apache IoTDB as a storage option, mostly refer to previous InfluxDB storage option.
* The Design of Apache IoTDB Storage Option, https://skywalking.apache.org/blog/2021-11-23-design-of-iotdb-storage-option/
---
.github/workflows/e2e.alarm.yaml | 1 +
.github/workflows/e2e.cluster.yaml | 1 +
.github/workflows/e2e.event.yaml | 1 +
.github/workflows/e2e.log.yaml | 1 +
.github/workflows/e2e.profiling.yaml | 1 +
.github/workflows/e2e.storages.yaml | 1 +
.github/workflows/e2e.ttl.yaml | 1 +
CHANGES.md | 1 +
dist-material/release-docs/LICENSE | 9 +-
docs/en/setup/backend/backend-storage.md | 19 ++
oap-server-bom/pom.xml | 16 +-
oap-server/server-starter/pom.xml | 5 +
.../src/main/resources/application.yml | 8 +
oap-server/server-storage-plugin/pom.xml | 1 +
.../storage-iotdb-plugin/pom.xml | 59 ++++
.../server/storage/plugin/iotdb/IoTDBClient.java | 338 +++++++++++++++++++++
.../server/storage/plugin/iotdb/IoTDBIndexes.java | 34 +++
.../storage/plugin/iotdb/IoTDBStorageConfig.java | 36 +++
.../storage/plugin/iotdb/IoTDBStorageProvider.java | 158 ++++++++++
.../storage/plugin/iotdb/IoTDBTableInstaller.java | 45 +++
.../storage/plugin/iotdb/IoTDBTableMetaInfo.java | 114 +++++++
.../storage/plugin/iotdb/base/IoTDBBatchDAO.java | 64 ++++
.../plugin/iotdb/base/IoTDBHistoryDeleteDAO.java | 44 +++
.../plugin/iotdb/base/IoTDBInsertRequest.java | 105 +++++++
.../plugin/iotdb/base/IoTDBManagementDAO.java | 42 +++
.../storage/plugin/iotdb/base/IoTDBMetricsDAO.java | 69 +++++
.../plugin/iotdb/base/IoTDBNoneStreamDAO.java | 41 +++
.../storage/plugin/iotdb/base/IoTDBRecordDAO.java | 82 +++++
.../storage/plugin/iotdb/base/IoTDBStorageDAO.java | 58 ++++
.../iotdb/cache/IoTDBNetworkAddressAliasDAO.java | 57 ++++
.../management/IoTDBUITemplateManagementDAO.java | 113 +++++++
.../iotdb/profile/IoTDBProfileTaskLogQueryDAO.java | 69 +++++
.../iotdb/profile/IoTDBProfileTaskQueryDAO.java | 112 +++++++
.../IoTDBProfileThreadSnapshotQueryDAO.java | 172 +++++++++++
.../iotdb/query/IoTDBAggregationQueryDAO.java | 132 ++++++++
.../plugin/iotdb/query/IoTDBAlarmQueryDAO.java | 104 +++++++
.../iotdb/query/IoTDBBrowserLogQueryDAO.java | 130 ++++++++
.../plugin/iotdb/query/IoTDBEventQueryDAO.java | 197 ++++++++++++
.../plugin/iotdb/query/IoTDBLogQueryDAO.java | 144 +++++++++
.../plugin/iotdb/query/IoTDBMetadataQueryDAO.java | 226 ++++++++++++++
.../plugin/iotdb/query/IoTDBMetricsQueryDAO.java | 256 ++++++++++++++++
.../iotdb/query/IoTDBTopNRecordsQueryDAO.java | 120 ++++++++
.../plugin/iotdb/query/IoTDBTopologyQueryDAO.java | 260 ++++++++++++++++
.../plugin/iotdb/query/IoTDBTraceQueryDAO.java | 161 ++++++++++
...alking.oap.server.library.module.ModuleProvider | 19 ++
test/e2e-v2/cases/alarm/iotdb/docker-compose.yml | 57 ++++
test/e2e-v2/cases/alarm/iotdb/e2e.yaml | 45 +++
.../cases/cluster/zk/iotdb/docker-compose.yml | 111 +++++++
test/e2e-v2/cases/cluster/zk/iotdb/e2e.yaml | 48 +++
test/e2e-v2/cases/event/iotdb/docker-compose.yml | 45 +++
test/e2e-v2/cases/event/iotdb/e2e.yaml | 35 +++
test/e2e-v2/cases/log/iotdb/docker-compose.yml | 59 ++++
test/e2e-v2/cases/log/iotdb/e2e.yaml | 48 +++
test/e2e-v2/cases/profile/iotdb/docker-compose.yml | 54 ++++
test/e2e-v2/cases/profile/iotdb/e2e.yaml | 35 +++
test/e2e-v2/cases/storage/iotdb/docker-compose.yml | 70 +++++
test/e2e-v2/cases/storage/iotdb/e2e.yaml | 129 ++++++++
test/e2e-v2/cases/ttl/iotdb/docker-compose.yml | 66 ++++
test/e2e-v2/cases/ttl/iotdb/e2e.yaml | 35 +++
test/e2e-v2/script/docker-compose/base-compose.yml | 1 +
.../known-oap-backend-dependencies.txt | 7 +
61 files changed, 4469 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/e2e.alarm.yaml b/.github/workflows/e2e.alarm.yaml
index d316052..4b8ce66 100644
--- a/.github/workflows/e2e.alarm.yaml
+++ b/.github/workflows/e2e.alarm.yaml
@@ -44,6 +44,7 @@ jobs:
- alarm/mysql/e2e.yaml
- alarm/influxdb/e2e.yaml
- alarm/postgres/e2e.yaml
+ - alarm/iotdb/e2e.yaml
steps:
- uses: actions/checkout@v2
with:
diff --git a/.github/workflows/e2e.cluster.yaml b/.github/workflows/e2e.cluster.yaml
index db4bf10..bbbafcb 100644
--- a/.github/workflows/e2e.cluster.yaml
+++ b/.github/workflows/e2e.cluster.yaml
@@ -40,6 +40,7 @@ jobs:
- cluster/zk/es/e2e.yaml
- cluster/zk/mysql/e2e.yaml
- cluster/zk/influxdb/e2e.yaml
+ - cluster/zk/iotdb/e2e.yaml
steps:
- uses: actions/checkout@v2
with:
diff --git a/.github/workflows/e2e.event.yaml b/.github/workflows/e2e.event.yaml
index 3f8868d..98f1e06 100644
--- a/.github/workflows/e2e.event.yaml
+++ b/.github/workflows/e2e.event.yaml
@@ -43,6 +43,7 @@ jobs:
- event/es/e2e.yaml
- event/mysql/e2e.yaml
- event/influxdb/e2e.yaml
+ - event/iotdb/e2e.yaml
steps:
- uses: actions/checkout@v2
with:
diff --git a/.github/workflows/e2e.log.yaml b/.github/workflows/e2e.log.yaml
index bf7d7e1..b42896d 100644
--- a/.github/workflows/e2e.log.yaml
+++ b/.github/workflows/e2e.log.yaml
@@ -44,6 +44,7 @@ jobs:
- log/mysql/e2e.yaml
- log/influxdb/e2e.yaml
- log/postgres/e2e.yaml
+ - log/iotdb/e2e.yaml
include:
- es-version: 6.3.2
config-file: log/es/e2e.yaml
diff --git a/.github/workflows/e2e.profiling.yaml b/.github/workflows/e2e.profiling.yaml
index 4adc5bf..bc5b9ac 100644
--- a/.github/workflows/e2e.profiling.yaml
+++ b/.github/workflows/e2e.profiling.yaml
@@ -44,6 +44,7 @@ jobs:
- profile/es/e2e.yaml
- profile/mysql/e2e.yaml
- profile/influxdb/e2e.yaml
+ - profile/iotdb/e2e.yaml
steps:
- uses: actions/checkout@v2
with:
diff --git a/.github/workflows/e2e.storages.yaml b/.github/workflows/e2e.storages.yaml
index d03a3a5..7b3d018 100644
--- a/.github/workflows/e2e.storages.yaml
+++ b/.github/workflows/e2e.storages.yaml
@@ -42,6 +42,7 @@ jobs:
- storage/tidb/e2e.yaml
- storage/influxdb/e2e.yaml
- storage/postgres/e2e.yaml
+ - storage/iotdb/e2e.yaml
include:
- opensearch-version: 1.1.0
config-file: storage/opensearch/e2e.yaml
diff --git a/.github/workflows/e2e.ttl.yaml b/.github/workflows/e2e.ttl.yaml
index b322f42..4457fb0 100644
--- a/.github/workflows/e2e.ttl.yaml
+++ b/.github/workflows/e2e.ttl.yaml
@@ -43,6 +43,7 @@ jobs:
- ttl/tidb/e2e.yaml
- ttl/influxdb/e2e.yaml
- ttl/postgresql/e2e.yaml
+ - ttl/iotdb/e2e.yaml
include:
- es-version: 6.3.2
config-file: ttl/es/e2e.yaml
diff --git a/CHANGES.md b/CHANGES.md
index 97f52f7..6a61289 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,7 @@ Release Notes.
* Add OpenSearch 1.2.0 to test and verify it works.
* Upgrade grpc-java to 1.42.1 and protoc to 3.19.1 to allow using native Mac osx-aarch_64 artifacts.
* Fix TopologyQuery.loadEndpointRelation bug.
+* Support using IoTDB as a new storage option.
#### UI
diff --git a/dist-material/release-docs/LICENSE b/dist-material/release-docs/LICENSE
index d698da0..dbd92af 100755
--- a/dist-material/release-docs/LICENSE
+++ b/dist-material/release-docs/LICENSE
@@ -327,6 +327,11 @@ The text of each license is the standard Apache 2.0 license.
Armeria 1.12.0, http://github.com/line/armeria, Apache 2.0
Brotli4j 1.6.0, https://github.com/hyperxpro/Brotli4j, Apache 2.0
micrometer 1.7.4, https://github.com/micrometer-metrics/micrometer, Apache 2.0
+ iotdb-session 0.12.3: https://github.com/apache/iotdb, Apache 2.0
+ iotdb-thrift 0.12.3: https://github.com/apache/iotdb, Apache 2.0
+ service-rpc 0.12.3: https://github.com/apache/iotdb, Apache 2.0
+ tsfile 0.12.3 https://github.com/apache/iotdb Apache 2.0
+ libthrift 0.14.1: https://github.com/apache/thrift, Apache 2.0
========================================================================
MIT licenses
@@ -393,7 +398,9 @@ EPL licenses
The following components are provided under the EPL License. See project link for details.
The text of each license is also included at licenses/LICENSE-[project].txt.
- logback 1.1.11: https://github.com/qos-ch/logback: EPL 1.0
+ logback 1.2.3: https://github.com/qos-ch/logback: EPL 1.0
+ logback-classic 1.2.3: https://github.com/qos-ch/logback: EPL 1.0
+ logback-core 1.2.3: https://github.com/qos-ch/logback: EPL 1.0
========================================================================
CDDL licenses
diff --git a/docs/en/setup/backend/backend-storage.md b/docs/en/setup/backend/backend-storage.md
index 5151546..d420b0e 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -15,6 +15,7 @@ Natively supported storage:
- TiDB
- InfluxDB
- PostgreSQL
+- IoTDB
## H2
@@ -280,6 +281,24 @@ storage:
All connection-related settings, including URL link, username, and password are found in `application.yml`.
Only part of the settings are listed here. Please follow [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for full settings.
+## IoTDB
+IoTDB is a time-series database from Apache, which is one of the storage plugin options.
+IoTDB storage plugin is still in progress. Its efficiency will improve in the future.
+
+```yaml
+storage:
+ selector: ${SW_STORAGE:iotdb}
+ iotdb:
+ host: ${SW_STORAGE_IOTDB_HOST:127.0.0.1}
+ rpcPort: ${SW_STORAGE_IOTDB_RPC_PORT:6667}
+ username: ${SW_STORAGE_IOTDB_USERNAME:root}
+ password: ${SW_STORAGE_IOTDB_PASSWORD:root}
+ storageGroup: ${SW_STORAGE_IOTDB_STORAGE_GROUP:root.skywalking}
+ sessionPoolSize: ${SW_STORAGE_IOTDB_SESSIONPOOL_SIZE:16}
+ fetchTaskLogMaxSize: ${SW_STORAGE_IOTDB_FETCH_TASK_LOG_MAX_SIZE:1000} # the max number of fetch task log in a request
+```
+All connection related settings, including host, rpcPort, username, and password are found in `application.yml`. Please ensure the IoTDB version >= 0.12.3.
+
## More storage extension solutions
Follow the [Storage extension development guide](../../guides/storage-extention.md)
in the [Project Extensions document](../../guides/README.md#project-extensions).
diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index cfde9dd..4e6fb81 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -77,6 +77,8 @@
<awaitility.version>3.0.0</awaitility.version>
<httpcore.version>4.4.13</httpcore.version>
<commons-compress.version>1.21</commons-compress.version>
+ <iotdb-session.version>0.12.3</iotdb-session.version>
+ <lz4-java.version>1.6.0</lz4-java.version>
</properties>
<dependencyManagement>
@@ -534,11 +536,21 @@
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
- </dependency>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-session</artifactId>
+ <version>${iotdb-session.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ <version>${lz4-java.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index 7f29462..44c1b4a 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -188,6 +188,11 @@
<artifactId>storage-tidb-plugin</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>storage-iotdb-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- storage module -->
<!-- queryBuild module -->
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index cdce040..631be41 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -242,6 +242,14 @@ storage:
oapAnalyzer: ${SW_STORAGE_ES_OAP_ANALYZER:"{\"analyzer\":{\"oap_analyzer\":{\"type\":\"stop\"}}}"} # the oap analyzer.
oapLogAnalyzer: ${SW_STORAGE_ES_OAP_LOG_ANALYZER:"{\"analyzer\":{\"oap_log_analyzer\":{\"type\":\"standard\"}}}"} # the oap log analyzer. It could be customized by the ES analyzer configuration to support more language log formats, such as Chinese log, Japanese log and etc.
advanced: ${SW_STORAGE_ES_ADVANCED:""}
+ iotdb:
+ host: ${SW_STORAGE_IOTDB_HOST:127.0.0.1}
+ rpcPort: ${SW_STORAGE_IOTDB_RPC_PORT:6667}
+ username: ${SW_STORAGE_IOTDB_USERNAME:root}
+ password: ${SW_STORAGE_IOTDB_PASSWORD:root}
+ storageGroup: ${SW_STORAGE_IOTDB_STORAGE_GROUP:root.skywalking}
+ sessionPoolSize: ${SW_STORAGE_IOTDB_SESSIONPOOL_SIZE:16}
+ fetchTaskLogMaxSize: ${SW_STORAGE_IOTDB_FETCH_TASK_LOG_MAX_SIZE:1000} # the max number of fetch task log in a request
agent-analyzer:
selector: ${SW_AGENT_ANALYZER:default}
diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/pom.xml
index 31bfbaa..a1f84e4 100644
--- a/oap-server/server-storage-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/pom.xml
@@ -33,6 +33,7 @@
<module>storage-zipkin-elasticsearch-plugin</module>
<module>storage-influxdb-plugin</module>
<module>storage-tidb-plugin</module>
+ <module>storage-iotdb-plugin</module>
</modules>
</project>
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/pom.xml b/oap-server/server-storage-plugin/storage-iotdb-plugin/pom.xml
new file mode 100644
index 0000000..e1c6507
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>server-storage-plugin</artifactId>
+ <groupId>org.apache.skywalking</groupId>
+ <version>8.9.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>storage-iotdb-plugin</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>server-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>library-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-session</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBClient.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBClient.java
new file mode 100644
index 0000000..e0c4680
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBClient.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.session.pool.SessionDataSetWrapper;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
+import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
+import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
+import org.apache.skywalking.oap.server.library.util.HealthChecker;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.base.IoTDBInsertRequest;
+
+@Slf4j
+public class IoTDBClient implements Client, HealthCheckable {
+ private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
+ private final IoTDBStorageConfig config;
+
+ private SessionPool sessionPool;
+ private final String storageGroup;
+
+ public static final String DOT = ".";
+ public static final String ALIGN_BY_DEVICE = " align by device";
+
+ public static final String TIME_BUCKET = "time_bucket";
+ public static final String TIME = "Time";
+ public static final String TIMESTAMP = "timestamp";
+
+ public IoTDBClient(IoTDBStorageConfig config) {
+ this.config = config;
+ storageGroup = config.getStorageGroup();
+ }
+
+ @Override
+ public void connect() throws IoTDBConnectionException, StatementExecutionException {
+ try {
+ sessionPool = new SessionPool(config.getHost(), config.getRpcPort(), config.getUsername(),
+ config.getPassword(), config.getSessionPoolSize());
+ sessionPool.setStorageGroup(storageGroup);
+
+ healthChecker.health();
+ } catch (StatementExecutionException e) {
+ if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
+ healthChecker.unHealth(e);
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ sessionPool.close();
+ this.healthChecker.health();
+ }
+
+ @Override
+ public void registerChecker(HealthChecker healthChecker) {
+ this.healthChecker.register(healthChecker);
+ }
+
+ public SessionPool getSessionPool() {
+ return sessionPool;
+ }
+
+ public IoTDBStorageConfig getConfig() {
+ return config;
+ }
+
+ /**
+ * Write data to IoTDB
+ *
+ * @param request an IoTDBInsertRequest
+ * @throws IOException IoTDBConnectionException or StatementExecutionException
+ */
+ public void write(IoTDBInsertRequest request) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("Writing data to IoTDB: {}", request);
+ }
+
+ StringBuilder devicePath = new StringBuilder();
+ devicePath.append(storageGroup).append(IoTDBClient.DOT).append(request.getModelName());
+ try {
+ // make an index value as a layer name of the storage path
+ if (!request.getIndexes().isEmpty()) {
+ request.getIndexValues().forEach(value -> devicePath.append(IoTDBClient.DOT)
+ .append(indexValue2LayerName(value)));
+ }
+ sessionPool.insertRecord(devicePath.toString(), request.getTime(),
+ request.getMeasurements(), request.getMeasurementTypes(), request.getMeasurementValues());
+ healthChecker.health();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ healthChecker.unHealth(e);
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Write a list of data into IoTDB
+ *
+ * @param requestList a list of IoTDBInsertRequest
+ * @throws IOException IoTDBConnectionException or StatementExecutionException
+ */
+ public void write(List<IoTDBInsertRequest> requestList) throws IOException {
+ if (log.isDebugEnabled()) {
+ for (IoTDBInsertRequest request : requestList) {
+ log.debug("Writing data to IoTDB: {}", request);
+ }
+ }
+
+ List<String> devicePathList = new ArrayList<>();
+ List<Long> timeList = new ArrayList<>();
+ List<List<String>> timeseriesListList = new ArrayList<>();
+ List<List<TSDataType>> typesList = new ArrayList<>();
+ List<List<Object>> valuesList = new ArrayList<>();
+
+ requestList.forEach(request -> {
+ StringBuilder devicePath = new StringBuilder();
+ devicePath.append(storageGroup).append(IoTDBClient.DOT).append(request.getModelName());
+ // make an index value as a layer name of the storage path
+ if (!request.getIndexes().isEmpty()) {
+ request.getIndexValues().forEach(value -> devicePath.append(IoTDBClient.DOT)
+ .append(indexValue2LayerName(value)));
+ }
+ devicePathList.add(devicePath.toString());
+ timeList.add(request.getTime());
+ timeseriesListList.add(request.getMeasurements());
+ typesList.add(request.getMeasurementTypes());
+ valuesList.add(request.getMeasurementValues());
+ });
+
+ try {
+ sessionPool.insertRecords(devicePathList, timeList, timeseriesListList, typesList, valuesList);
+ healthChecker.health();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ healthChecker.unHealth(e);
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Normal filter query for a list of data. querySQL must contain "align by device"
+ *
+ * @param modelName model name
+ * @param querySQL the SQL for query which must contain "align by device"
+ * @param storageBuilder storage builder for transforming storage result map to entity
+ * @return a list of result data
+ * @throws IOException IoTDBConnectionException or StatementExecutionException
+ */
+ public List<? super StorageData> filterQuery(String modelName, String querySQL,
+ StorageHashMapBuilder<? extends StorageData> storageBuilder)
+ throws IOException {
+ if (!querySQL.contains("align by device")) {
+ throw new IOException("querySQL must contain \"align by device\"");
+ }
+ SessionDataSetWrapper wrapper = null;
+ List<? super StorageData> storageDataList = new ArrayList<>();
+ try {
+ wrapper = sessionPool.executeQueryStatement(querySQL);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {}, columnNames: {}", querySQL, wrapper.getColumnNames());
+ }
+
+ List<String> columnNames = wrapper.getColumnNames();
+ IoTDBTableMetaInfo tableMetaInfo = IoTDBTableMetaInfo.get(modelName);
+ List<String> indexes = tableMetaInfo.getIndexes();
+ while (wrapper.hasNext()) {
+ Map<String, Object> map = new ConcurrentHashMap<>();
+ RowRecord rowRecord = wrapper.next();
+ List<Field> fields = rowRecord.getFields();
+ // transform timestamp to time_bucket
+ if (!UITemplate.INDEX_NAME.equals(modelName)) {
+ map.put(IoTDBClient.TIME_BUCKET, TimeBucket.getTimeBucket(rowRecord.getTimestamp(),
+ tableMetaInfo.getModel().getDownsampling()));
+ }
+ // field.get(0) -> Device, transform layerName to indexValue
+ String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
+ for (int i = 0; i < indexes.size(); i++) {
+ map.put(indexes.get(i), layerName2IndexValue(layerNames[i + 1]));
+ }
+ for (int i = 0; i < columnNames.size() - 2; i++) {
+ String columnName = columnNames.get(i + 2);
+ Field field = fields.get(i + 1);
+ if (field.getDataType() == null) {
+ continue;
+ }
+ if (field.getDataType().equals(TSDataType.TEXT)) {
+ map.put(columnName, field.getStringValue());
+ } else {
+ map.put(columnName, field.getObjectValue(field.getDataType()));
+ }
+ }
+ if (map.containsKey(IoTDBIndexes.NODE_TYPE_IDX)) {
+ String nodeType = (String) map.get(IoTDBIndexes.NODE_TYPE_IDX);
+ map.put(IoTDBIndexes.NODE_TYPE_IDX, Integer.valueOf(nodeType));
+ }
+ if (modelName.equals(BrowserErrorLogRecord.INDEX_NAME) || modelName.equals(LogRecord.INDEX_NAME)) {
+ map.put(IoTDBClient.TIMESTAMP, map.get("\"" + IoTDBClient.TIMESTAMP + "\""));
+ }
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ // remove double quotes
+ String key = entry.getKey();
+ if (key.contains(".")) {
+ map.put(key.substring(1, key.length() - 1), entry.getValue());
+ }
+ }
+
+ storageDataList.add(storageBuilder.storage2Entity(map));
+ }
+ healthChecker.health();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ healthChecker.unHealth(e);
+ throw new IOException(e.getMessage() + System.lineSeparator() + "SQL Statement: " + querySQL, e);
+ } finally {
+ if (wrapper != null) {
+ sessionPool.closeResultSet(wrapper);
+ }
+ }
+ return storageDataList;
+ }
+
+ /**
+ * Query with aggregation function: count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value
+ *
+ * @param querySQL the SQL for query which should contain aggregation function
+ * @return the result of aggregation function
+ * @throws IOException IoTDBConnectionException or StatementExecutionException
+ */
+ public List<Double> queryWithAgg(String querySQL) throws IOException {
+ SessionDataSetWrapper wrapper = null;
+ List<Double> results = new ArrayList<>();
+ try {
+ wrapper = sessionPool.executeQueryStatement(querySQL);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {}, columnNames: {}", querySQL, wrapper.getColumnNames());
+ }
+
+ if (wrapper.hasNext()) {
+ RowRecord rowRecord = wrapper.next();
+ List<Field> fields = rowRecord.getFields();
+ for (Field field : fields) {
+ String stringValue = field.getStringValue();
+ if (!stringValue.equals("null")) {
+ results.add(Double.parseDouble(stringValue));
+ }
+ }
+ }
+ healthChecker.health();
+ return results;
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ healthChecker.unHealth(e);
+ throw new IOException(e.getMessage() + System.lineSeparator() + "SQL Statement: " + querySQL, e);
+ } finally {
+ if (wrapper != null) {
+ sessionPool.closeResultSet(wrapper);
+ }
+ }
+ }
+
+ /**
+ * Delete data <= deleteTime in one timeseries
+ *
+ * @param device device name
+ * @param deleteTime deleteTime
+ * @throws IOException IoTDBConnectionException or StatementExecutionException
+ */
+ public void deleteData(String device, long deleteTime) throws IOException {
+ try {
+ sessionPool.deleteData(storageGroup + IoTDBClient.DOT + device, deleteTime);
+ healthChecker.health();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ healthChecker.unHealth(e);
+ throw new IOException(e);
+ }
+ }
+
+ public String indexValue2LayerName(String indexValue) {
+ return "\"" + indexValue + "\"";
+ }
+
+ public String layerName2IndexValue(String layerName) {
+ return layerName.substring(0, layerName.length() - 1);
+ }
+
+ public StringBuilder addQueryIndexValue(String modelName, StringBuilder query, Map<String, String> indexAndValueMap) {
+ List<String> indexes = IoTDBTableMetaInfo.get(modelName).getIndexes();
+ indexes.forEach(index -> {
+ if (indexAndValueMap.containsKey(index)) {
+ query.append(IoTDBClient.DOT).append(indexValue2LayerName(indexAndValueMap.get(index)));
+ } else {
+ query.append(IoTDBClient.DOT).append("*");
+ }
+ });
+ return query;
+ }
+
+ public StringBuilder addQueryAsterisk(String modelName, StringBuilder query) {
+ List<String> indexes = IoTDBTableMetaInfo.get(modelName).getIndexes();
+ indexes.forEach(index -> query.append(IoTDBClient.DOT).append("*"));
+ return query;
+ }
+
+ public StringBuilder addModelPath(StringBuilder query, String modelName) {
+ return query.append(storageGroup).append(IoTDBClient.DOT).append(modelName);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBIndexes.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBIndexes.java
new file mode 100644
index 0000000..24b679c
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBIndexes.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb;
+
+public interface IoTDBIndexes {
+ // Here is the indexes we choose and their order in storage path.
+ String ID_IDX = "id";
+ String ENTITY_ID_IDX = "entity_id";
+ String NODE_TYPE_IDX = "node_type";
+ String SERVICE_ID_IDX = "service_id";
+ String GROUP_IDX = "service_group";
+ String TRACE_ID_IDX = "trace_id";
+
+ static boolean isIndex(String key) {
+ return key.equals(ID_IDX) || key.equals(ENTITY_ID_IDX) || key.equals(NODE_TYPE_IDX) ||
+ key.equals(SERVICE_ID_IDX) || key.equals(GROUP_IDX) || key.equals(TRACE_ID_IDX);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageConfig.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageConfig.java
new file mode 100644
index 0000000..8e34445
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+@Getter
+@Setter
+public class IoTDBStorageConfig extends ModuleConfig {
+ private String host;
+ private int rpcPort;
+ private String username;
+ private String password;
+ private String storageGroup;
+ private int sessionPoolSize;
+
+ private int fetchTaskLogMaxSize;
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageProvider.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageProvider.java
new file mode 100644
index 0000000..6840ede
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageProvider.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
+import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
+import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.base.IoTDBBatchDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.base.IoTDBHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.base.IoTDBStorageDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.cache.IoTDBNetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.management.IoTDBUITemplateManagementDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.profile.IoTDBProfileTaskLogQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.profile.IoTDBProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.profile.IoTDBProfileThreadSnapshotQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBAggregationQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBAlarmQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBBrowserLogQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBEventQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBLogQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBMetadataQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBMetricsQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTopNRecordsQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTopologyQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTraceQueryDAO;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+
+@Slf4j
+public class IoTDBStorageProvider extends ModuleProvider {
+ private final IoTDBStorageConfig config;
+ private IoTDBClient client;
+
+ public IoTDBStorageProvider() {
+ config = new IoTDBStorageConfig();
+ }
+
+ @Override
+ public String name() {
+ return "iotdb";
+ }
+
+ @Override
+ public Class<? extends ModuleDefine> module() {
+ return StorageModule.class;
+ }
+
+ @Override
+ public ModuleConfig createConfigBeanIfAbsent() {
+ return config;
+ }
+
+ @Override
+ public void prepare() throws ServiceNotProvidedException {
+ this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
+
+ client = new IoTDBClient(config);
+
+ this.registerServiceImplementation(IBatchDAO.class, new IoTDBBatchDAO(client));
+ this.registerServiceImplementation(IHistoryDeleteDAO.class, new IoTDBHistoryDeleteDAO(client));
+ this.registerServiceImplementation(StorageDAO.class, new IoTDBStorageDAO(client));
+
+ this.registerServiceImplementation(INetworkAddressAliasDAO.class, new IoTDBNetworkAddressAliasDAO(client));
+
+ this.registerServiceImplementation(UITemplateManagementDAO.class, new IoTDBUITemplateManagementDAO(client));
+
+ this.registerServiceImplementation(IProfileTaskLogQueryDAO.class,
+ new IoTDBProfileTaskLogQueryDAO(client, config.getFetchTaskLogMaxSize()));
+ this.registerServiceImplementation(IProfileTaskQueryDAO.class, new IoTDBProfileTaskQueryDAO(client));
+ this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class,
+ new IoTDBProfileThreadSnapshotQueryDAO(client));
+
+ this.registerServiceImplementation(IAggregationQueryDAO.class, new IoTDBAggregationQueryDAO(client));
+ this.registerServiceImplementation(IAlarmQueryDAO.class, new IoTDBAlarmQueryDAO(client));
+ this.registerServiceImplementation(IBrowserLogQueryDAO.class, new IoTDBBrowserLogQueryDAO(client));
+ this.registerServiceImplementation(IEventQueryDAO.class, new IoTDBEventQueryDAO(client));
+ this.registerServiceImplementation(ILogQueryDAO.class, new IoTDBLogQueryDAO(client));
+ this.registerServiceImplementation(IMetadataQueryDAO.class, new IoTDBMetadataQueryDAO(client));
+ this.registerServiceImplementation(IMetricsQueryDAO.class, new IoTDBMetricsQueryDAO(client));
+ this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new IoTDBTopNRecordsQueryDAO(client));
+ this.registerServiceImplementation(ITopologyQueryDAO.class, new IoTDBTopologyQueryDAO(client));
+ this.registerServiceImplementation(ITraceQueryDAO.class, new IoTDBTraceQueryDAO(client));
+ }
+
+ @Override
+ public void start() throws ServiceNotProvidedException, ModuleStartException {
+ MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
+ .provider()
+ .getService(MetricsCreator.class);
+ HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge(
+ "storage_iotdb", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ client.registerChecker(healthChecker);
+ try {
+ client.connect();
+
+ IoTDBTableInstaller installer = new IoTDBTableInstaller(client, getManager());
+ getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
+ } catch (StorageException | IoTDBConnectionException | StatementExecutionException e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void notifyAfterCompleted() throws ServiceNotProvidedException {
+
+ }
+
+ @Override
+ public String[] requiredModules() {
+ return new String[]{CoreModule.NAME};
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableInstaller.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableInstaller.java
new file mode 100644
index 0000000..3e26ff5
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableInstaller.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb;
+
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * The Design of Apache IoTDB Storage Option
+ * https://skywalking.apache.org/blog/2021-11-23-design-of-iotdb-storage-option/
+ */
+public class IoTDBTableInstaller extends ModelInstaller {
+ public IoTDBTableInstaller(Client client, ModuleManager moduleManager) {
+ super(client, moduleManager);
+ }
+
+ @Override
+ protected boolean isExists(Model model) {
+ IoTDBTableMetaInfo.addModel(model);
+ return true;
+ }
+
+ @Override
+ protected void createTable(Model model) {
+ // Automatically create table when insert data
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableMetaInfo.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableMetaInfo.java
new file mode 100644
index 0000000..4b14d4f
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableMetaInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb;
+
+import com.google.gson.JsonObject;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.skywalking.oap.server.core.analysis.NodeType;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
+import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Getter
+@Builder
+@AllArgsConstructor
+public class IoTDBTableMetaInfo {
+ private static final Map<String, IoTDBTableMetaInfo> TABLE_META_INFOS = new HashMap<>();
+
+ private final Model model;
+ private final Map<String, TSDataType> columnAndTypeMap;
+ private final List<String> indexes;
+
+ public static void addModel(Model model) {
+ final List<ModelColumn> columns = model.getColumns();
+ final Map<String, String> storageAndIndexMap = new HashMap<>();
+ final Map<String, TSDataType> columnAndTypeMap = new HashMap<>();
+ final List<String> indexes = new ArrayList<>();
+
+ storageAndIndexMap.put(model.getName(), IoTDBIndexes.ID_IDX);
+ columns.forEach(column -> {
+ String columnName = column.getColumnName().getName();
+ if (IoTDBIndexes.isIndex(columnName)) {
+ storageAndIndexMap.put(column.getColumnName().getStorageName(), columnName);
+ } else {
+ columnAndTypeMap.put(columnName, typeToTSDataType(column.getType()));
+ }
+ });
+
+ // index order: id, entity_id, node_type, service_id, service_group, trace_id
+ indexes.add(IoTDBIndexes.ID_IDX);
+ if (storageAndIndexMap.containsValue(IoTDBIndexes.ENTITY_ID_IDX)) {
+ indexes.add(IoTDBIndexes.ENTITY_ID_IDX);
+ }
+ if (storageAndIndexMap.containsValue(IoTDBIndexes.NODE_TYPE_IDX)) {
+ indexes.add(IoTDBIndexes.NODE_TYPE_IDX);
+ }
+ if (storageAndIndexMap.containsValue(IoTDBIndexes.SERVICE_ID_IDX)) {
+ indexes.add(IoTDBIndexes.SERVICE_ID_IDX);
+ }
+ if (storageAndIndexMap.containsValue(IoTDBIndexes.GROUP_IDX)) {
+ indexes.add(IoTDBIndexes.GROUP_IDX);
+ }
+ if (storageAndIndexMap.containsValue(IoTDBIndexes.TRACE_ID_IDX)) {
+ indexes.add(IoTDBIndexes.TRACE_ID_IDX);
+ }
+
+ final IoTDBTableMetaInfo tableMetaInfo = IoTDBTableMetaInfo.builder().model(model)
+ .columnAndTypeMap(columnAndTypeMap).indexes(indexes).build();
+ TABLE_META_INFOS.put(model.getName(), tableMetaInfo);
+ }
+
+ public static IoTDBTableMetaInfo get(String moduleName) {
+ return TABLE_META_INFOS.get(moduleName);
+ }
+
+ private static TSDataType typeToTSDataType(Class<?> type) {
+ if (Integer.class.equals(type) || int.class.equals(type) || NodeType.class.equals(type)) {
+ return TSDataType.INT32;
+ } else if (Long.class.equals(type) || long.class.equals(type)) {
+ return TSDataType.INT64;
+ } else if (Float.class.equals(type) || float.class.equals(type)) {
+ return TSDataType.FLOAT;
+ } else if (Double.class.equals(type) || double.class.equals(type)) {
+ return TSDataType.DOUBLE;
+ } else if (Boolean.class.equals(type) || boolean.class.equals(type)) {
+ return TSDataType.BOOLEAN;
+ } else if (String.class.equals(type)) {
+ return TSDataType.TEXT;
+ } else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
+ return TSDataType.TEXT;
+ } else if (byte[].class.equals(type)) {
+ return TSDataType.TEXT;
+ } else if (JsonObject.class.equals(type)) {
+ return TSDataType.TEXT;
+ } else if (List.class.isAssignableFrom(type)) {
+ return TSDataType.TEXT;
+ } else {
+ throw new IllegalArgumentException("Unsupported data type: " + type.getName());
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBBatchDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBBatchDAO.java
new file mode 100644
index 0000000..fba54c2
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBBatchDAO.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBBatchDAO implements IBatchDAO {
+ private final IoTDBClient client;
+
+ @Override
+ public void insert(InsertRequest insertRequest) {
+ try {
+ client.write((IoTDBInsertRequest) insertRequest);
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
+ if (CollectionUtils.isEmpty(prepareRequests)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+ }
+ List<IoTDBInsertRequest> tempPrepareRequests = new ArrayList<>(prepareRequests.size());
+ prepareRequests.forEach(prepareRequest -> tempPrepareRequests.add((IoTDBInsertRequest) prepareRequest));
+ try {
+ client.write(tempPrepareRequests);
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBHistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBHistoryDeleteDAO.java
new file mode 100644
index 0000000..5d71b16
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBHistoryDeleteDAO.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBHistoryDeleteDAO implements IHistoryDeleteDAO {
+ private final IoTDBClient client;
+
+ @Override
+ public void deleteHistory(Model model, String timeBucketColumnName, int ttl) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("TTL execution log, model: {}, TTL: {}", model.getName(), ttl);
+ }
+ long deadline = Long.parseLong(new DateTime().plusDays(-ttl).toString("yyyyMMddHHmm"));
+ client.deleteData(model.getName(), TimeBucket.getTimestamp(deadline));
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBInsertRequest.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBInsertRequest.java
new file mode 100644
index 0000000..659a5be
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBInsertRequest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBTableMetaInfo;
+
+@Getter
+@Setter
+@ToString
+@Slf4j
+public class IoTDBInsertRequest implements InsertRequest, UpdateRequest {
+ private String modelName;
+ private long time;
+ private List<String> indexes;
+ private List<String> indexValues;
+ private List<String> measurements;
+ private List<TSDataType> measurementTypes;
+ private List<Object> measurementValues;
+
+ public <T extends StorageData> IoTDBInsertRequest(String modelName, long time, T storageData,
+ StorageHashMapBuilder<T> storageBuilder) {
+ this.modelName = modelName;
+ this.time = time;
+ indexes = IoTDBTableMetaInfo.get(modelName).getIndexes();
+ indexValues = new ArrayList<>(indexes.size());
+ Map<String, Object> storageMap = storageBuilder.entity2Storage(storageData);
+
+ indexes.forEach(index -> {
+ if (index.equals(IoTDBIndexes.ID_IDX)) {
+ indexValues.add(storageData.id());
+ } else if (storageMap.containsKey(index)) {
+ // avoid `service_group` be "null" when inserting
+ if (index.equals(IoTDBIndexes.GROUP_IDX) && storageMap.get(index) == null) {
+ indexValues.add("");
+ } else {
+ indexValues.add(String.valueOf(storageMap.get(index)));
+ }
+ storageMap.remove(index);
+ }
+ });
+
+ // time_bucket has changed to time before calling this method, so remove it from measurements
+ storageMap.remove(IoTDBClient.TIME_BUCKET);
+ // processing value to make it suitable for storage
+ Iterator<Map.Entry<String, Object>> entryIterator = storageMap.entrySet().iterator();
+ while (entryIterator.hasNext()) {
+ Map.Entry<String, Object> entry = entryIterator.next();
+ // IoTDB doesn't allow insert null value.
+ if (entry.getValue() == null) {
+ entryIterator.remove();
+ }
+ if (entry.getValue() instanceof StorageDataComplexObject) {
+ storageMap.put(entry.getKey(), ((StorageDataComplexObject) entry.getValue()).toStorageData());
+ }
+ }
+
+ measurements = new ArrayList<>(storageMap.keySet());
+ Map<String, TSDataType> columnAndTypeMap = IoTDBTableMetaInfo.get(modelName).getColumnAndTypeMap();
+ measurementTypes = new ArrayList<>(measurements.size());
+ for (String measurement : measurements) {
+ measurementTypes.add(columnAndTypeMap.get(measurement));
+ }
+ measurementValues = new ArrayList<>(storageMap.values());
+
+ // IoTDB doesn't allow a measurement named `timestamp` or contains `.`
+ for (String key : storageMap.keySet()) {
+ if (key.equals(IoTDBClient.TIMESTAMP) || key.contains(".")) {
+ int idx = measurements.indexOf(key);
+ measurements.set(idx, "\"" + key + "\"");
+ }
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBManagementDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBManagementDAO.java
new file mode 100644
index 0000000..a804896
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBManagementDAO.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
+import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+
+import java.io.IOException;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBManagementDAO implements IManagementDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<ManagementData> storageBuilder;
+
+ @Override
+ public void insert(Model model, ManagementData storageData) throws IOException {
+ IoTDBInsertRequest request = new IoTDBInsertRequest(model.getName(), 1L, storageData, storageBuilder);
+ client.write(request);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBMetricsDAO.java
new file mode 100644
index 0000000..e93baa3
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBMetricsDAO.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBMetricsDAO implements IMetricsDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<Metrics> storageBuilder;
+
+ @Override
+ public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ for (Metrics metric : metrics) {
+ query.append(", ");
+ query = client.addModelPath(query, model.getName());
+ query.append(IoTDBClient.DOT).append(client.indexValue2LayerName(metric.id()));
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+ String queryString = query.toString().replaceFirst(", ", "");
+ List<? super StorageData> storageDataList = client.filterQuery(model.getName(), queryString, storageBuilder);
+ List<Metrics> newMetrics = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> newMetrics.add((Metrics) storageData));
+ return newMetrics;
+ }
+
+ @Override
+ public InsertRequest prepareBatchInsert(Model model, Metrics metrics) {
+ final long timestamp = TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling());
+ return new IoTDBInsertRequest(model.getName(), timestamp, metrics, storageBuilder);
+ }
+
+ @Override
+ public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) {
+ return (UpdateRequest) prepareBatchInsert(model, metrics);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBNoneStreamDAO.java
new file mode 100644
index 0000000..324fa90
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBNoneStreamDAO.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
+
+import java.io.IOException;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
+import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+
+@RequiredArgsConstructor
+public class IoTDBNoneStreamDAO implements INoneStreamDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<NoneStream> storageBuilder;
+
+ @Override
+ public void insert(Model model, NoneStream noneStream) throws IOException {
+ final long timestamp = TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling());
+ final IoTDBInsertRequest request = new IoTDBInsertRequest(model.getName(), timestamp, noneStream, storageBuilder);
+ client.write(request);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBRecordDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBRecordDAO.java
new file mode 100644
index 0000000..c6ae21a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBRecordDAO.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
+
+import java.util.List;
+import java.util.Objects;
+import lombok.RequiredArgsConstructor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+
+@RequiredArgsConstructor
+public class IoTDBRecordDAO implements IRecordDAO {
+ private final StorageHashMapBuilder<Record> storageBuilder;
+
+ @Override
+ public InsertRequest prepareBatchInsert(Model model, Record record) {
+ final long timestamp = TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling());
+ IoTDBInsertRequest request = new IoTDBInsertRequest(model.getName(), timestamp, record, storageBuilder);
+
+ // transform tags of SegmentRecord, LogRecord, AlarmRecord to tag1, tag2, ...
+ List<String> measurements = request.getMeasurements();
+ List<TSDataType> measurementTypes = request.getMeasurementTypes();
+ List<Object> measurementValues = request.getMeasurementValues();
+ List<Tag> rawTags = null;
+ if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
+ rawTags = ((SegmentRecord) record).getTagsRawData();
+ measurementTypes.remove(measurements.indexOf(SegmentRecord.TAGS));
+ measurementValues.remove(measurements.indexOf(SegmentRecord.TAGS));
+ measurements.remove(SegmentRecord.TAGS);
+ } else if (LogRecord.INDEX_NAME.equals(model.getName())) {
+ rawTags = ((LogRecord) record).getTags();
+ measurementTypes.remove(measurements.indexOf(LogRecord.TAGS));
+ measurementValues.remove(measurements.indexOf(LogRecord.TAGS));
+ measurements.remove(LogRecord.TAGS);
+ } else if (AlarmRecord.INDEX_NAME.equals(model.getName())) {
+ rawTags = ((AlarmRecord) record).getTags();
+ measurementTypes.remove(measurements.indexOf(AlarmRecord.TAGS));
+ measurementValues.remove(measurements.indexOf(AlarmRecord.TAGS));
+ measurements.remove(AlarmRecord.TAGS);
+ }
+ if (Objects.nonNull(rawTags)) {
+ rawTags.forEach(rawTag -> {
+ if (rawTag.getKey().contains(".")) {
+ measurements.add("\"" + rawTag.getKey() + "\"");
+ } else {
+ measurements.add(rawTag.getKey());
+ }
+ measurementTypes.add(TSDataType.TEXT);
+ measurementValues.add(rawTag.getValue());
+ });
+ }
+ request.setMeasurements(measurements);
+ request.setMeasurementTypes(measurementTypes);
+ request.setMeasurementValues(measurementValues);
+ return request;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBStorageDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBStorageDAO.java
new file mode 100644
index 0000000..936897f
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBStorageDAO.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
+import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
+import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
+import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+
+@RequiredArgsConstructor
+public class IoTDBStorageDAO implements StorageDAO {
+ private final IoTDBClient ioTDBClient;
+
+ @Override
+ public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) {
+ return new IoTDBMetricsDAO(ioTDBClient, (StorageHashMapBuilder<Metrics>) storageBuilder);
+ }
+
+ @Override
+ public IRecordDAO newRecordDao(StorageBuilder storageBuilder) {
+ return new IoTDBRecordDAO((StorageHashMapBuilder<Record>) storageBuilder);
+ }
+
+ @Override
+ public INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder) {
+ return new IoTDBNoneStreamDAO(ioTDBClient, (StorageHashMapBuilder<NoneStream>) storageBuilder);
+ }
+
+ @Override
+ public IManagementDAO newManagementDao(StorageBuilder storageBuilder) {
+ return new IoTDBManagementDAO(ioTDBClient, (StorageHashMapBuilder<ManagementData>) storageBuilder);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/cache/IoTDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/cache/IoTDBNetworkAddressAliasDAO.java
new file mode 100644
index 0000000..47b7999
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/cache/IoTDBNetworkAddressAliasDAO.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.cache;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO {
+ private final NetworkAddressAlias.Builder storageBuilder = new NetworkAddressAlias.Builder();
+ private final IoTDBClient client;
+
+ @Override
+ public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, NetworkAddressAlias.INDEX_NAME);
+ query = client.addQueryAsterisk(NetworkAddressAlias.INDEX_NAME, query);
+ query.append(" where ").append(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET).append(" >= ").append(timeBucket)
+ .append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ try {
+ List<? super StorageData> storageDataList = client.filterQuery(NetworkAddressAlias.INDEX_NAME,
+ query.toString(), storageBuilder);
+ List<NetworkAddressAlias> networkAddressAliases = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> networkAddressAliases.add((NetworkAddressAlias) storageData));
+ return networkAddressAliases;
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+ return new ArrayList<>();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/management/IoTDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/management/IoTDBUITemplateManagementDAO.java
new file mode 100644
index 0000000..0bf9c20
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/management/IoTDBUITemplateManagementDAO.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.management;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
+import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
+import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
+import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.base.IoTDBInsertRequest;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBUITemplateManagementDAO implements UITemplateManagementDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<UITemplate> storageBuilder = new UITemplate.Builder();
+ private static final long UI_TEMPLATE_TIMESTAMP = 1L;
+
+ @Override
+ public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, UITemplate.INDEX_NAME);
+ query = client.addQueryAsterisk(UITemplate.INDEX_NAME, query);
+ if (!includingDisabled) {
+ query.append(" where ").append(UITemplate.DISABLED).append(" = ").append(BooleanUtils.FALSE);
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(UITemplate.INDEX_NAME, query.toString(),
+ storageBuilder);
+ List<DashboardConfiguration> dashboardConfigurationList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData ->
+ dashboardConfigurationList.add(new DashboardConfiguration().fromEntity((UITemplate) storageData)));
+ return dashboardConfigurationList;
+ }
+
+ @Override
+ public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
+ final UITemplate uiTemplate = setting.toEntity();
+
+ IoTDBInsertRequest request = new IoTDBInsertRequest(UITemplate.INDEX_NAME, UI_TEMPLATE_TIMESTAMP,
+ uiTemplate, storageBuilder);
+ client.write(request);
+ return TemplateChangeStatus.builder().status(true).build();
+ }
+
+ @Override
+ public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException {
+ final UITemplate uiTemplate = setting.toEntity();
+
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, UITemplate.INDEX_NAME);
+ query.append(IoTDBClient.DOT).append(client.indexValue2LayerName(uiTemplate.id()))
+ .append(IoTDBClient.ALIGN_BY_DEVICE);
+ List<? super StorageData> queryResult = client.filterQuery(UITemplate.INDEX_NAME, query.toString(), storageBuilder);
+ if (queryResult.size() == 0) {
+ return TemplateChangeStatus.builder().status(false).message("Can't find the template").build();
+ } else {
+ IoTDBInsertRequest request = new IoTDBInsertRequest(UITemplate.INDEX_NAME, UI_TEMPLATE_TIMESTAMP,
+ uiTemplate, storageBuilder);
+ client.write(request);
+ return TemplateChangeStatus.builder().status(true).build();
+ }
+ }
+
+ @Override
+ public TemplateChangeStatus disableTemplate(String name) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, UITemplate.INDEX_NAME);
+ query.append(IoTDBClient.DOT).append(client.indexValue2LayerName(name))
+ .append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> queryResult = client.filterQuery(UITemplate.INDEX_NAME, query.toString(), storageBuilder);
+ if (queryResult.size() == 0) {
+ return TemplateChangeStatus.builder().status(false).message("Can't find the template").build();
+ } else {
+ final UITemplate uiTemplate = (UITemplate) queryResult.get(0);
+ uiTemplate.setDisabled(BooleanUtils.TRUE);
+ IoTDBInsertRequest request = new IoTDBInsertRequest(UITemplate.INDEX_NAME, UI_TEMPLATE_TIMESTAMP,
+ uiTemplate, storageBuilder);
+ client.write(request);
+ return TemplateChangeStatus.builder().status(true).build();
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskLogQueryDAO.java
new file mode 100644
index 0000000..4695ed8
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskLogQueryDAO.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.profile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBProfileTaskLogQueryDAO implements IProfileTaskLogQueryDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<ProfileTaskLogRecord> storageBuilder = new ProfileTaskLogRecord.Builder();
+ private final int fetchTaskLogMaxSize;
+
+ @Override
+ public List<ProfileTaskLog> getTaskLogList() throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, ProfileTaskLogRecord.INDEX_NAME);
+ query = client.addQueryAsterisk(ProfileTaskLogRecord.INDEX_NAME, query);
+ query.append(" limit ").append(fetchTaskLogMaxSize).append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(ProfileTaskLogRecord.INDEX_NAME, query.toString(), storageBuilder);
+ List<ProfileTaskLogRecord> profileTaskLogRecordList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> profileTaskLogRecordList.add((ProfileTaskLogRecord) storageData));
+ // resort by self, because of the query result order by time.
+ profileTaskLogRecordList.sort((ProfileTaskLogRecord r1, ProfileTaskLogRecord r2) ->
+ Long.compare(r2.getOperationTime(), r1.getOperationTime()));
+ List<ProfileTaskLog> profileTaskLogList = new ArrayList<>(profileTaskLogRecordList.size());
+ profileTaskLogRecordList.forEach(profileTaskLogRecord -> profileTaskLogList.add(parseLog(profileTaskLogRecord)));
+ return profileTaskLogList;
+ }
+
+ private ProfileTaskLog parseLog(ProfileTaskLogRecord record) {
+ return ProfileTaskLog.builder()
+ .id(record.id())
+ .taskId(record.getTaskId())
+ .instanceId(record.getInstanceId())
+ .operationType(ProfileTaskLogOperationType.parse(record.getOperationType()))
+ .operationTime(record.getOperationTime())
+ .build();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskQueryDAO.java
new file mode 100644
index 0000000..0a2b443
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskQueryDAO.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.profile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+ private final IoTDBClient client;
+ private final ProfileTaskRecord.Builder storageBuilder = new ProfileTaskRecord.Builder();
+
+ @Override
+ public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket,
+ Long endTimeBucket, Integer limit) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, ProfileTaskRecord.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ if (StringUtil.isNotEmpty(serviceId)) {
+ indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
+ }
+ query = client.addQueryIndexValue(ProfileTaskRecord.INDEX_NAME, query, indexAndValueMap);
+
+ StringBuilder where = new StringBuilder(" where ");
+ if (StringUtil.isNotEmpty(endpointName)) {
+ where.append(ProfileTaskRecord.ENDPOINT_NAME).append(" = \"").append(endpointName).append("\"").append(" and ");
+ }
+ if (Objects.nonNull(startTimeBucket)) {
+ where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTimeBucket)).append(" and ");
+ }
+ if (Objects.nonNull(endTimeBucket)) {
+ where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTimeBucket)).append(" and ");
+ }
+ if (where.length() > 7) {
+ int length = where.length();
+ where.delete(length - 5, length);
+ query.append(where);
+ }
+ if (Objects.nonNull(limit)) {
+ query.append(" limit ").append(limit);
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(ProfileTaskRecord.INDEX_NAME, query.toString(), storageBuilder);
+ List<ProfileTask> profileTaskList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> profileTaskList.add(record2ProfileTask((ProfileTaskRecord) storageData)));
+ return profileTaskList;
+ }
+
+ @Override
+ public ProfileTask getById(String id) throws IOException {
+ if (StringUtil.isEmpty(id)) {
+ return null;
+ }
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, ProfileTaskRecord.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.ID_IDX, id);
+ query = client.addQueryIndexValue(ProfileTaskRecord.INDEX_NAME, query, indexAndValueMap);
+ query.append(" limit 1").append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(ProfileTaskRecord.INDEX_NAME, query.toString(), storageBuilder);
+ return record2ProfileTask((ProfileTaskRecord) storageDataList.get(0));
+ }
+
+ private static ProfileTask record2ProfileTask(ProfileTaskRecord record) {
+ return ProfileTask.builder()
+ .id(record.id())
+ .serviceId(record.getServiceId())
+ .endpointName(record.getEndpointName())
+ .startTime(record.getStartTime())
+ .createTime(record.getCreateTime())
+ .duration(record.getDuration())
+ .minDurationThreshold(record.getMinDurationThreshold())
+ .dumpPeriod(record.getDumpPeriod())
+ .maxSamplingCount(record.getMaxSamplingCount())
+ .build();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileThreadSnapshotQueryDAO.java
new file mode 100644
index 0000000..44cfcfb
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileThreadSnapshotQueryDAO.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.profile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+
+@RequiredArgsConstructor
+public class IoTDBProfileThreadSnapshotQueryDAO implements IProfileThreadSnapshotQueryDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<ProfileThreadSnapshotRecord> profileThreadSnapshotRecordBuilder = new ProfileThreadSnapshotRecord.Builder();
+ private final StorageHashMapBuilder<SegmentRecord> segmentRecordBuilder = new SegmentRecord.Builder();
+
+ @Override
+ public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, ProfileThreadSnapshotRecord.INDEX_NAME);
+ query = client.addQueryAsterisk(ProfileThreadSnapshotRecord.INDEX_NAME, query);
+ query.append(" where ").append(ProfileThreadSnapshotRecord.TASK_ID).append(" = \"").append(taskId).append("\"")
+ .append(" and ").append(ProfileThreadSnapshotRecord.SEQUENCE).append(" = 0")
+ .append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(ProfileThreadSnapshotRecord.INDEX_NAME,
+ query.toString(), profileThreadSnapshotRecordBuilder);
+ // We can insure the size of List, so use ArrayList to improve visit speed. (Other storage plugin use LinkedList)
+ final List<String> segmentIds = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> segmentIds.add(((ProfileThreadSnapshotRecord) storageData).getSegmentId()));
+ if (segmentIds.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
+ // https://github.com/apache/iotdb/discussions/3888
+ query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, SegmentRecord.INDEX_NAME);
+ query = client.addQueryAsterisk(SegmentRecord.INDEX_NAME, query);
+ query.append(" where ").append(SegmentRecord.SEGMENT_ID).append(" in (");
+ for (String segmentId : segmentIds) {
+ query.append("\"").append(segmentId).append("\"").append(", ");
+ }
+ query.delete(query.length() - 2, query.length()).append(")").append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ storageDataList = client.filterQuery(SegmentRecord.INDEX_NAME, query.toString(), segmentRecordBuilder);
+ List<SegmentRecord> segmentRecordList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> segmentRecordList.add((SegmentRecord) storageData));
+ // resort by self, because of the select query result order by time.
+ segmentRecordList.sort((SegmentRecord r1, SegmentRecord r2) -> Long.compare(r2.getStartTime(), r1.getStartTime()));
+
+ List<BasicTrace> result = new ArrayList<>(segmentRecordList.size());
+ segmentRecordList.forEach(segmentRecord -> {
+ BasicTrace basicTrace = new BasicTrace();
+ basicTrace.setSegmentId(segmentRecord.getSegmentId());
+ basicTrace.setStart(String.valueOf(segmentRecord.getStartTime()));
+ basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(segmentRecord.getEndpointId()).getEndpointName());
+ basicTrace.setDuration(segmentRecord.getLatency());
+ basicTrace.setError(BooleanUtils.valueToBoolean(segmentRecord.getIsError()));
+ basicTrace.getTraceIds().add(segmentRecord.getTraceId());
+ result.add(basicTrace);
+ });
+ return result;
+ }
+
+ @Override
+ public int queryMinSequence(String segmentId, long start, long end) throws IOException {
+ return querySequenceWithAgg("min_value", segmentId, start, end);
+ }
+
+ @Override
+ public int queryMaxSequence(String segmentId, long start, long end) throws IOException {
+ return querySequenceWithAgg("max_value", segmentId, start, end);
+ }
+
+ @Override
+ public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, ProfileThreadSnapshotRecord.INDEX_NAME);
+ query = client.addQueryAsterisk(ProfileThreadSnapshotRecord.INDEX_NAME, query);
+ query.append(" where ").append(ProfileThreadSnapshotRecord.SEGMENT_ID).append(" = \"").append(segmentId).append("\"")
+ .append(" and ").append(ProfileThreadSnapshotRecord.SEQUENCE).append(" >= ").append(minSequence)
+ .append(" and ").append(ProfileThreadSnapshotRecord.SEQUENCE).append(" <= ").append(maxSequence)
+ .append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(ProfileThreadSnapshotRecord.INDEX_NAME,
+ query.toString(), profileThreadSnapshotRecordBuilder);
+ List<ProfileThreadSnapshotRecord> profileThreadSnapshotRecordList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> profileThreadSnapshotRecordList.add((ProfileThreadSnapshotRecord) storageData));
+ return profileThreadSnapshotRecordList;
+ }
+
+ @Override
+ public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, SegmentRecord.INDEX_NAME);
+ query = client.addQueryAsterisk(SegmentRecord.INDEX_NAME, query);
+ query.append(" where ").append(SegmentRecord.SEGMENT_ID).append(" = \"").append(segmentId).append("\"")
+ .append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(SegmentRecord.INDEX_NAME,
+ query.toString(), segmentRecordBuilder);
+ if (storageDataList.isEmpty()) {
+ return null;
+ }
+ return (SegmentRecord) storageDataList.get(0);
+ }
+
+ private int querySequenceWithAgg(String aggType, String segmentId, long start, long end) throws IOException {
+ // This method has poor efficiency. It queries all data which meets a condition without aggregation function
+ // See https://github.com/apache/iotdb/discussions/3907
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, ProfileThreadSnapshotRecord.INDEX_NAME);
+ query = client.addQueryAsterisk(ProfileThreadSnapshotRecord.INDEX_NAME, query);
+ query.append(" where ").append(ProfileThreadSnapshotRecord.SEGMENT_ID).append(" = \"").append(segmentId).append("\"")
+ .append(" and ").append(ProfileThreadSnapshotRecord.DUMP_TIME).append(" >= ").append(start)
+ .append(" and ").append(ProfileThreadSnapshotRecord.DUMP_TIME).append(" <= ").append(end)
+ .append(IoTDBClient.ALIGN_BY_DEVICE);
+ List<? super StorageData> storageDataList = client.filterQuery(ProfileThreadSnapshotRecord.INDEX_NAME,
+ query.toString(), profileThreadSnapshotRecordBuilder);
+
+ if (aggType.equals("min_value")) {
+ int minValue = Integer.MAX_VALUE;
+ for (Object storageData : storageDataList) {
+ ProfileThreadSnapshotRecord profileThreadSnapshotRecord = (ProfileThreadSnapshotRecord) storageData;
+ int sequence = profileThreadSnapshotRecord.getSequence();
+ minValue = Math.min(minValue, sequence);
+ }
+ return minValue;
+ } else if (aggType.equals("max_value")) {
+ int maxValue = Integer.MIN_VALUE;
+ for (Object storageData : storageDataList) {
+ ProfileThreadSnapshotRecord profileThreadSnapshotRecord = (ProfileThreadSnapshotRecord) storageData;
+ int sequence = profileThreadSnapshotRecord.getSequence();
+ maxValue = Math.max(maxValue, sequence);
+ }
+ return maxValue;
+ } else {
+ throw new IOException("Wrong aggregation function");
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAggregationQueryDAO.java
new file mode 100644
index 0000000..0572b46
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAggregationQueryDAO.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionDataSetWrapper;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
+import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBAggregationQueryDAO implements IAggregationQueryDAO {
+ private final IoTDBClient client;
+
+ @Override
+ public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueColumnName, Duration duration,
+ List<KeyValue> additionalConditions) throws IOException {
+ // This method maybe have poor efficiency. It queries all data which meets a condition without aggregation function.
+ // https://github.com/apache/iotdb/issues/4006
+ StringBuilder query = new StringBuilder();
+ query.append(String.format("select %s from ", valueColumnName));
+ query = client.addModelPath(query, condition.getName());
+
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ List<KeyValue> measurementConditions = new ArrayList<>();
+ if (additionalConditions != null) {
+ for (KeyValue additionalCondition : additionalConditions) {
+ String key = additionalCondition.getKey();
+ if (IoTDBIndexes.isIndex(key)) {
+ indexAndValueMap.put(key, additionalCondition.getValue());
+ } else {
+ measurementConditions.add(additionalCondition);
+ }
+ }
+ }
+ if (!indexAndValueMap.isEmpty()) {
+ query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
+ } else {
+ query = client.addQueryAsterisk(condition.getName(), query);
+ }
+
+ query.append(" where ").append(IoTDBClient.TIME).append(" >= ").append(duration.getStartTimestamp())
+ .append(" and ").append(IoTDBClient.TIME).append(" <= ").append(duration.getEndTimestamp());
+ if (!measurementConditions.isEmpty()) {
+ for (KeyValue measurementCondition : measurementConditions) {
+ query.append(" and ").append(measurementCondition.getKey()).append(" = \"")
+ .append(measurementCondition.getValue()).append("\"");
+ }
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ SessionPool sessionPool = client.getSessionPool();
+ SessionDataSetWrapper wrapper = null;
+ List<SelectedRecord> topEntities = new ArrayList<>();
+ try {
+ wrapper = sessionPool.executeQueryStatement(query.toString());
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {}, columnNames: {}", query, wrapper.getColumnNames());
+ }
+
+ Map<String, Double> entityIdAndSumMap = new HashMap<>();
+ Map<String, Integer> entityIdAndCountMap = new HashMap<>();
+ while (wrapper.hasNext()) {
+ RowRecord rowRecord = wrapper.next();
+ List<Field> fields = rowRecord.getFields();
+ String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
+ String entityId = client.layerName2IndexValue(layerNames[2]);
+ double value = Double.parseDouble(fields.get(1).getStringValue());
+ entityIdAndSumMap.merge(entityId, value, Double::sum);
+ entityIdAndCountMap.merge(entityId, 1, Integer::sum);
+ }
+
+ entityIdAndSumMap.forEach((String entityId, Double sum) -> {
+ double count = entityIdAndCountMap.get(entityId);
+ double avg = sum / count;
+ SelectedRecord topNEntity = new SelectedRecord();
+ topNEntity.setId(entityId);
+ topNEntity.setValue(String.valueOf((long) avg));
+ topEntities.add(topNEntity);
+ });
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ if (wrapper != null) {
+ sessionPool.closeResultSet(wrapper);
+ }
+ }
+
+ if (condition.getOrder().equals(Order.DES)) {
+ topEntities.sort((SelectedRecord t1, SelectedRecord t2) ->
+ Double.compare(Double.parseDouble(t2.getValue()), Double.parseDouble(t1.getValue())));
+ } else {
+ topEntities.sort(Comparator.comparingDouble((SelectedRecord t) -> Double.parseDouble(t.getValue())));
+ }
+ int limit = condition.getTopN();
+ return limit > topEntities.size() ? topEntities : topEntities.subList(0, limit);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAlarmQueryDAO.java
new file mode 100644
index 0000000..00d7a58
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAlarmQueryDAO.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
+import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
+import org.apache.skywalking.oap.server.core.query.type.Alarms;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+
+@RequiredArgsConstructor
+public class IoTDBAlarmQueryDAO implements IAlarmQueryDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<AlarmRecord> storageBuilder = new AlarmRecord.Builder();
+
+ @Override
+ public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException {
+ StringBuilder query = new StringBuilder();
+ // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
+ // https://github.com/apache/iotdb/discussions/3888
+ query.append("select * from ");
+ query = client.addModelPath(query, AlarmRecord.INDEX_NAME);
+ query = client.addQueryAsterisk(AlarmRecord.INDEX_NAME, query);
+
+ StringBuilder where = new StringBuilder(" where ");
+ if (Objects.nonNull(scopeId)) {
+ where.append(AlarmRecord.SCOPE).append(" = ").append(scopeId).append(" and ");
+ }
+ if (startTB != 0 && endTB != 0) {
+ where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTB)).append(" and ");
+ where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTB)).append(" and ");
+ }
+ if (!Strings.isNullOrEmpty(keyword)) {
+ where.append(AlarmRecord.ALARM_MESSAGE).append(" like '%").append(keyword).append("%'").append(" and ");
+ }
+ if (CollectionUtils.isNotEmpty(tags)) {
+ for (final Tag tag : tags) {
+ where.append(tag.getKey()).append(" = \"").append(tag.getValue()).append("\"").append(" and ");
+ }
+ }
+ if (where.length() > 7) {
+ int length = where.length();
+ where.delete(length - 5, length);
+ query.append(where);
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ Alarms alarms = new Alarms();
+ List<? super StorageData> storageDataList = client.filterQuery(AlarmRecord.INDEX_NAME, query.toString(), storageBuilder);
+ int limitCount = 0;
+ for (int i = from; i < storageDataList.size(); i++) {
+ if (limitCount < limit) {
+ limitCount++;
+ AlarmRecord alarmRecord = (AlarmRecord) storageDataList.get(i);
+ alarms.getMsgs().add(parseMessage(alarmRecord));
+ }
+ }
+ alarms.setTotal(storageDataList.size());
+ // resort by self, because of the select query result order by time.
+ alarms.getMsgs().sort((AlarmMessage m1, AlarmMessage m2) -> Long.compare(m2.getStartTime(), m1.getStartTime()));
+ return alarms;
+ }
+
+ private AlarmMessage parseMessage(AlarmRecord alarmRecord) {
+ AlarmMessage message = new AlarmMessage();
+ message.setId(alarmRecord.getId0());
+ message.setId1(alarmRecord.getId1());
+ message.setMessage(alarmRecord.getAlarmMessage());
+ message.setStartTime(alarmRecord.getStartTime());
+ message.setScope(Scope.Finder.valueOf(alarmRecord.getScope()));
+ message.setScopeId(alarmRecord.getScope());
+ if (!CollectionUtils.isEmpty(alarmRecord.getTagsRawData())) {
+ parserDataBinary(alarmRecord.getTagsRawData(), message.getTags());
+ }
+ return message;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBBrowserLogQueryDAO.java
new file mode 100644
index 0000000..c1f6a83
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBBrowserLogQueryDAO.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
+import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory;
+import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
+import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
+import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBBrowserLogQueryDAO implements IBrowserLogQueryDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<BrowserErrorLogRecord> storageBuilder = new BrowserErrorLogRecord.Builder();
+
+ @Override
+ public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId,
+ BrowserErrorCategory category, long startSecondTB,
+ long endSecondTB, int limit, int from) throws IOException {
+ StringBuilder query = new StringBuilder();
+ // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
+ // https://github.com/apache/iotdb/discussions/3888
+ query.append("select * from ");
+ query = client.addModelPath(query, BrowserErrorLogRecord.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ if (StringUtil.isNotEmpty(serviceId)) {
+ indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
+ }
+ query = client.addQueryIndexValue(BrowserErrorLogRecord.INDEX_NAME, query, indexAndValueMap);
+
+ StringBuilder where = new StringBuilder(" where ");
+ if (startSecondTB != 0 && endSecondTB != 0) {
+ where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startSecondTB)).append(" and ");
+ where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endSecondTB)).append(" and ");
+ }
+ if (StringUtil.isNotEmpty(serviceVersionId)) {
+ where.append(BrowserErrorLogRecord.SERVICE_VERSION_ID).append(" = \"").append(serviceVersionId).append("\"").append(" and ");
+ }
+ if (StringUtil.isNotEmpty(pagePathId)) {
+ where.append(BrowserErrorLogRecord.PAGE_PATH_ID).append(" = \"").append(pagePathId).append("\"").append(" and ");
+ }
+ if (Objects.nonNull(category)) {
+ where.append(BrowserErrorLogRecord.ERROR_CATEGORY).append(" = ").append(category.getValue()).append(" and ");
+ }
+ if (where.length() > 7) {
+ int length = where.length();
+ where.delete(length - 5, length);
+ query.append(where);
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(BrowserErrorLogRecord.INDEX_NAME, query.toString(), storageBuilder);
+ List<BrowserErrorLogRecord> browserErrorLogRecordList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> browserErrorLogRecordList.add((BrowserErrorLogRecord) storageData));
+ // resort by self, because of the select query result order by time.
+ browserErrorLogRecordList.sort((BrowserErrorLogRecord b1, BrowserErrorLogRecord b2) -> Long.compare(b2.getTimestamp(), b1.getTimestamp()));
+ BrowserErrorLogs logs = new BrowserErrorLogs();
+ int limitCount = 0;
+ for (int i = from; i < browserErrorLogRecordList.size(); i++) {
+ if (limitCount < limit) {
+ limitCount++;
+ BrowserErrorLogRecord record = browserErrorLogRecordList.get(i);
+ if (CollectionUtils.isNotEmpty(record.getDataBinary())) {
+ BrowserErrorLog log = iotdbParserDataBinary(record.getDataBinary());
+ logs.getLogs().add(log);
+ }
+ }
+ }
+ logs.setTotal(storageDataList.size());
+ return logs;
+ }
+
+ private BrowserErrorLog iotdbParserDataBinary(byte[] dataBinaryBase64) {
+ try {
+ BrowserErrorLog log = new BrowserErrorLog();
+ org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
+ .parseFrom(dataBinaryBase64);
+ log.setService(browserErrorLog.getService());
+ log.setServiceVersion(browserErrorLog.getServiceVersion());
+ log.setTime(browserErrorLog.getTime());
+ log.setPagePath(browserErrorLog.getPagePath());
+ log.setCategory(ErrorCategory.valueOf(browserErrorLog.getCategory().name().toUpperCase()));
+ log.setGrade(browserErrorLog.getGrade());
+ log.setMessage(browserErrorLog.getMessage());
+ log.setLine(browserErrorLog.getLine());
+ log.setCol(browserErrorLog.getCol());
+ log.setStack(browserErrorLog.getStack());
+ log.setErrorUrl(browserErrorLog.getErrorUrl());
+ log.setFirstReportedError(browserErrorLog.getFirstReportedError());
+
+ return log;
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEventQueryDAO.java
new file mode 100644
index 0000000..061dfca
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEventQueryDAO.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+
+import com.google.common.base.Strings;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.query.PaginationUtils;
+import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
+import org.apache.skywalking.oap.server.core.query.type.event.EventType;
+import org.apache.skywalking.oap.server.core.query.type.event.Events;
+import org.apache.skywalking.oap.server.core.query.type.event.Source;
+import org.apache.skywalking.oap.server.core.source.Event;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBEventQueryDAO implements IEventQueryDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<Event> storageBuilder = new Event.Builder();
+
+ @Override
+ public Events queryEvents(EventQueryCondition condition) throws Exception {
+ // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
+ // https://github.com/apache/iotdb/discussions/3888
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, Event.INDEX_NAME);
+ query = client.addQueryAsterisk(Event.INDEX_NAME, query);
+ StringBuilder where = whereSQL(condition);
+ if (where.length() > 0) {
+ query.append(" where ").append(where);
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(Event.INDEX_NAME, query.toString(), storageBuilder);
+ final Events events = new Events();
+ int limitCount = 0;
+ PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging());
+ for (int i = page.getFrom(); i < storageDataList.size(); i++) {
+ if (limitCount < page.getLimit()) {
+ limitCount++;
+ Event event = (Event) storageDataList.get(i);
+ events.getEvents().add(parseEvent(event));
+ }
+ }
+ events.setTotal(storageDataList.size());
+ // resort by self, because of the select query result order by time.
+ final Order order = Objects.isNull(condition.getOrder()) ? Order.DES : condition.getOrder();
+ if (Order.DES.equals(order)) {
+ events.getEvents().sort(
+ (org.apache.skywalking.oap.server.core.query.type.event.Event e1,
+ org.apache.skywalking.oap.server.core.query.type.event.Event e2)
+ -> Long.compare(e2.getStartTime(), e1.getStartTime()));
+ } else {
+ events.getEvents().sort(
+ Comparator.comparingLong(org.apache.skywalking.oap.server.core.query.type.event.Event::getStartTime));
+ }
+ return events;
+ }
+
+ @Override
+ public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
+ // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
+ // https://github.com/apache/iotdb/discussions/3888
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, Event.INDEX_NAME);
+ query = client.addQueryAsterisk(Event.INDEX_NAME, query);
+ StringBuilder where = whereSQL(conditionList);
+ if (where.length() > 0) {
+ query.append(" where ").append(where);
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(Event.INDEX_NAME, query.toString(), storageBuilder);
+ final Events events = new Events();
+ EventQueryCondition condition = conditionList.get(0);
+ int limitCount = 0;
+ PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging());
+ for (int i = page.getFrom(); i < storageDataList.size(); i++) {
+ if (limitCount < page.getLimit()) {
+ limitCount++;
+ Event event = (Event) storageDataList.get(i);
+ events.getEvents().add(parseEvent(event));
+ }
+ }
+ events.setTotal(storageDataList.size());
+ // resort by self, because of the select query result order by time.
+ final Order order = Objects.isNull(condition.getOrder()) ? Order.DES : condition.getOrder();
+ if (Order.DES.equals(order)) {
+ events.getEvents().sort(
+ (org.apache.skywalking.oap.server.core.query.type.event.Event e1,
+ org.apache.skywalking.oap.server.core.query.type.event.Event e2)
+ -> Long.compare(e2.getStartTime(), e1.getStartTime()));
+ } else {
+ events.getEvents().sort(
+ Comparator.comparingLong(org.apache.skywalking.oap.server.core.query.type.event.Event::getStartTime));
+ }
+ return events;
+ }
+
+ private StringBuilder whereSQL(final EventQueryCondition condition) {
+ StringBuilder where = new StringBuilder();
+ if (!Strings.isNullOrEmpty(condition.getUuid())) {
+ where.append(Event.UUID).append(" = \"").append(condition.getUuid()).append("\"").append(" and ");
+ }
+ final Source source = condition.getSource();
+ if (source != null) {
+ if (!Strings.isNullOrEmpty(source.getService())) {
+ where.append(Event.SERVICE).append(" = \"").append(source.getService()).append("\"").append(" and ");
+ }
+ if (!Strings.isNullOrEmpty(source.getServiceInstance())) {
+ where.append(Event.SERVICE_INSTANCE).append(" = \"").append(source.getServiceInstance()).append("\"").append(" and ");
+ }
+ if (!Strings.isNullOrEmpty(source.getEndpoint())) {
+ where.append(Event.ENDPOINT).append(" = \"").append(source.getEndpoint()).append("\"").append(" and ");
+ }
+ }
+ if (!Strings.isNullOrEmpty(condition.getName())) {
+ where.append(Event.NAME).append(" = \"").append(condition.getName()).append("\"").append(" and ");
+ }
+ if (condition.getType() != null) {
+ where.append(Event.TYPE).append(" = \"").append(condition.getType().name()).append("\"").append(" and ");
+ }
+ final Duration time = condition.getTime();
+ if (time != null) {
+ if (time.getStartTimestamp() > 0) {
+ where.append(Event.START_TIME).append(" > ").append(time.getStartTimestamp()).append(" and ");
+ }
+ if (time.getEndTimestamp() > 0) {
+ where.append(Event.END_TIME).append(" < ").append(time.getEndTimestamp()).append(" and ");
+ }
+ }
+ if (where.length() > 0) {
+ int length = where.length();
+ where.delete(length - 5, length);
+ return where;
+ }
+ return new StringBuilder();
+ }
+
+ private StringBuilder whereSQL(final List<EventQueryCondition> conditions) {
+ StringBuilder where = new StringBuilder();
+ boolean isFirstCondition = true;
+ for (EventQueryCondition condition : conditions) {
+ StringBuilder subWhere = whereSQL(condition);
+ if (subWhere.length() > 0) {
+ if (isFirstCondition) {
+ where.append("(").append(subWhere).append(")");
+ isFirstCondition = false;
+ } else {
+ where.append(" or (").append(subWhere).append(")");
+
+ }
+ }
+ }
+ return where;
+ }
+
+ private org.apache.skywalking.oap.server.core.query.type.event.Event parseEvent(final Event event) {
+ final org.apache.skywalking.oap.server.core.query.type.event.Event resultEvent = new org.apache.skywalking.oap.server.core.query.type.event.Event();
+ resultEvent.setUuid(event.getUuid());
+ resultEvent.setSource(new Source(event.getService(), event.getServiceInstance(), event.getEndpoint()));
+ resultEvent.setName(event.getName());
+ resultEvent.setType(EventType.parse(event.getType()));
+ resultEvent.setMessage(event.getMessage());
+ resultEvent.setParameters(event.getParameters());
+ resultEvent.setStartTime(event.getStartTime());
+ resultEvent.setEndTime(event.getEndTime());
+ return resultEvent;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBLogQueryDAO.java
new file mode 100644
index 0000000..56f0f9f
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBLogQueryDAO.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.logging.v3.LogTags;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
+import org.apache.skywalking.oap.server.core.query.type.ContentType;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
+import org.apache.skywalking.oap.server.core.query.type.Log;
+import org.apache.skywalking.oap.server.core.query.type.Logs;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBLogQueryDAO implements ILogQueryDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<LogRecord> storageBuilder = new LogRecord.Builder();
+
+ @Override
+ public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId,
+ TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit, long startTB,
+ long endTB, List<Tag> tags, List<String> keywordsOfContent,
+ List<String> excludingKeywordsOfContent) throws IOException {
+ StringBuilder query = new StringBuilder();
+ // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
+ // https://github.com/apache/iotdb/discussions/3888
+ query.append("select * from ");
+ query = client.addModelPath(query, LogRecord.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ if (StringUtil.isNotEmpty(serviceId)) {
+ indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
+ }
+ if (Objects.nonNull(relatedTrace) && StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
+ indexAndValueMap.put(IoTDBIndexes.TRACE_ID_IDX, relatedTrace.getTraceId());
+ }
+ query = client.addQueryIndexValue(LogRecord.INDEX_NAME, query, indexAndValueMap);
+
+ StringBuilder where = new StringBuilder(" where ");
+ if (startTB != 0 && endTB != 0) {
+ where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTB)).append(" and ");
+ where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTB)).append(" and ");
+ }
+ if (StringUtil.isNotEmpty(serviceInstanceId)) {
+ where.append(AbstractLogRecord.SERVICE_INSTANCE_ID).append(" = \"").append(serviceInstanceId).append("\"").append(" and ");
+ }
+ if (StringUtil.isNotEmpty(endpointId)) {
+ where.append(AbstractLogRecord.ENDPOINT_ID).append(" = \"").append(endpointId).append("\"").append(" and ");
+ }
+ if (Objects.nonNull(relatedTrace)) {
+ if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
+ where.append(AbstractLogRecord.TRACE_SEGMENT_ID).append(" = \"").append(relatedTrace.getSegmentId()).append("\"").append(" and ");
+ }
+ if (Objects.nonNull(relatedTrace.getSpanId())) {
+ where.append(AbstractLogRecord.SPAN_ID).append(" = ").append(relatedTrace.getSpanId()).append(" and ");
+ }
+ }
+ if (CollectionUtils.isNotEmpty(tags)) {
+ for (final Tag tag : tags) {
+ where.append(tag.getKey()).append(" = \"").append(tag.getValue()).append("\"").append(" and ");
+ }
+ }
+ if (where.length() > 7) {
+ int length = where.length();
+ where.delete(length - 5, length);
+ query.append(where);
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ Logs logs = new Logs();
+ List<? super StorageData> storageDataList = client.filterQuery(LogRecord.INDEX_NAME, query.toString(), storageBuilder);
+ int limitCount = 0;
+ for (int i = from; i < storageDataList.size(); i++) {
+ if (limitCount < limit) {
+ limitCount++;
+ LogRecord logRecord = (LogRecord) storageDataList.get(i);
+ Log log = new Log();
+ log.setServiceId(logRecord.getServiceId());
+ log.setServiceInstanceId(logRecord.getServiceInstanceId());
+ log.setEndpointId(logRecord.getEndpointId());
+ log.setTraceId(logRecord.getTraceId());
+ log.setTimestamp(logRecord.getTimestamp());
+ log.setContentType(ContentType.instanceOf(logRecord.getContentType()));
+ log.setContent(logRecord.getContent());
+ if (CollectionUtils.isNotEmpty(logRecord.getTagsRawData())) {
+ iotdbParserDataBinary(logRecord.getTagsRawData(), log.getTags());
+ }
+ logs.getLogs().add(log);
+ }
+ }
+ logs.setTotal(storageDataList.size());
+ // resort by self, because of the select query result order by time.
+ if (Order.DES.equals(queryOrder)) {
+ logs.getLogs().sort((Log l1, Log l2) -> Long.compare(l2.getTimestamp(), l1.getTimestamp()));
+ } else {
+ logs.getLogs().sort(Comparator.comparingLong(Log::getTimestamp));
+ }
+ return logs;
+ }
+
+ private void iotdbParserDataBinary(byte[] tagsRawData, List<KeyValue> tags) {
+ try {
+ LogTags logTags = LogTags.parseFrom(tagsRawData);
+ logTags.getDataList().forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetadataQueryDAO.java
new file mode 100644
index 0000000..46a40f6
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetadataQueryDAO.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+
+import com.google.common.base.Strings;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.NodeType;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.query.enumeration.Language;
+import org.apache.skywalking.oap.server.core.query.type.Attribute;
+import org.apache.skywalking.oap.server.core.query.type.Database;
+import org.apache.skywalking.oap.server.core.query.type.Endpoint;
+import org.apache.skywalking.oap.server.core.query.type.Service;
+import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBMetadataQueryDAO implements IMetadataQueryDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<ServiceTraffic> serviceBuilder = new ServiceTraffic.Builder();
+ private final StorageHashMapBuilder<EndpointTraffic> endpointBuilder = new EndpointTraffic.Builder();
+ private final StorageHashMapBuilder<InstanceTraffic> instanceBuilder = new InstanceTraffic.Builder();
+
+ @Override
+ public List<Service> getAllServices(String group) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, ServiceTraffic.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.NODE_TYPE_IDX, String.valueOf(NodeType.Normal.value()));
+ if (StringUtil.isNotEmpty(group)) {
+ indexAndValueMap.put(IoTDBIndexes.GROUP_IDX, group);
+ }
+ query = client.addQueryIndexValue(ServiceTraffic.INDEX_NAME, query, indexAndValueMap);
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(ServiceTraffic.INDEX_NAME, query.toString(), serviceBuilder);
+ List<Service> serviceList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> serviceList.add(buildService((ServiceTraffic) storageData)));
+ return serviceList;
+ }
+
+ @Override
+ public List<Service> getAllBrowserServices() throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, ServiceTraffic.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.NODE_TYPE_IDX, String.valueOf(NodeType.Browser.value()));
+ query = client.addQueryIndexValue(ServiceTraffic.INDEX_NAME, query, indexAndValueMap);
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(ServiceTraffic.INDEX_NAME, query.toString(), serviceBuilder);
+ List<Service> serviceList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> serviceList.add(buildService((ServiceTraffic) storageData)));
+ return serviceList;
+ }
+
+ @Override
+ public List<Database> getAllDatabases() throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, ServiceTraffic.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.NODE_TYPE_IDX, String.valueOf(NodeType.Database.value()));
+ query = client.addQueryIndexValue(ServiceTraffic.INDEX_NAME, query, indexAndValueMap);
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(ServiceTraffic.INDEX_NAME, query.toString(), serviceBuilder);
+ List<Database> databaseList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> {
+ ServiceTraffic serviceTraffic = (ServiceTraffic) storageData;
+ Database database = new Database();
+ database.setId(serviceTraffic.id());
+ database.setName(serviceTraffic.getName());
+ databaseList.add(database);
+ });
+ return databaseList;
+ }
+
+ @Override
+ public List<Service> searchServices(final NodeType nodeType, final String keyword) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, ServiceTraffic.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.NODE_TYPE_IDX, String.valueOf(nodeType.value()));
+ query = client.addQueryIndexValue(ServiceTraffic.INDEX_NAME, query, indexAndValueMap);
+ if (!Strings.isNullOrEmpty(keyword)) {
+ query.append(" where ").append(ServiceTraffic.NAME).append(" like '%").append(keyword).append("%'");
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(ServiceTraffic.INDEX_NAME, query.toString(), serviceBuilder);
+ List<Service> serviceList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> serviceList.add(buildService((ServiceTraffic) storageData)));
+ return serviceList;
+ }
+
+ @Override
+ public Service searchService(final NodeType nodeType, final String serviceCode) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, ServiceTraffic.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.NODE_TYPE_IDX, String.valueOf(nodeType.value()));
+ query = client.addQueryIndexValue(ServiceTraffic.INDEX_NAME, query, indexAndValueMap);
+ query.append(" where ").append(ServiceTraffic.NAME).append(" = \"").append(serviceCode).append("\"")
+ .append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(ServiceTraffic.INDEX_NAME, query.toString(), serviceBuilder);
+ if (storageDataList.isEmpty()) {
+ return null;
+ }
+ return buildService((ServiceTraffic) storageDataList.get(0));
+ }
+
+ @Override
+ public List<Endpoint> searchEndpoint(String keyword, String serviceId, int limit) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, EndpointTraffic.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
+ query = client.addQueryIndexValue(EndpointTraffic.INDEX_NAME, query, indexAndValueMap);
+ if (!Strings.isNullOrEmpty(keyword)) {
+ query.append(" where ").append(EndpointTraffic.NAME).append(" like '%").append(keyword).append("%'");
+ }
+ query.append(" limit ").append(limit).append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(EndpointTraffic.INDEX_NAME, query.toString(), endpointBuilder);
+ List<Endpoint> endpointList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> {
+ EndpointTraffic endpointTraffic = (EndpointTraffic) storageData;
+ Endpoint endpoint = new Endpoint();
+ endpoint.setId(endpointTraffic.id());
+ endpoint.setName(endpointTraffic.getName());
+ endpointList.add(endpoint);
+ });
+ return endpointList;
+ }
+
+ @Override
+ public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException {
+ final long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, InstanceTraffic.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
+ query = client.addQueryIndexValue(InstanceTraffic.INDEX_NAME, query, indexAndValueMap);
+ query.append(" where ").append(InstanceTraffic.LAST_PING_TIME_BUCKET).append(" >= ").append(minuteTimeBucket)
+ .append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(InstanceTraffic.INDEX_NAME, query.toString(), instanceBuilder);
+ List<ServiceInstance> serviceInstanceList = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> {
+ InstanceTraffic instanceTraffic = (InstanceTraffic) storageData;
+ if (instanceTraffic.getName() == null) {
+ instanceTraffic.setName("");
+ }
+ ServiceInstance serviceInstance = new ServiceInstance();
+ serviceInstance.setId(instanceTraffic.id());
+ serviceInstance.setName(instanceTraffic.getName());
+ serviceInstance.setInstanceUUID(serviceInstance.getId());
+
+ JsonObject properties = instanceTraffic.getProperties();
+ if (properties != null) {
+ for (Map.Entry<String, JsonElement> property : properties.entrySet()) {
+ String key = property.getKey();
+ String value = property.getValue().getAsString();
+ if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
+ serviceInstance.setLanguage(Language.value(value));
+ } else {
+ serviceInstance.getAttributes().add(new Attribute(key, value));
+ }
+ }
+ } else {
+ serviceInstance.setLanguage(Language.UNKNOWN);
+ }
+ serviceInstanceList.add(serviceInstance);
+ });
+ return serviceInstanceList;
+ }
+
+ private Service buildService(ServiceTraffic serviceTraffic) {
+ Service service = new Service();
+ service.setId(serviceTraffic.id());
+ service.setName(serviceTraffic.getName());
+ service.setGroup(serviceTraffic.getGroup());
+ return service;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetricsQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetricsQueryDAO.java
new file mode 100644
index 0000000..eff62d1
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetricsQueryDAO.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionDataSetWrapper;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.query.PointOfTime;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.MetricsCondition;
+import org.apache.skywalking.oap.server.core.query.sql.Function;
+import org.apache.skywalking.oap.server.core.query.type.HeatMap;
+import org.apache.skywalking.oap.server.core.query.type.IntValues;
+import org.apache.skywalking.oap.server.core.query.type.KVInt;
+import org.apache.skywalking.oap.server.core.query.type.MetricsValues;
+import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
+import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBMetricsQueryDAO implements IMetricsQueryDAO {
+ private final IoTDBClient client;
+
+ @Override
+ public long readMetricsValue(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException {
+ final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+ final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
+ if (function == Function.Latest) {
+ return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
+ }
+
+ StringBuilder query = new StringBuilder();
+ String op;
+ if (function == Function.Avg) {
+ op = "avg";
+ } else {
+ op = "sum";
+ }
+ query.append(String.format("select %s(%s) from ", op, valueColumnName));
+ query = client.addModelPath(query, condition.getName());
+ final String entityId = condition.getEntity().buildId();
+ if (entityId != null) {
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.ENTITY_ID_IDX, entityId);
+ query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
+ } else {
+ query = client.addQueryAsterisk(condition.getName(), query);
+ }
+ query.append(" where ").append(String.format("%s >= %s and %s <= %s",
+ IoTDBClient.TIME, duration.getStartTimestamp(), IoTDBClient.TIME, duration.getEndTimestamp()))
+ .append(" group by level = 3");
+
+ List<Double> results = client.queryWithAgg(query.toString());
+ if (results.size() > 0) {
+ double result = results.get(0);
+ return (long) result;
+ } else {
+ return defaultValue;
+ }
+ }
+
+ @Override
+ public MetricsValues readMetricsValues(final MetricsCondition condition,
+ final String valueColumnName,
+ final Duration duration) throws IOException {
+ final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
+ final List<String> ids = new ArrayList<>(pointOfTimes.size());
+ pointOfTimes.forEach(pointOfTime -> ids.add(pointOfTime.id(condition.getEntity().buildId())));
+
+ StringBuilder query = new StringBuilder();
+ query.append("select ").append(valueColumnName).append(" from ");
+ for (String id : ids) {
+ query = client.addModelPath(query, condition.getName());
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.ID_IDX, id);
+ query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
+ query.append(", ");
+ }
+ String queryString = query.toString();
+ if (ids.size() > 0) {
+ queryString = queryString.substring(0, queryString.lastIndexOf(","));
+ }
+ queryString += IoTDBClient.ALIGN_BY_DEVICE;
+
+ SessionPool sessionPool = client.getSessionPool();
+ SessionDataSetWrapper wrapper = null;
+ MetricsValues metricsValues = new MetricsValues();
+ // Label is null, because in readMetricsValues, no label parameter.
+ final IntValues intValues = metricsValues.getValues();
+ try {
+ wrapper = sessionPool.executeQueryStatement(queryString);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {}, columnNames: {}", queryString, wrapper.getColumnNames());
+ }
+
+ while (wrapper.hasNext()) {
+ RowRecord rowRecord = wrapper.next();
+ List<Field> fields = rowRecord.getFields();
+ String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
+ String id = client.layerName2IndexValue(layerNames[1]);
+
+ Field valueField = fields.get(1);
+ TSDataType valueType = valueField.getDataType();
+ long value = 0;
+ if (TSDataType.INT32.equals(valueType)) {
+ value = valueField.getIntV();
+ } else if (TSDataType.INT64.equals(valueType)) {
+ value = valueField.getLongV();
+ }
+
+ KVInt kv = new KVInt();
+ kv.setId(id);
+ kv.setValue(value);
+ intValues.addKVInt(kv);
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ if (wrapper != null) {
+ sessionPool.closeResultSet(wrapper);
+ }
+ }
+ metricsValues.setValues(Util.sortValues(intValues, ids, ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName())));
+ return metricsValues;
+ }
+
+ @Override
+ public List<MetricsValues> readLabeledMetricsValues(MetricsCondition condition, String valueColumnName, List<String> labels, Duration duration) throws IOException {
+ final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
+ List<String> ids = new ArrayList<>(pointOfTimes.size());
+ pointOfTimes.forEach(pointOfTime -> ids.add(pointOfTime.id(condition.getEntity().buildId())));
+
+ StringBuilder query = new StringBuilder();
+ query.append("select ").append(valueColumnName).append(" from ");
+ for (String id : ids) {
+ query = client.addModelPath(query, condition.getName());
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.ID_IDX, id);
+ query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
+ query.append(", ");
+ }
+ String queryString = query.toString();
+ if (ids.size() > 0) {
+ queryString = queryString.substring(0, queryString.lastIndexOf(","));
+ }
+ queryString += IoTDBClient.ALIGN_BY_DEVICE;
+
+ SessionPool sessionPool = client.getSessionPool();
+ SessionDataSetWrapper wrapper = null;
+ Map<String, DataTable> idMap = new HashMap<>();
+ try {
+ wrapper = sessionPool.executeQueryStatement(queryString);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {}, columnNames: {}", queryString, wrapper.getColumnNames());
+ }
+
+ while (wrapper.hasNext()) {
+ RowRecord rowRecord = wrapper.next();
+ List<Field> fields = rowRecord.getFields();
+ String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
+ String id = client.layerName2IndexValue(layerNames[1]);
+
+ DataTable multipleValues = new DataTable(5);
+ multipleValues.toObject(fields.get(1).getStringValue());
+ idMap.put(id, multipleValues);
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ if (wrapper != null) {
+ sessionPool.closeResultSet(wrapper);
+ }
+ }
+ return Util.composeLabelValue(condition, labels, ids, idMap);
+ }
+
+ @Override
+ public HeatMap readHeatMap(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException {
+ final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
+ List<String> ids = new ArrayList<>(pointOfTimes.size());
+ pointOfTimes.forEach(pointOfTime -> ids.add(pointOfTime.id(condition.getEntity().buildId())));
+
+ StringBuilder query = new StringBuilder();
+ query.append("select ").append(valueColumnName).append(" from ");
+ for (String id : ids) {
+ query = client.addModelPath(query, condition.getName());
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.ID_IDX, id);
+ query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
+ query.append(", ");
+ }
+ String queryString = query.toString();
+ if (ids.size() > 0) {
+ queryString = queryString.substring(0, queryString.lastIndexOf(","));
+ }
+ queryString += IoTDBClient.ALIGN_BY_DEVICE;
+
+ SessionPool sessionPool = client.getSessionPool();
+ SessionDataSetWrapper wrapper = null;
+ HeatMap heatMap = new HeatMap();
+ final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
+ try {
+ wrapper = sessionPool.executeQueryStatement(queryString);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {}, columnNames: {}", queryString, wrapper.getColumnNames());
+ }
+
+ while (wrapper.hasNext()) {
+ RowRecord rowRecord = wrapper.next();
+ List<Field> fields = rowRecord.getFields();
+ String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
+ String id = client.layerName2IndexValue(layerNames[1]);
+
+ heatMap.buildColumn(id, fields.get(1).getStringValue(), defaultValue);
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ if (wrapper != null) {
+ sessionPool.closeResultSet(wrapper);
+ }
+ }
+ heatMap.fixMissingColumns(ids, defaultValue);
+ return heatMap;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopNRecordsQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopNRecordsQueryDAO.java
new file mode 100644
index 0000000..da2f6b3
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopNRecordsQueryDAO.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionDataSetWrapper;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
+import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
+import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
+import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBTableMetaInfo;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBTopNRecordsQueryDAO implements ITopNRecordsQueryDAO {
+ private final IoTDBClient client;
+
+ @Override
+ public List<SelectedRecord> readSampledRecords(TopNCondition condition, String valueColumnName, Duration duration) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select ").append(TopN.STATEMENT).append(", ").append(valueColumnName)
+ .append(" from ");
+ query = client.addModelPath(query, condition.getName());
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ if (StringUtil.isNotEmpty(condition.getParentService())) {
+ final String serviceId = IDManager.ServiceID.buildId(condition.getParentService(), condition.isNormal());
+ indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
+ }
+ query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
+
+ StringBuilder where = new StringBuilder(" where ");
+ if (Objects.nonNull(duration)) {
+ where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(duration.getStartTimeBucketInSec())).append(" and ");
+ where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(duration.getEndTimeBucketInSec()));
+ }
+ if (where.length() > 7) {
+ query.append(where);
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ SessionPool sessionPool = client.getSessionPool();
+ SessionDataSetWrapper wrapper = null;
+ List<SelectedRecord> records = new ArrayList<>();
+ try {
+ wrapper = sessionPool.executeQueryStatement(query.toString());
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {}, columnNames: {}", query, wrapper.getColumnNames());
+ }
+
+ List<String> indexes = IoTDBTableMetaInfo.get(condition.getName()).getIndexes();
+ int traceIdIdx = indexes.indexOf(IoTDBIndexes.TRACE_ID_IDX);
+
+ while (wrapper.hasNext()) {
+ SelectedRecord record = new SelectedRecord();
+ RowRecord rowRecord = wrapper.next();
+ List<Field> fields = rowRecord.getFields();
+ record.setName(fields.get(1).getStringValue());
+
+ String traceId = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"")[traceIdIdx + 1];
+ traceId = client.layerName2IndexValue(traceId);
+ record.setRefId(traceId);
+
+ record.setId(record.getRefId());
+ record.setValue(String.valueOf(fields.get(2).getObjectValue(fields.get(2).getDataType())));
+ records.add(record);
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ if (wrapper != null) {
+ sessionPool.closeResultSet(wrapper);
+ }
+ }
+
+ // resort by self, because of the select query result order by time.
+ if (Order.DES.equals(condition.getOrder())) {
+ records.sort((SelectedRecord s1, SelectedRecord s2) ->
+ Long.compare(Long.parseLong(s2.getValue()), Long.parseLong(s1.getValue())));
+ } else {
+ records.sort(Comparator.comparingLong((SelectedRecord s) -> Long.parseLong(s.getValue())));
+ }
+ return records;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopologyQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopologyQueryDAO.java
new file mode 100644
index 0000000..dd343b8
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopologyQueryDAO.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionDataSetWrapper;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
+import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
+import org.apache.skywalking.oap.server.core.query.type.Call;
+import org.apache.skywalking.oap.server.core.source.DetectPoint;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+
+@Slf4j
+@RequiredArgsConstructor
+public class IoTDBTopologyQueryDAO implements ITopologyQueryDAO {
+ private final IoTDBClient client;
+
+ @Override
+ public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB,
+ List<String> serviceIds) throws IOException {
+ return loadServiceCalls(
+ ServiceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
+ ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID,
+ ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
+ serviceIds, DetectPoint.SERVER);
+ }
+
+ @Override
+ public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB,
+ List<String> serviceIds) throws IOException {
+ return loadServiceCalls(
+ ServiceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
+ ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
+ ServiceRelationClientSideMetrics.DEST_SERVICE_ID,
+ serviceIds, DetectPoint.CLIENT);
+ }
+
+ @Override
+ public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB) throws IOException {
+ return loadServiceCalls(
+ ServiceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
+ ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID,
+ ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
+ new ArrayList<>(0), DetectPoint.SERVER);
+ }
+
+ @Override
+ public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB) throws IOException {
+ return loadServiceCalls(
+ ServiceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
+ ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
+ ServiceRelationClientSideMetrics.DEST_SERVICE_ID,
+ new ArrayList<>(0), DetectPoint.CLIENT);
+ }
+
+ @Override
+ public List<Call.CallDetail> loadInstanceRelationDetectedAtServerSide(String clientServiceId,
+ String serverServiceId,
+ long startTB, long endTB) throws IOException {
+ return loadServiceInstanceCalls(
+ ServiceInstanceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
+ ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
+ ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
+ clientServiceId, serverServiceId, DetectPoint.SERVER);
+ }
+
+ @Override
+ public List<Call.CallDetail> loadInstanceRelationDetectedAtClientSide(String clientServiceId,
+ String serverServiceId,
+ long startTB, long endTB) throws IOException {
+ return loadServiceInstanceCalls(
+ ServiceInstanceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
+ ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
+ ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID,
+ clientServiceId, serverServiceId, DetectPoint.CLIENT);
+ }
+
+ @Override
+ public List<Call.CallDetail> loadEndpointRelation(long startTB, long endTB, String destEndpointId) throws IOException {
+ List<Call.CallDetail> calls = loadEndpointFromSide(
+ EndpointRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
+ EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
+ EndpointRelationServerSideMetrics.DEST_ENDPOINT,
+ destEndpointId, false);
+ calls.addAll(loadEndpointFromSide(
+ EndpointRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
+ EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
+ EndpointRelationServerSideMetrics.DEST_ENDPOINT,
+ destEndpointId, true));
+ return calls;
+ }
+
+ private List<Call.CallDetail> loadServiceCalls(String tableName, long startTB, long endTB,
+ String sourceCName, String destCName,
+ List<String> serviceIds, DetectPoint detectPoint) throws IOException {
+ // This method don't use "group by" like other storage plugin.
+ StringBuilder query = new StringBuilder();
+ query.append("select ").append(ServiceRelationServerSideMetrics.COMPONENT_ID).append(" from ");
+ query = client.addModelPath(query, tableName);
+ query = client.addQueryAsterisk(tableName, query);
+ query.append(" where ").append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTB))
+ .append(" and ").append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTB));
+ if (serviceIds.size() > 0) {
+ query.append(" and (");
+ for (int i = 0; i < serviceIds.size(); i++) {
+ query.append(sourceCName).append(" = \"").append(serviceIds.get(i))
+ .append("\" or ")
+ .append(destCName).append(" = \"").append(serviceIds.get(i)).append("\"");
+ if (i != serviceIds.size() - 1) {
+ query.append(" or ");
+ }
+ }
+ query.append(")");
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ SessionPool sessionPool = client.getSessionPool();
+ SessionDataSetWrapper wrapper = null;
+ List<Call.CallDetail> calls = new ArrayList<>();
+ try {
+ wrapper = sessionPool.executeQueryStatement(query.toString());
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {}, columnNames: {}", query, wrapper.getColumnNames());
+ }
+
+ while (wrapper.hasNext()) {
+ RowRecord rowRecord = wrapper.next();
+ List<Field> fields = rowRecord.getFields();
+ Call.CallDetail call = new Call.CallDetail();
+ String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
+ String entityId = client.layerName2IndexValue(layerNames[2]);
+ final int componentId = fields.get(1).getIntV();
+ call.buildFromServiceRelation(entityId, componentId, detectPoint);
+ calls.add(call);
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ if (wrapper != null) {
+ sessionPool.closeResultSet(wrapper);
+ }
+ }
+ return calls;
+ }
+
+ private List<Call.CallDetail> loadServiceInstanceCalls(String tableName, long startTB, long endTB,
+ String sourceCName, String descCName,
+ String sourceServiceId, String destServiceId,
+ DetectPoint detectPoint) throws IOException {
+ // This method don't use "group by" like other storage plugin.
+ StringBuilder query = new StringBuilder();
+ query.append("select ").append(ServiceInstanceRelationServerSideMetrics.COMPONENT_ID).append(" from ");
+ query = client.addModelPath(query, tableName);
+ query = client.addQueryAsterisk(tableName, query);
+ query.append(" where ").append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTB))
+ .append(" and ").append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTB));
+ query.append(" and ((").append(sourceCName).append(" = \"").append(sourceServiceId).append("\"")
+ .append(" and ").append(descCName).append(" = \"").append(destServiceId).append("\"")
+ .append(") or (").append(sourceCName).append(" = \"").append(destServiceId).append("\"")
+ .append(" and ").append(descCName).append(" = \"").append(sourceServiceId).append(" \"))")
+ .append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ SessionPool sessionPool = client.getSessionPool();
+ SessionDataSetWrapper wrapper = null;
+ List<Call.CallDetail> calls = new ArrayList<>();
+ try {
+ wrapper = sessionPool.executeQueryStatement(query.toString());
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {}, columnNames: {}", query, wrapper.getColumnNames());
+ }
+ while (wrapper.hasNext()) {
+ RowRecord rowRecord = wrapper.next();
+ List<Field> fields = rowRecord.getFields();
+ Call.CallDetail call = new Call.CallDetail();
+ String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
+ String entityId = client.layerName2IndexValue(layerNames[2]);
+ final int componentId = fields.get(1).getIntV();
+ call.buildFromInstanceRelation(entityId, componentId, detectPoint);
+ calls.add(call);
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ if (wrapper != null) {
+ sessionPool.closeResultSet(wrapper);
+ }
+ }
+ return calls;
+ }
+
+ private List<Call.CallDetail> loadEndpointFromSide(String tableName, long startTB, long endTB,
+ String sourceCName, String destCName,
+ String id, boolean isSourceId) throws IOException {
+ // This method don't use "group by" like other storage plugin.
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, tableName);
+ query = client.addQueryAsterisk(tableName, query);
+ query.append(" where ").append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTB))
+ .append(" and ").append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTB));
+ query.append(" and ").append(isSourceId ? sourceCName : destCName).append(" = \"").append(id).append("\"")
+ .append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ SessionPool sessionPool = client.getSessionPool();
+ SessionDataSetWrapper wrapper = null;
+ List<Call.CallDetail> calls = new ArrayList<>();
+ try {
+ wrapper = sessionPool.executeQueryStatement(query.toString());
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {}, columnNames: {}", query, wrapper.getColumnNames());
+ }
+ while (wrapper.hasNext()) {
+ RowRecord rowRecord = wrapper.next();
+ List<Field> fields = rowRecord.getFields();
+ Call.CallDetail call = new Call.CallDetail();
+ String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
+ String entityId = client.layerName2IndexValue(layerNames[2]);
+ call.buildFromEndpointRelation(entityId, DetectPoint.SERVER);
+ calls.add(call);
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ throw new IOException(e);
+ } finally {
+ if (wrapper != null) {
+ sessionPool.closeResultSet(wrapper);
+ }
+ }
+ return calls;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTraceQueryDAO.java
new file mode 100644
index 0000000..d88bdf7
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTraceQueryDAO.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
+
+import com.google.common.base.Strings;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
+import org.apache.skywalking.oap.server.core.query.type.Span;
+import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
+import org.apache.skywalking.oap.server.core.query.type.TraceState;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
+import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
+
+@RequiredArgsConstructor
+public class IoTDBTraceQueryDAO implements ITraceQueryDAO {
+ private final IoTDBClient client;
+ private final StorageHashMapBuilder<SegmentRecord> storageBuilder = new SegmentRecord.Builder();
+
+ @Override
+ public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
+ String serviceId, String serviceInstanceId,
+ String endpointId, String traceId, int limit, int from,
+ TraceState traceState, QueryOrder queryOrder, List<Tag> tags)
+ throws IOException {
+ StringBuilder query = new StringBuilder();
+ // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
+ // https://github.com/apache/iotdb/discussions/3888
+ query.append("select * from ");
+ query = client.addModelPath(query, SegmentRecord.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ if (StringUtil.isNotEmpty(serviceId)) {
+ indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
+ }
+ if (!Strings.isNullOrEmpty(traceId)) {
+ indexAndValueMap.put(IoTDBIndexes.TRACE_ID_IDX, traceId);
+ }
+ query = client.addQueryIndexValue(SegmentRecord.INDEX_NAME, query, indexAndValueMap);
+
+ StringBuilder where = new StringBuilder(" where ");
+ if (startSecondTB != 0 && endSecondTB != 0) {
+ where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startSecondTB)).append(" and ");
+ where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endSecondTB)).append(" and ");
+ }
+ if (minDuration != 0) {
+ where.append(SegmentRecord.LATENCY).append(" >= ").append(minDuration).append(" and ");
+ }
+ if (maxDuration != 0) {
+ where.append(SegmentRecord.LATENCY).append(" <= ").append(maxDuration).append(" and ");
+ }
+ if (StringUtil.isNotEmpty(serviceInstanceId)) {
+ where.append(SegmentRecord.SERVICE_INSTANCE_ID).append(" = \"").append(serviceInstanceId).append("\"").append(" and ");
+ }
+ if (!Strings.isNullOrEmpty(endpointId)) {
+ where.append(SegmentRecord.ENDPOINT_ID).append(" = \"").append(endpointId).append("\"").append(" and ");
+ }
+ if (CollectionUtils.isNotEmpty(tags)) {
+ for (final Tag tag : tags) {
+ where.append(tag.getKey()).append(" = \"").append(tag.getValue()).append("\"").append(" and ");
+ }
+ }
+ switch (traceState) {
+ case ERROR:
+ where.append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.TRUE).append(" and ");
+ break;
+ case SUCCESS:
+ where.append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.FALSE).append(" and ");
+ break;
+ }
+ if (where.length() > 7) {
+ int length = where.length();
+ where.delete(length - 5, length);
+ query.append(where);
+ }
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ TraceBrief traceBrief = new TraceBrief();
+ List<? super StorageData> storageDataList = client.filterQuery(SegmentRecord.INDEX_NAME, query.toString(), storageBuilder);
+ int limitCount = 0;
+ for (int i = from; i < storageDataList.size(); i++) {
+ if (limitCount < limit) {
+ limitCount++;
+ SegmentRecord segmentRecord = (SegmentRecord) storageDataList.get(i);
+ BasicTrace basicTrace = new BasicTrace();
+ basicTrace.setSegmentId(segmentRecord.getSegmentId());
+ basicTrace.setStart(String.valueOf(segmentRecord.getStartTime()));
+ basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(segmentRecord.getEndpointId()).getEndpointName());
+ basicTrace.setDuration(segmentRecord.getLatency());
+ basicTrace.setError(BooleanUtils.valueToBoolean(segmentRecord.getIsError()));
+ basicTrace.getTraceIds().add(segmentRecord.getTraceId());
+ traceBrief.getTraces().add(basicTrace);
+ }
+ }
+ traceBrief.setTotal(storageDataList.size());
+ // resort by self, because of the select query result order by time.
+ switch (queryOrder) {
+ case BY_START_TIME:
+ traceBrief.getTraces().sort((BasicTrace b1, BasicTrace b2) ->
+ Long.compare(Long.parseLong(b2.getStart()), Long.parseLong(b1.getStart())));
+ break;
+ case BY_DURATION:
+ traceBrief.getTraces().sort((BasicTrace b1, BasicTrace b2) ->
+ Integer.compare(b2.getDuration(), b1.getDuration()));
+ break;
+ }
+ return traceBrief;
+ }
+
+ @Override
+ public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
+ StringBuilder query = new StringBuilder();
+ query.append("select * from ");
+ query = client.addModelPath(query, SegmentRecord.INDEX_NAME);
+ Map<String, String> indexAndValueMap = new HashMap<>();
+ indexAndValueMap.put(IoTDBIndexes.TRACE_ID_IDX, traceId);
+ query = client.addQueryIndexValue(SegmentRecord.INDEX_NAME, query, indexAndValueMap);
+ query.append(IoTDBClient.ALIGN_BY_DEVICE);
+
+ List<? super StorageData> storageDataList = client.filterQuery(SegmentRecord.INDEX_NAME, query.toString(), storageBuilder);
+ List<SegmentRecord> segmentRecords = new ArrayList<>(storageDataList.size());
+ storageDataList.forEach(storageData -> segmentRecords.add((SegmentRecord) storageData));
+ return segmentRecords;
+ }
+
+ @Override
+ public List<Span> doFlexibleTraceQuery(String traceId) {
+ return Collections.emptyList();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000..3cdccc3
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBStorageProvider
\ No newline at end of file
diff --git a/test/e2e-v2/cases/alarm/iotdb/docker-compose.yml b/test/e2e-v2/cases/alarm/iotdb/docker-compose.yml
new file mode 100644
index 0000000..be0a45d
--- /dev/null
+++ b/test/e2e-v2/cases/alarm/iotdb/docker-compose.yml
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+ iotdb:
+ image: apache/iotdb:0.12.3-node
+ expose:
+ - 6667
+ networks:
+ - e2e
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ oap:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: oap
+ environment:
+ SW_STORAGE: iotdb
+ SW_SEARCHABLE_ALARM_TAG_KEYS: level,receivers
+ ports:
+ - 12800
+ depends_on:
+ iotdb:
+ condition: service_healthy
+ volumes:
+ - ../alarm-settings.yml:/skywalking/config/alarm-settings.yml
+
+ provider:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: provider
+ ports:
+ - 9090
+ depends_on:
+ oap:
+ condition: service_healthy
+
+networks:
+ e2e:
diff --git a/test/e2e-v2/cases/alarm/iotdb/e2e.yaml b/test/e2e-v2/cases/alarm/iotdb/e2e.yaml
new file mode 100644
index 0000000..a44a76b
--- /dev/null
+++ b/test/e2e-v2/cases/alarm/iotdb/e2e.yaml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+ env: compose
+ file: docker-compose.yml
+ timeout: 20m
+ init-system-environment: ../../../script/env
+ steps:
+ - name: install yq
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+ - name: install swctl
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+trigger:
+ action: http
+ interval: 3s
+ times: 10
+ url: http://${provider_host}:${provider_9090}/users
+ method: POST
+ body: '{"id":"123","name":"skywalking"}'
+ headers:
+ "Content-Type": "application/json"
+
+verify:
+ retry:
+ count: 20
+ interval: 3s
+ cases:
+ - includes:
+ - ../alarm-cases.yaml
diff --git a/test/e2e-v2/cases/cluster/zk/iotdb/docker-compose.yml b/test/e2e-v2/cases/cluster/zk/iotdb/docker-compose.yml
new file mode 100644
index 0000000..0abf7ee
--- /dev/null
+++ b/test/e2e-v2/cases/cluster/zk/iotdb/docker-compose.yml
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+ zk:
+ image: zookeeper:3.5
+ expose:
+ - 2181
+ networks:
+ - e2e
+ healthcheck:
+ test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/2181" ]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ iotdb:
+ image: apache/iotdb:0.12.3-node
+ expose:
+ - 6667
+ networks:
+ - e2e
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ oap1:
+ extends:
+ file: ../../../../script/docker-compose/base-compose.yml
+ service: oap
+ environment:
+ SW_CLUSTER: zookeeper
+ SW_STORAGE: iotdb
+ depends_on:
+ zk:
+ condition: service_healthy
+ iotdb:
+ condition: service_healthy
+
+ oap2:
+ extends:
+ file: ../../../../script/docker-compose/base-compose.yml
+ service: oap
+ environment:
+ SW_CLUSTER: zookeeper
+ SW_STORAGE: iotdb
+ depends_on:
+ zk:
+ condition: service_healthy
+ iotdb:
+ condition: service_healthy
+ oap1:
+ condition: service_healthy
+
+ ui:
+ extends:
+ file: ../../../../script/docker-compose/base-compose.yml
+ service: ui
+ environment:
+ - SW_OAP_ADDRESS=http://oap1:12800,http://oap2:12800
+ depends_on:
+ oap1:
+ condition: service_healthy
+ oap2:
+ condition: service_healthy
+ ports:
+ - 8080
+
+ provider1:
+ extends:
+ file: ../../../../script/docker-compose/base-compose.yml
+ service: provider
+ environment:
+ SW_AGENT_COLLECTOR_BACKEND_SERVICES: oap2:11800
+ SW_AGENT_INSTANCE_NAME: provider1
+ depends_on:
+ oap2:
+ condition: service_healthy
+
+ consumer:
+ extends:
+ file: ../../../../script/docker-compose/base-compose.yml
+ service: consumer
+ environment:
+ SW_AGENT_COLLECTOR_BACKEND_SERVICES: oap1:11800
+ PROVIDER_URL: http://provider1:9090
+ depends_on:
+ oap1:
+ condition: service_healthy
+ provider1:
+ condition: service_healthy
+ ports:
+ - 9092
+networks:
+ e2e:
diff --git a/test/e2e-v2/cases/cluster/zk/iotdb/e2e.yaml b/test/e2e-v2/cases/cluster/zk/iotdb/e2e.yaml
new file mode 100644
index 0000000..f7b4fee
--- /dev/null
+++ b/test/e2e-v2/cases/cluster/zk/iotdb/e2e.yaml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+ env: compose
+ file: docker-compose.yml
+ timeout: 20m
+ init-system-environment: ../../../../script/env
+ steps:
+ - name: install yq
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+ - name: install swctl
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+trigger:
+ action: http
+ interval: 3s
+ times: 10
+ url: http://${consumer_host}:${consumer_9092}/users
+ method: POST
+ body: '{"id":"123","name":"skywalking"}'
+ headers:
+ "Content-Type": "application/json"
+
+verify:
+ # verify with retry strategy
+ retry:
+ # max retry count
+ count: 20
+ # the interval between two retries, in millisecond.
+ interval: 3s
+ cases:
+ - includes:
+ - ../../cluster-cases.yaml
diff --git a/test/e2e-v2/cases/event/iotdb/docker-compose.yml b/test/e2e-v2/cases/event/iotdb/docker-compose.yml
new file mode 100644
index 0000000..d5639a5
--- /dev/null
+++ b/test/e2e-v2/cases/event/iotdb/docker-compose.yml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3.8'
+
+services:
+ iotdb:
+ image: apache/iotdb:0.12.3-node
+ expose:
+ - 6667
+ networks:
+ - e2e
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ oap:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: oap
+ environment:
+ SW_STORAGE: iotdb
+ depends_on:
+ iotdb:
+ condition: service_healthy
+ ports:
+ - 11800
+ - 12800
+
+networks:
+ e2e:
diff --git a/test/e2e-v2/cases/event/iotdb/e2e.yaml b/test/e2e-v2/cases/event/iotdb/e2e.yaml
new file mode 100644
index 0000000..fe22de0
--- /dev/null
+++ b/test/e2e-v2/cases/event/iotdb/e2e.yaml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+ env: compose
+ file: docker-compose.yml
+ timeout: 20m
+ init-system-environment: ../../../script/env
+ steps:
+ - name: install yq
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+ - name: install swctl
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+verify:
+ retry:
+ count: 20
+ interval: 3s
+ cases:
+ - includes:
+ - ../event-cases.yaml
diff --git a/test/e2e-v2/cases/log/iotdb/docker-compose.yml b/test/e2e-v2/cases/log/iotdb/docker-compose.yml
new file mode 100644
index 0000000..ac0c160
--- /dev/null
+++ b/test/e2e-v2/cases/log/iotdb/docker-compose.yml
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+ iotdb:
+ image: apache/iotdb:0.12.3-node
+ expose:
+ - 6667
+ networks:
+ - e2e
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ oap:
+ extends:
+ file: ../log-base-compose.yml
+ service: oap
+ environment:
+ SW_STORAGE: iotdb
+ SW_STORAGE_IOTDB_HOST: iotdb
+ ports:
+ - 12800
+ networks:
+ - e2e
+ depends_on:
+ iotdb:
+ condition: service_healthy
+
+ provider:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: provider
+ ports:
+ - 9090
+ networks:
+ - e2e
+ depends_on:
+ oap:
+ condition: service_healthy
+
+networks:
+ e2e:
diff --git a/test/e2e-v2/cases/log/iotdb/e2e.yaml b/test/e2e-v2/cases/log/iotdb/e2e.yaml
new file mode 100644
index 0000000..1edeacf
--- /dev/null
+++ b/test/e2e-v2/cases/log/iotdb/e2e.yaml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+ env: compose
+ file: docker-compose.yml
+ timeout: 20m
+ init-system-environment: ../../../script/env
+ steps:
+ - name: install yq
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+ - name: install swctl
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+trigger:
+ action: http
+ interval: 3s
+ times: 10
+ url: http://${provider_host}:${provider_9090}/users
+ method: POST
+ body: '{"id":"123","name":"skywalking"}'
+ headers:
+ "Content-Type": "application/json"
+
+verify:
+ # verify with retry strategy
+ retry:
+ # max retry count
+ count: 20
+ # the interval between two retries, in millisecond.
+ interval: 10s
+ cases:
+ - includes:
+ - ../log-cases.yaml
diff --git a/test/e2e-v2/cases/profile/iotdb/docker-compose.yml b/test/e2e-v2/cases/profile/iotdb/docker-compose.yml
new file mode 100644
index 0000000..1292925
--- /dev/null
+++ b/test/e2e-v2/cases/profile/iotdb/docker-compose.yml
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+ iotdb:
+ image: apache/iotdb:0.12.3-node
+ expose:
+ - 6667
+ networks:
+ - e2e
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ provider:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: provider
+ depends_on:
+ oap:
+ condition: service_healthy
+ ports:
+ - 9090
+
+ oap:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: oap
+ environment:
+ SW_STORAGE: iotdb
+ depends_on:
+ iotdb:
+ condition: service_healthy
+ ports:
+ - 12800
+
+networks:
+ e2e:
diff --git a/test/e2e-v2/cases/profile/iotdb/e2e.yaml b/test/e2e-v2/cases/profile/iotdb/e2e.yaml
new file mode 100644
index 0000000..107ef28
--- /dev/null
+++ b/test/e2e-v2/cases/profile/iotdb/e2e.yaml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+ env: compose
+ file: docker-compose.yml
+ timeout: 20m
+ init-system-environment: ../../../script/env
+ steps:
+ - name: install yq
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+ - name: install swctl
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+verify:
+ retry:
+ count: 20
+ interval: 3s
+ cases:
+ - includes:
+ - ../profile-cases.yaml
diff --git a/test/e2e-v2/cases/storage/iotdb/docker-compose.yml b/test/e2e-v2/cases/storage/iotdb/docker-compose.yml
new file mode 100644
index 0000000..7626b2f
--- /dev/null
+++ b/test/e2e-v2/cases/storage/iotdb/docker-compose.yml
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+ iotdb:
+ image: apache/iotdb:0.12.3-node
+ expose:
+ - 6667
+ networks:
+ - e2e
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ oap:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: oap
+ environment:
+ SW_STORAGE: iotdb
+ ports:
+ - 12800
+ networks:
+ - e2e
+ depends_on:
+ iotdb:
+ condition: service_healthy
+
+ provider:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: provider
+ ports:
+ - 9090
+ networks:
+ - e2e
+ depends_on:
+ oap:
+ condition: service_healthy
+
+ consumer:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: consumer
+ ports:
+ - 9092
+ depends_on:
+ oap:
+ condition: service_healthy
+ provider:
+ condition: service_healthy
+
+networks:
+ e2e:
diff --git a/test/e2e-v2/cases/storage/iotdb/e2e.yaml b/test/e2e-v2/cases/storage/iotdb/e2e.yaml
new file mode 100644
index 0000000..73f746c
--- /dev/null
+++ b/test/e2e-v2/cases/storage/iotdb/e2e.yaml
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+ env: compose
+ file: docker-compose.yml
+ timeout: 20m
+ init-system-environment: ../../../script/env
+ steps:
+ - name: install yq
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+ - name: install swctl
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+trigger:
+ action: http
+ interval: 3s
+ times: 10
+ url: http://${consumer_host}:${consumer_9092}/info
+ method: POST
+
+verify:
+ # verify with retry strategy
+ retry:
+ # max retry count
+ count: 20
+ # the interval between two retries, in millisecond.
+ interval: 10s
+ cases:
+ # service list
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls
+ expected: ../expected/service.yml
+ # service metrics
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name service_sla --service-name=e2e-service-provider | yq e 'to_entries' -
+ expected: ../expected/metrics-has-value.yml
+ # service endpoint
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql endpoint list --keyword=info --service-name=e2e-service-provider
+ expected: ../expected/service-endpoint.yml
+ # service endpoint metrics
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name endpoint_cpm --endpoint-name=POST:/info --service-name=e2e-service-provider |yq e 'to_entries' -
+ expected: ../expected/metrics-has-value.yml
+ # dependency service
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql dependency service --service-name="e2e-service-provider"
+ expected: ../expected/dependency-services.yml
+ # service instance list
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider
+ expected: ../expected/service-instance.yml
+ # service instance jvm metrics
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider | yq e 'to_entries' -
+ expected: ../expected/metrics-has-value.yml
+
+ # trace segment list
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls
+ expected: ../expected/traces-list.yml
+ # native tracing: trace detail
+ - query: |
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $( \
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls \
+ | yq e '.traces | select(.[].endpointnames[0]=="POST:/info") | .[0].traceids[0]' - \
+ )
+ expected: ../expected/trace-info-detail.yml
+
+ # native event: event list
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql event list
+ expected: ../expected/event-list.yml
+
+ # native profile: create task
+ - query: |
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql \
+ profile create --service-name=e2e-service-provider \
+ --endpoint-name=POST:/info \
+ --start-time=$((($(date +%s)+5)*1000)) \
+ --duration=1 --min-duration-threshold=0 \
+ --dump-period=10 --max-sampling-count=9
+ expected: ../expected/profile-create.yml
+ # native profile: sleep to wait agent notices and query profile list
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile list -service-name=e2e-service-provider --endpoint-name=POST:/info
+ expected: ../expected/profile-list.yml
+
+ # native profile: sleep to wait segment report and query profiled segment list
+ - query: |
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile segment-list --task-id=$( \
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile list --service-name=e2e-service-provider --endpoint-name=POST:/info | yq e '.[0].id' - \
+ )
+ expected: ../expected/profile-segment-list.yml
+
+ # native profile: query profiled segment
+ - query: |
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile profiled-segment --segment-id=$( \
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile segment-list --task-id=$( \
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile list --service-name=e2e-service-provider --endpoint-name=POST:/info | yq e '.[0].id' - \
+ ) | yq e '.[0].segmentid' - \
+ )
+ expected: ../expected/profile-segment-detail.yml
+
+ # native profile: query profiled segment analyze
+ - query: |
+ segmentid=$( \
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile segment-list --task-id=$( \
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile list --service-name=e2e-service-provider --endpoint-name=POST:/info | yq e '.[0].id' - \
+ ) | yq e '.[0].segmentid' - \
+ );
+ start=$(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile profiled-segment --segment-id=$segmentid|yq e '.spans.[0].starttime' -);
+ end=$(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile profiled-segment --segment-id=$segmentid|yq e '.spans.[0].endtime' -);
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile profiled-analyze --segment-id=$segmentid --time-ranges=$(echo $start"-"$end)
+ expected: ../expected/profile-segment-analyze.yml
+
+ - query: |
+ curl -s -XPOST http://${provider_host}:${provider_9090}/users -d '{"id":"123","name":"SinglesBar"}' -H "Content-Type: application/json" > /dev/null;
+ sleep 5;
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $( \
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls --order startTime --service-name "e2e-service-provider" --endpoint-name "POST:/users" \
+ | yq e '.traces[0].traceids[0]' - \
+ )
+ expected: ../expected/trace-users-detail.yml
diff --git a/test/e2e-v2/cases/ttl/iotdb/docker-compose.yml b/test/e2e-v2/cases/ttl/iotdb/docker-compose.yml
new file mode 100644
index 0000000..2a5c238
--- /dev/null
+++ b/test/e2e-v2/cases/ttl/iotdb/docker-compose.yml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.1'
+
+services:
+ iotdb:
+ image: apache/iotdb:0.12.3-node
+ expose:
+ - 6667
+ networks:
+ - e2e
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ oap:
+ extends:
+ file: ../../../script/docker-compose/base-compose.yml
+ service: oap
+ environment:
+ SW_STORAGE: iotdb
+ SW_CORE_DATA_KEEPER_EXECUTE_PERIOD: 1
+ SW_CORE_METRICS_DATA_TTL: 7
+ depends_on:
+ iotdb:
+ condition: service_healthy
+ ports:
+ - 12800
+
+ sender:
+ image: "adoptopenjdk/openjdk8:alpine-jre"
+ volumes:
+ - ./../../../java-test-service/e2e-mock-sender/target/e2e-mock-sender-2.0.0.jar:/e2e-mock-sender-2.0.0.jar
+ command: [ "java", "-jar", "/e2e-mock-sender-2.0.0.jar" ]
+ environment:
+ OAP_HOST: oap
+ OAP_GRPC_PORT: 11800
+ networks:
+ - e2e
+ ports:
+ - 9093
+ healthcheck:
+ test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 9093"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ depends_on:
+ oap:
+ condition: service_healthy
+networks:
+ e2e:
diff --git a/test/e2e-v2/cases/ttl/iotdb/e2e.yaml b/test/e2e-v2/cases/ttl/iotdb/e2e.yaml
new file mode 100644
index 0000000..3793093
--- /dev/null
+++ b/test/e2e-v2/cases/ttl/iotdb/e2e.yaml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to show how to write configuration files and can be used to test.
+
+setup:
+ env: compose
+ file: docker-compose.yml
+ timeout: 20m
+ init-system-environment: ../../../script/env
+ steps:
+ - name: install yq
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
+ - name: install swctl
+ command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
+
+verify:
+ retry:
+ count: 30
+ interval: 3s
+ cases:
+ - includes:
+ - ../ttl-cases.yaml
diff --git a/test/e2e-v2/script/docker-compose/base-compose.yml b/test/e2e-v2/script/docker-compose/base-compose.yml
index 26e6ccc..07d51d1 100644
--- a/test/e2e-v2/script/docker-compose/base-compose.yml
+++ b/test/e2e-v2/script/docker-compose/base-compose.yml
@@ -33,6 +33,7 @@ services:
SW_STORAGE_ES_CLUSTER_NODES: es:9200
SW_JDBC_URL: jdbc:mysql://mysql:3306/swtest
SW_STORAGE_INFLUXDB_URL: http://influxdb:8086
+ SW_STORAGE_IOTDB_HOST: iotdb
SW_CONFIG_ETCD_PERIOD: 1
SW_CONFIG_ETCD_ENDPOINTS: http://etcd:2379
SW_CLUSTER_ETCD_ENDPOINTS: http://etcd:2379
diff --git a/tools/dependencies/known-oap-backend-dependencies.txt b/tools/dependencies/known-oap-backend-dependencies.txt
index ad972f8..0c1ad0b 100755
--- a/tools/dependencies/known-oap-backend-dependencies.txt
+++ b/tools/dependencies/known-oap-backend-dependencies.txt
@@ -61,6 +61,8 @@ httpclient-4.5.13.jar
httpcore-4.4.13.jar
httpcore-nio-4.4.13.jar
influxdb-java-2.15.jar
+iotdb-session-0.12.3.jar
+iotdb-thrift-0.12.3.jar
jackson-annotations-2.12.2.jar
jackson-core-2.12.2.jar
jackson-databind-2.12.2.jar
@@ -89,11 +91,14 @@ jsr305-3.0.2.jar
kafka-clients-2.4.1.jar
kotlin-reflect-1.1.1.jar
kotlin-stdlib-1.1.60.jar
+libthrift-0.14.1.jar
listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
log4j-api-2.14.1.jar
log4j-core-2.14.1.jar
log4j-over-slf4j-1.7.30.jar
log4j-slf4j-impl-2.14.1.jar
+logback-classic-1.2.3.jar
+logback-core-1.2.3.jar
logging-interceptor-3.13.1.jar
lz4-java-1.6.0.jar
micrometer-core-1.7.4.jar
@@ -132,6 +137,7 @@ protobuf-java-util-3.17.3.jar
reactive-streams-1.0.2.jar
reflectasm-1.11.7.jar
retrofit-2.5.0.jar
+service-rpc-0.12.3.jar
simpleclient-0.6.0.jar
simpleclient_common-0.6.0.jar
simpleclient_hotspot-0.6.0.jar
@@ -140,6 +146,7 @@ slf4j-api-1.7.30.jar
snakeyaml-1.28.jar
snappy-java-1.1.7.3.jar
swagger-annotations-1.6.3.jar
+tsfile-0.12.3.jar
vavr-0.10.3.jar
vavr-match-0.10.3.jar
zipkin-2.9.1.jar