You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/11/29 02:14:45 UTC
[skywalking] 01/01: Revert IoTDB as storage option due to negative perf testing result.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch revert-iotdb
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 6b9ccb3f228b1b341586dca70798a61aa1af4eed
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Nov 29 10:14:18 2021 +0800
Revert IoTDB as storage option due to negative perf testing result.
---
.github/workflows/e2e.alarm.yaml | 1 -
.github/workflows/e2e.cluster.yaml | 1 -
.github/workflows/e2e.event.yaml | 1 -
.github/workflows/e2e.log.yaml | 1 -
.github/workflows/e2e.profiling.yaml | 1 -
.github/workflows/e2e.storages.yaml | 1 -
.github/workflows/e2e.ttl.yaml | 1 -
CHANGES.md | 1 -
dist-material/release-docs/LICENSE | 9 +-
docs/en/setup/backend/backend-storage.md | 19 --
oap-server-bom/pom.xml | 16 +-
oap-server/server-starter/pom.xml | 5 -
.../src/main/resources/application.yml | 8 -
oap-server/server-storage-plugin/pom.xml | 1 -
.../storage-iotdb-plugin/pom.xml | 59 ----
.../server/storage/plugin/iotdb/IoTDBClient.java | 338 ---------------------
.../server/storage/plugin/iotdb/IoTDBIndexes.java | 34 ---
.../storage/plugin/iotdb/IoTDBStorageConfig.java | 36 ---
.../storage/plugin/iotdb/IoTDBStorageProvider.java | 158 ----------
.../storage/plugin/iotdb/IoTDBTableInstaller.java | 45 ---
.../storage/plugin/iotdb/IoTDBTableMetaInfo.java | 114 -------
.../storage/plugin/iotdb/base/IoTDBBatchDAO.java | 64 ----
.../plugin/iotdb/base/IoTDBHistoryDeleteDAO.java | 44 ---
.../plugin/iotdb/base/IoTDBInsertRequest.java | 105 -------
.../plugin/iotdb/base/IoTDBManagementDAO.java | 42 ---
.../storage/plugin/iotdb/base/IoTDBMetricsDAO.java | 69 -----
.../plugin/iotdb/base/IoTDBNoneStreamDAO.java | 41 ---
.../storage/plugin/iotdb/base/IoTDBRecordDAO.java | 82 -----
.../storage/plugin/iotdb/base/IoTDBStorageDAO.java | 58 ----
.../iotdb/cache/IoTDBNetworkAddressAliasDAO.java | 57 ----
.../management/IoTDBUITemplateManagementDAO.java | 113 -------
.../iotdb/profile/IoTDBProfileTaskLogQueryDAO.java | 69 -----
.../iotdb/profile/IoTDBProfileTaskQueryDAO.java | 112 -------
.../IoTDBProfileThreadSnapshotQueryDAO.java | 172 -----------
.../iotdb/query/IoTDBAggregationQueryDAO.java | 132 --------
.../plugin/iotdb/query/IoTDBAlarmQueryDAO.java | 104 -------
.../iotdb/query/IoTDBBrowserLogQueryDAO.java | 130 --------
.../plugin/iotdb/query/IoTDBEventQueryDAO.java | 197 ------------
.../plugin/iotdb/query/IoTDBLogQueryDAO.java | 144 ---------
.../plugin/iotdb/query/IoTDBMetadataQueryDAO.java | 226 --------------
.../plugin/iotdb/query/IoTDBMetricsQueryDAO.java | 256 ----------------
.../iotdb/query/IoTDBTopNRecordsQueryDAO.java | 120 --------
.../plugin/iotdb/query/IoTDBTopologyQueryDAO.java | 260 ----------------
.../plugin/iotdb/query/IoTDBTraceQueryDAO.java | 161 ----------
...alking.oap.server.library.module.ModuleProvider | 19 --
test/e2e-v2/cases/alarm/iotdb/docker-compose.yml | 57 ----
test/e2e-v2/cases/alarm/iotdb/e2e.yaml | 45 ---
.../cases/cluster/zk/iotdb/docker-compose.yml | 111 -------
test/e2e-v2/cases/cluster/zk/iotdb/e2e.yaml | 48 ---
test/e2e-v2/cases/event/iotdb/docker-compose.yml | 45 ---
test/e2e-v2/cases/event/iotdb/e2e.yaml | 35 ---
test/e2e-v2/cases/log/iotdb/docker-compose.yml | 59 ----
test/e2e-v2/cases/log/iotdb/e2e.yaml | 48 ---
test/e2e-v2/cases/profile/iotdb/docker-compose.yml | 54 ----
test/e2e-v2/cases/profile/iotdb/e2e.yaml | 35 ---
test/e2e-v2/cases/storage/iotdb/docker-compose.yml | 70 -----
test/e2e-v2/cases/storage/iotdb/e2e.yaml | 129 --------
test/e2e-v2/cases/ttl/iotdb/docker-compose.yml | 66 ----
test/e2e-v2/cases/ttl/iotdb/e2e.yaml | 35 ---
test/e2e-v2/script/docker-compose/base-compose.yml | 1 -
.../known-oap-backend-dependencies.txt | 7 -
61 files changed, 3 insertions(+), 4469 deletions(-)
diff --git a/.github/workflows/e2e.alarm.yaml b/.github/workflows/e2e.alarm.yaml
index 4b8ce66..d316052 100644
--- a/.github/workflows/e2e.alarm.yaml
+++ b/.github/workflows/e2e.alarm.yaml
@@ -44,7 +44,6 @@ jobs:
- alarm/mysql/e2e.yaml
- alarm/influxdb/e2e.yaml
- alarm/postgres/e2e.yaml
- - alarm/iotdb/e2e.yaml
steps:
- uses: actions/checkout@v2
with:
diff --git a/.github/workflows/e2e.cluster.yaml b/.github/workflows/e2e.cluster.yaml
index bbbafcb..db4bf10 100644
--- a/.github/workflows/e2e.cluster.yaml
+++ b/.github/workflows/e2e.cluster.yaml
@@ -40,7 +40,6 @@ jobs:
- cluster/zk/es/e2e.yaml
- cluster/zk/mysql/e2e.yaml
- cluster/zk/influxdb/e2e.yaml
- - cluster/zk/iotdb/e2e.yaml
steps:
- uses: actions/checkout@v2
with:
diff --git a/.github/workflows/e2e.event.yaml b/.github/workflows/e2e.event.yaml
index 98f1e06..3f8868d 100644
--- a/.github/workflows/e2e.event.yaml
+++ b/.github/workflows/e2e.event.yaml
@@ -43,7 +43,6 @@ jobs:
- event/es/e2e.yaml
- event/mysql/e2e.yaml
- event/influxdb/e2e.yaml
- - event/iotdb/e2e.yaml
steps:
- uses: actions/checkout@v2
with:
diff --git a/.github/workflows/e2e.log.yaml b/.github/workflows/e2e.log.yaml
index b42896d..bf7d7e1 100644
--- a/.github/workflows/e2e.log.yaml
+++ b/.github/workflows/e2e.log.yaml
@@ -44,7 +44,6 @@ jobs:
- log/mysql/e2e.yaml
- log/influxdb/e2e.yaml
- log/postgres/e2e.yaml
- - log/iotdb/e2e.yaml
include:
- es-version: 6.3.2
config-file: log/es/e2e.yaml
diff --git a/.github/workflows/e2e.profiling.yaml b/.github/workflows/e2e.profiling.yaml
index bc5b9ac..4adc5bf 100644
--- a/.github/workflows/e2e.profiling.yaml
+++ b/.github/workflows/e2e.profiling.yaml
@@ -44,7 +44,6 @@ jobs:
- profile/es/e2e.yaml
- profile/mysql/e2e.yaml
- profile/influxdb/e2e.yaml
- - profile/iotdb/e2e.yaml
steps:
- uses: actions/checkout@v2
with:
diff --git a/.github/workflows/e2e.storages.yaml b/.github/workflows/e2e.storages.yaml
index 7b3d018..d03a3a5 100644
--- a/.github/workflows/e2e.storages.yaml
+++ b/.github/workflows/e2e.storages.yaml
@@ -42,7 +42,6 @@ jobs:
- storage/tidb/e2e.yaml
- storage/influxdb/e2e.yaml
- storage/postgres/e2e.yaml
- - storage/iotdb/e2e.yaml
include:
- opensearch-version: 1.1.0
config-file: storage/opensearch/e2e.yaml
diff --git a/.github/workflows/e2e.ttl.yaml b/.github/workflows/e2e.ttl.yaml
index 4457fb0..b322f42 100644
--- a/.github/workflows/e2e.ttl.yaml
+++ b/.github/workflows/e2e.ttl.yaml
@@ -43,7 +43,6 @@ jobs:
- ttl/tidb/e2e.yaml
- ttl/influxdb/e2e.yaml
- ttl/postgresql/e2e.yaml
- - ttl/iotdb/e2e.yaml
include:
- es-version: 6.3.2
config-file: ttl/es/e2e.yaml
diff --git a/CHANGES.md b/CHANGES.md
index c06c95c..5205a52 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,7 +59,6 @@ Release Notes.
* Add OpenSearch 1.2.0 to test and verify it works.
* Upgrade grpc-java to 1.42.1 and protoc to 3.19.1 to allow using native Mac osx-aarch_64 artifacts.
* Fix TopologyQuery.loadEndpointRelation bug.
-* Support using IoTDB as a new storage option.
* Add customized envoy ALS protocol receiver for satellite transmit batch data.
#### UI
diff --git a/dist-material/release-docs/LICENSE b/dist-material/release-docs/LICENSE
index dbd92af..d698da0 100755
--- a/dist-material/release-docs/LICENSE
+++ b/dist-material/release-docs/LICENSE
@@ -327,11 +327,6 @@ The text of each license is the standard Apache 2.0 license.
Armeria 1.12.0, http://github.com/line/armeria, Apache 2.0
Brotli4j 1.6.0, https://github.com/hyperxpro/Brotli4j, Apache 2.0
micrometer 1.7.4, https://github.com/micrometer-metrics/micrometer, Apache 2.0
- iotdb-session 0.12.3: https://github.com/apache/iotdb, Apache 2.0
- iotdb-thrift 0.12.3: https://github.com/apache/iotdb, Apache 2.0
- service-rpc 0.12.3: https://github.com/apache/iotdb, Apache 2.0
- tsfile 0.12.3 https://github.com/apache/iotdb Apache 2.0
- libthrift 0.14.1: https://github.com/apache/thrift, Apache 2.0
========================================================================
MIT licenses
@@ -398,9 +393,7 @@ EPL licenses
The following components are provided under the EPL License. See project link for details.
The text of each license is also included at licenses/LICENSE-[project].txt.
- logback 1.2.3: https://github.com/qos-ch/logback: EPL 1.0
- logback-classic 1.2.3: https://github.com/qos-ch/logback: EPL 1.0
- logback-core 1.2.3: https://github.com/qos-ch/logback: EPL 1.0
+ logback 1.1.11: https://github.com/qos-ch/logback: EPL 1.0
========================================================================
CDDL licenses
diff --git a/docs/en/setup/backend/backend-storage.md b/docs/en/setup/backend/backend-storage.md
index d420b0e..5151546 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -15,7 +15,6 @@ Natively supported storage:
- TiDB
- InfluxDB
- PostgreSQL
-- IoTDB
## H2
@@ -281,24 +280,6 @@ storage:
All connection-related settings, including URL link, username, and password are found in `application.yml`.
Only part of the settings are listed here. Please follow [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for full settings.
-## IoTDB
-IoTDB is a time-series database from Apache, which is one of the storage plugin options.
-IoTDB storage plugin is still in progress. Its efficiency will improve in the future.
-
-```yaml
-storage:
- selector: ${SW_STORAGE:iotdb}
- iotdb:
- host: ${SW_STORAGE_IOTDB_HOST:127.0.0.1}
- rpcPort: ${SW_STORAGE_IOTDB_RPC_PORT:6667}
- username: ${SW_STORAGE_IOTDB_USERNAME:root}
- password: ${SW_STORAGE_IOTDB_PASSWORD:root}
- storageGroup: ${SW_STORAGE_IOTDB_STORAGE_GROUP:root.skywalking}
- sessionPoolSize: ${SW_STORAGE_IOTDB_SESSIONPOOL_SIZE:16}
- fetchTaskLogMaxSize: ${SW_STORAGE_IOTDB_FETCH_TASK_LOG_MAX_SIZE:1000} # the max number of fetch task log in a request
-```
-All connection related settings, including host, rpcPort, username, and password are found in `application.yml`. Please ensure the IoTDB version >= 0.12.3.
-
## More storage extension solutions
Follow the [Storage extension development guide](../../guides/storage-extention.md)
in the [Project Extensions document](../../guides/README.md#project-extensions).
diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index 4e6fb81..cfde9dd 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -77,8 +77,6 @@
<awaitility.version>3.0.0</awaitility.version>
<httpcore.version>4.4.13</httpcore.version>
<commons-compress.version>1.21</commons-compress.version>
- <iotdb-session.version>0.12.3</iotdb-session.version>
- <lz4-java.version>1.6.0</lz4-java.version>
</properties>
<dependencyManagement>
@@ -536,21 +534,11 @@
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-session</artifactId>
- <version>${iotdb-session.version}</version>
- </dependency>
- <dependency>
- <groupId>org.lz4</groupId>
- <artifactId>lz4-java</artifactId>
- <version>${lz4-java.version}</version>
- </dependency>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index 44c1b4a..7f29462 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -188,11 +188,6 @@
<artifactId>storage-tidb-plugin</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>storage-iotdb-plugin</artifactId>
- <version>${project.version}</version>
- </dependency>
<!-- storage module -->
<!-- queryBuild module -->
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 631be41..cdce040 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -242,14 +242,6 @@ storage:
oapAnalyzer: ${SW_STORAGE_ES_OAP_ANALYZER:"{\"analyzer\":{\"oap_analyzer\":{\"type\":\"stop\"}}}"} # the oap analyzer.
oapLogAnalyzer: ${SW_STORAGE_ES_OAP_LOG_ANALYZER:"{\"analyzer\":{\"oap_log_analyzer\":{\"type\":\"standard\"}}}"} # the oap log analyzer. It could be customized by the ES analyzer configuration to support more language log formats, such as Chinese log, Japanese log and etc.
advanced: ${SW_STORAGE_ES_ADVANCED:""}
- iotdb:
- host: ${SW_STORAGE_IOTDB_HOST:127.0.0.1}
- rpcPort: ${SW_STORAGE_IOTDB_RPC_PORT:6667}
- username: ${SW_STORAGE_IOTDB_USERNAME:root}
- password: ${SW_STORAGE_IOTDB_PASSWORD:root}
- storageGroup: ${SW_STORAGE_IOTDB_STORAGE_GROUP:root.skywalking}
- sessionPoolSize: ${SW_STORAGE_IOTDB_SESSIONPOOL_SIZE:16}
- fetchTaskLogMaxSize: ${SW_STORAGE_IOTDB_FETCH_TASK_LOG_MAX_SIZE:1000} # the max number of fetch task log in a request
agent-analyzer:
selector: ${SW_AGENT_ANALYZER:default}
diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/pom.xml
index a1f84e4..31bfbaa 100644
--- a/oap-server/server-storage-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/pom.xml
@@ -33,7 +33,6 @@
<module>storage-zipkin-elasticsearch-plugin</module>
<module>storage-influxdb-plugin</module>
<module>storage-tidb-plugin</module>
- <module>storage-iotdb-plugin</module>
</modules>
</project>
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/pom.xml b/oap-server/server-storage-plugin/storage-iotdb-plugin/pom.xml
deleted file mode 100644
index e1c6507..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/pom.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- ~
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>server-storage-plugin</artifactId>
- <groupId>org.apache.skywalking</groupId>
- <version>8.9.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>storage-iotdb-plugin</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>server-core</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.skywalking</groupId>
- <artifactId>library-client</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-session</artifactId>
- <exclusions>
- <exclusion>
- <groupId>net.jpountz.lz4</groupId>
- <artifactId>lz4</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.lz4</groupId>
- <artifactId>lz4-java</artifactId>
- </dependency>
- </dependencies>
-</project>
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBClient.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBClient.java
deleted file mode 100644
index e0c4680..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBClient.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.session.pool.SessionDataSetWrapper;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
-import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.library.client.Client;
-import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
-import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
-import org.apache.skywalking.oap.server.library.util.HealthChecker;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.base.IoTDBInsertRequest;
-
-@Slf4j
-public class IoTDBClient implements Client, HealthCheckable {
- private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
- private final IoTDBStorageConfig config;
-
- private SessionPool sessionPool;
- private final String storageGroup;
-
- public static final String DOT = ".";
- public static final String ALIGN_BY_DEVICE = " align by device";
-
- public static final String TIME_BUCKET = "time_bucket";
- public static final String TIME = "Time";
- public static final String TIMESTAMP = "timestamp";
-
- public IoTDBClient(IoTDBStorageConfig config) {
- this.config = config;
- storageGroup = config.getStorageGroup();
- }
-
- @Override
- public void connect() throws IoTDBConnectionException, StatementExecutionException {
- try {
- sessionPool = new SessionPool(config.getHost(), config.getRpcPort(), config.getUsername(),
- config.getPassword(), config.getSessionPoolSize());
- sessionPool.setStorageGroup(storageGroup);
-
- healthChecker.health();
- } catch (StatementExecutionException e) {
- if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
- healthChecker.unHealth(e);
- throw e;
- }
- }
- }
-
- @Override
- public void shutdown() {
- sessionPool.close();
- this.healthChecker.health();
- }
-
- @Override
- public void registerChecker(HealthChecker healthChecker) {
- this.healthChecker.register(healthChecker);
- }
-
- public SessionPool getSessionPool() {
- return sessionPool;
- }
-
- public IoTDBStorageConfig getConfig() {
- return config;
- }
-
- /**
- * Write data to IoTDB
- *
- * @param request an IoTDBInsertRequest
- * @throws IOException IoTDBConnectionException or StatementExecutionException
- */
- public void write(IoTDBInsertRequest request) throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("Writing data to IoTDB: {}", request);
- }
-
- StringBuilder devicePath = new StringBuilder();
- devicePath.append(storageGroup).append(IoTDBClient.DOT).append(request.getModelName());
- try {
- // make an index value as a layer name of the storage path
- if (!request.getIndexes().isEmpty()) {
- request.getIndexValues().forEach(value -> devicePath.append(IoTDBClient.DOT)
- .append(indexValue2LayerName(value)));
- }
- sessionPool.insertRecord(devicePath.toString(), request.getTime(),
- request.getMeasurements(), request.getMeasurementTypes(), request.getMeasurementValues());
- healthChecker.health();
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- healthChecker.unHealth(e);
- throw new IOException(e);
- }
- }
-
- /**
- * Write a list of data into IoTDB
- *
- * @param requestList a list of IoTDBInsertRequest
- * @throws IOException IoTDBConnectionException or StatementExecutionException
- */
- public void write(List<IoTDBInsertRequest> requestList) throws IOException {
- if (log.isDebugEnabled()) {
- for (IoTDBInsertRequest request : requestList) {
- log.debug("Writing data to IoTDB: {}", request);
- }
- }
-
- List<String> devicePathList = new ArrayList<>();
- List<Long> timeList = new ArrayList<>();
- List<List<String>> timeseriesListList = new ArrayList<>();
- List<List<TSDataType>> typesList = new ArrayList<>();
- List<List<Object>> valuesList = new ArrayList<>();
-
- requestList.forEach(request -> {
- StringBuilder devicePath = new StringBuilder();
- devicePath.append(storageGroup).append(IoTDBClient.DOT).append(request.getModelName());
- // make an index value as a layer name of the storage path
- if (!request.getIndexes().isEmpty()) {
- request.getIndexValues().forEach(value -> devicePath.append(IoTDBClient.DOT)
- .append(indexValue2LayerName(value)));
- }
- devicePathList.add(devicePath.toString());
- timeList.add(request.getTime());
- timeseriesListList.add(request.getMeasurements());
- typesList.add(request.getMeasurementTypes());
- valuesList.add(request.getMeasurementValues());
- });
-
- try {
- sessionPool.insertRecords(devicePathList, timeList, timeseriesListList, typesList, valuesList);
- healthChecker.health();
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- healthChecker.unHealth(e);
- throw new IOException(e);
- }
- }
-
- /**
- * Normal filter query for a list of data. querySQL must contain "align by device"
- *
- * @param modelName model name
- * @param querySQL the SQL for query which must contain "align by device"
- * @param storageBuilder storage builder for transforming storage result map to entity
- * @return a list of result data
- * @throws IOException IoTDBConnectionException or StatementExecutionException
- */
- public List<? super StorageData> filterQuery(String modelName, String querySQL,
- StorageHashMapBuilder<? extends StorageData> storageBuilder)
- throws IOException {
- if (!querySQL.contains("align by device")) {
- throw new IOException("querySQL must contain \"align by device\"");
- }
- SessionDataSetWrapper wrapper = null;
- List<? super StorageData> storageDataList = new ArrayList<>();
- try {
- wrapper = sessionPool.executeQueryStatement(querySQL);
- if (log.isDebugEnabled()) {
- log.debug("SQL: {}, columnNames: {}", querySQL, wrapper.getColumnNames());
- }
-
- List<String> columnNames = wrapper.getColumnNames();
- IoTDBTableMetaInfo tableMetaInfo = IoTDBTableMetaInfo.get(modelName);
- List<String> indexes = tableMetaInfo.getIndexes();
- while (wrapper.hasNext()) {
- Map<String, Object> map = new ConcurrentHashMap<>();
- RowRecord rowRecord = wrapper.next();
- List<Field> fields = rowRecord.getFields();
- // transform timestamp to time_bucket
- if (!UITemplate.INDEX_NAME.equals(modelName)) {
- map.put(IoTDBClient.TIME_BUCKET, TimeBucket.getTimeBucket(rowRecord.getTimestamp(),
- tableMetaInfo.getModel().getDownsampling()));
- }
- // field.get(0) -> Device, transform layerName to indexValue
- String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
- for (int i = 0; i < indexes.size(); i++) {
- map.put(indexes.get(i), layerName2IndexValue(layerNames[i + 1]));
- }
- for (int i = 0; i < columnNames.size() - 2; i++) {
- String columnName = columnNames.get(i + 2);
- Field field = fields.get(i + 1);
- if (field.getDataType() == null) {
- continue;
- }
- if (field.getDataType().equals(TSDataType.TEXT)) {
- map.put(columnName, field.getStringValue());
- } else {
- map.put(columnName, field.getObjectValue(field.getDataType()));
- }
- }
- if (map.containsKey(IoTDBIndexes.NODE_TYPE_IDX)) {
- String nodeType = (String) map.get(IoTDBIndexes.NODE_TYPE_IDX);
- map.put(IoTDBIndexes.NODE_TYPE_IDX, Integer.valueOf(nodeType));
- }
- if (modelName.equals(BrowserErrorLogRecord.INDEX_NAME) || modelName.equals(LogRecord.INDEX_NAME)) {
- map.put(IoTDBClient.TIMESTAMP, map.get("\"" + IoTDBClient.TIMESTAMP + "\""));
- }
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- // remove double quotes
- String key = entry.getKey();
- if (key.contains(".")) {
- map.put(key.substring(1, key.length() - 1), entry.getValue());
- }
- }
-
- storageDataList.add(storageBuilder.storage2Entity(map));
- }
- healthChecker.health();
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- healthChecker.unHealth(e);
- throw new IOException(e.getMessage() + System.lineSeparator() + "SQL Statement: " + querySQL, e);
- } finally {
- if (wrapper != null) {
- sessionPool.closeResultSet(wrapper);
- }
- }
- return storageDataList;
- }
-
- /**
- * Query with aggregation function: count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value
- *
- * @param querySQL the SQL for query which should contain aggregation function
- * @return the result of aggregation function
- * @throws IOException IoTDBConnectionException or StatementExecutionException
- */
- public List<Double> queryWithAgg(String querySQL) throws IOException {
- SessionDataSetWrapper wrapper = null;
- List<Double> results = new ArrayList<>();
- try {
- wrapper = sessionPool.executeQueryStatement(querySQL);
- if (log.isDebugEnabled()) {
- log.debug("SQL: {}, columnNames: {}", querySQL, wrapper.getColumnNames());
- }
-
- if (wrapper.hasNext()) {
- RowRecord rowRecord = wrapper.next();
- List<Field> fields = rowRecord.getFields();
- for (Field field : fields) {
- String stringValue = field.getStringValue();
- if (!stringValue.equals("null")) {
- results.add(Double.parseDouble(stringValue));
- }
- }
- }
- healthChecker.health();
- return results;
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- healthChecker.unHealth(e);
- throw new IOException(e.getMessage() + System.lineSeparator() + "SQL Statement: " + querySQL, e);
- } finally {
- if (wrapper != null) {
- sessionPool.closeResultSet(wrapper);
- }
- }
- }
-
- /**
- * Delete data <= deleteTime in one timeseries
- *
- * @param device device name
- * @param deleteTime deleteTime
- * @throws IOException IoTDBConnectionException or StatementExecutionException
- */
- public void deleteData(String device, long deleteTime) throws IOException {
- try {
- sessionPool.deleteData(storageGroup + IoTDBClient.DOT + device, deleteTime);
- healthChecker.health();
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- healthChecker.unHealth(e);
- throw new IOException(e);
- }
- }
-
- public String indexValue2LayerName(String indexValue) {
- return "\"" + indexValue + "\"";
- }
-
- public String layerName2IndexValue(String layerName) {
- return layerName.substring(0, layerName.length() - 1);
- }
-
- public StringBuilder addQueryIndexValue(String modelName, StringBuilder query, Map<String, String> indexAndValueMap) {
- List<String> indexes = IoTDBTableMetaInfo.get(modelName).getIndexes();
- indexes.forEach(index -> {
- if (indexAndValueMap.containsKey(index)) {
- query.append(IoTDBClient.DOT).append(indexValue2LayerName(indexAndValueMap.get(index)));
- } else {
- query.append(IoTDBClient.DOT).append("*");
- }
- });
- return query;
- }
-
- public StringBuilder addQueryAsterisk(String modelName, StringBuilder query) {
- List<String> indexes = IoTDBTableMetaInfo.get(modelName).getIndexes();
- indexes.forEach(index -> query.append(IoTDBClient.DOT).append("*"));
- return query;
- }
-
- public StringBuilder addModelPath(StringBuilder query, String modelName) {
- return query.append(storageGroup).append(IoTDBClient.DOT).append(modelName);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBIndexes.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBIndexes.java
deleted file mode 100644
index 24b679c..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBIndexes.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb;
-
-public interface IoTDBIndexes {
- // Here is the indexes we choose and their order in storage path.
- String ID_IDX = "id";
- String ENTITY_ID_IDX = "entity_id";
- String NODE_TYPE_IDX = "node_type";
- String SERVICE_ID_IDX = "service_id";
- String GROUP_IDX = "service_group";
- String TRACE_ID_IDX = "trace_id";
-
- static boolean isIndex(String key) {
- return key.equals(ID_IDX) || key.equals(ENTITY_ID_IDX) || key.equals(NODE_TYPE_IDX) ||
- key.equals(SERVICE_ID_IDX) || key.equals(GROUP_IDX) || key.equals(TRACE_ID_IDX);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageConfig.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageConfig.java
deleted file mode 100644
index 8e34445..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageConfig.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-
-@Getter
-@Setter
-public class IoTDBStorageConfig extends ModuleConfig {
- private String host;
- private int rpcPort;
- private String username;
- private String password;
- private String storageGroup;
- private int sessionPoolSize;
-
- private int fetchTaskLogMaxSize;
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageProvider.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageProvider.java
deleted file mode 100644
index 6840ede..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBStorageProvider.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
-import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
-import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.base.IoTDBBatchDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.base.IoTDBHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.base.IoTDBStorageDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.cache.IoTDBNetworkAddressAliasDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.management.IoTDBUITemplateManagementDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.profile.IoTDBProfileTaskLogQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.profile.IoTDBProfileTaskQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.profile.IoTDBProfileThreadSnapshotQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBAggregationQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBAlarmQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBBrowserLogQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBEventQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBLogQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBMetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBMetricsQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTopNRecordsQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTopologyQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.query.IoTDBTraceQueryDAO;
-import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-
-@Slf4j
-public class IoTDBStorageProvider extends ModuleProvider {
- private final IoTDBStorageConfig config;
- private IoTDBClient client;
-
- public IoTDBStorageProvider() {
- config = new IoTDBStorageConfig();
- }
-
- @Override
- public String name() {
- return "iotdb";
- }
-
- @Override
- public Class<? extends ModuleDefine> module() {
- return StorageModule.class;
- }
-
- @Override
- public ModuleConfig createConfigBeanIfAbsent() {
- return config;
- }
-
- @Override
- public void prepare() throws ServiceNotProvidedException {
- this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
-
- client = new IoTDBClient(config);
-
- this.registerServiceImplementation(IBatchDAO.class, new IoTDBBatchDAO(client));
- this.registerServiceImplementation(IHistoryDeleteDAO.class, new IoTDBHistoryDeleteDAO(client));
- this.registerServiceImplementation(StorageDAO.class, new IoTDBStorageDAO(client));
-
- this.registerServiceImplementation(INetworkAddressAliasDAO.class, new IoTDBNetworkAddressAliasDAO(client));
-
- this.registerServiceImplementation(UITemplateManagementDAO.class, new IoTDBUITemplateManagementDAO(client));
-
- this.registerServiceImplementation(IProfileTaskLogQueryDAO.class,
- new IoTDBProfileTaskLogQueryDAO(client, config.getFetchTaskLogMaxSize()));
- this.registerServiceImplementation(IProfileTaskQueryDAO.class, new IoTDBProfileTaskQueryDAO(client));
- this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class,
- new IoTDBProfileThreadSnapshotQueryDAO(client));
-
- this.registerServiceImplementation(IAggregationQueryDAO.class, new IoTDBAggregationQueryDAO(client));
- this.registerServiceImplementation(IAlarmQueryDAO.class, new IoTDBAlarmQueryDAO(client));
- this.registerServiceImplementation(IBrowserLogQueryDAO.class, new IoTDBBrowserLogQueryDAO(client));
- this.registerServiceImplementation(IEventQueryDAO.class, new IoTDBEventQueryDAO(client));
- this.registerServiceImplementation(ILogQueryDAO.class, new IoTDBLogQueryDAO(client));
- this.registerServiceImplementation(IMetadataQueryDAO.class, new IoTDBMetadataQueryDAO(client));
- this.registerServiceImplementation(IMetricsQueryDAO.class, new IoTDBMetricsQueryDAO(client));
- this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new IoTDBTopNRecordsQueryDAO(client));
- this.registerServiceImplementation(ITopologyQueryDAO.class, new IoTDBTopologyQueryDAO(client));
- this.registerServiceImplementation(ITraceQueryDAO.class, new IoTDBTraceQueryDAO(client));
- }
-
- @Override
- public void start() throws ServiceNotProvidedException, ModuleStartException {
- MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
- .provider()
- .getService(MetricsCreator.class);
- HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge(
- "storage_iotdb", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
- client.registerChecker(healthChecker);
- try {
- client.connect();
-
- IoTDBTableInstaller installer = new IoTDBTableInstaller(client, getManager());
- getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
- } catch (StorageException | IoTDBConnectionException | StatementExecutionException e) {
- throw new ModuleStartException(e.getMessage(), e);
- }
- }
-
- @Override
- public void notifyAfterCompleted() throws ServiceNotProvidedException {
-
- }
-
- @Override
- public String[] requiredModules() {
- return new String[]{CoreModule.NAME};
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableInstaller.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableInstaller.java
deleted file mode 100644
index 3e26ff5..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableInstaller.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb;
-
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
-import org.apache.skywalking.oap.server.library.client.Client;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-
-/**
- * The Design of Apache IoTDB Storage Option
- * https://skywalking.apache.org/blog/2021-11-23-design-of-iotdb-storage-option/
- */
-public class IoTDBTableInstaller extends ModelInstaller {
- public IoTDBTableInstaller(Client client, ModuleManager moduleManager) {
- super(client, moduleManager);
- }
-
- @Override
- protected boolean isExists(Model model) {
- IoTDBTableMetaInfo.addModel(model);
- return true;
- }
-
- @Override
- protected void createTable(Model model) {
- // Automatically create table when insert data
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableMetaInfo.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableMetaInfo.java
deleted file mode 100644
index 4b14d4f..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/IoTDBTableMetaInfo.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb;
-
-import com.google.gson.JsonObject;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Getter;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.skywalking.oap.server.core.analysis.NodeType;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
-import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@Getter
-@Builder
-@AllArgsConstructor
-public class IoTDBTableMetaInfo {
- private static final Map<String, IoTDBTableMetaInfo> TABLE_META_INFOS = new HashMap<>();
-
- private final Model model;
- private final Map<String, TSDataType> columnAndTypeMap;
- private final List<String> indexes;
-
- public static void addModel(Model model) {
- final List<ModelColumn> columns = model.getColumns();
- final Map<String, String> storageAndIndexMap = new HashMap<>();
- final Map<String, TSDataType> columnAndTypeMap = new HashMap<>();
- final List<String> indexes = new ArrayList<>();
-
- storageAndIndexMap.put(model.getName(), IoTDBIndexes.ID_IDX);
- columns.forEach(column -> {
- String columnName = column.getColumnName().getName();
- if (IoTDBIndexes.isIndex(columnName)) {
- storageAndIndexMap.put(column.getColumnName().getStorageName(), columnName);
- } else {
- columnAndTypeMap.put(columnName, typeToTSDataType(column.getType()));
- }
- });
-
- // index order: id, entity_id, node_type, service_id, service_group, trace_id
- indexes.add(IoTDBIndexes.ID_IDX);
- if (storageAndIndexMap.containsValue(IoTDBIndexes.ENTITY_ID_IDX)) {
- indexes.add(IoTDBIndexes.ENTITY_ID_IDX);
- }
- if (storageAndIndexMap.containsValue(IoTDBIndexes.NODE_TYPE_IDX)) {
- indexes.add(IoTDBIndexes.NODE_TYPE_IDX);
- }
- if (storageAndIndexMap.containsValue(IoTDBIndexes.SERVICE_ID_IDX)) {
- indexes.add(IoTDBIndexes.SERVICE_ID_IDX);
- }
- if (storageAndIndexMap.containsValue(IoTDBIndexes.GROUP_IDX)) {
- indexes.add(IoTDBIndexes.GROUP_IDX);
- }
- if (storageAndIndexMap.containsValue(IoTDBIndexes.TRACE_ID_IDX)) {
- indexes.add(IoTDBIndexes.TRACE_ID_IDX);
- }
-
- final IoTDBTableMetaInfo tableMetaInfo = IoTDBTableMetaInfo.builder().model(model)
- .columnAndTypeMap(columnAndTypeMap).indexes(indexes).build();
- TABLE_META_INFOS.put(model.getName(), tableMetaInfo);
- }
-
- public static IoTDBTableMetaInfo get(String moduleName) {
- return TABLE_META_INFOS.get(moduleName);
- }
-
- private static TSDataType typeToTSDataType(Class<?> type) {
- if (Integer.class.equals(type) || int.class.equals(type) || NodeType.class.equals(type)) {
- return TSDataType.INT32;
- } else if (Long.class.equals(type) || long.class.equals(type)) {
- return TSDataType.INT64;
- } else if (Float.class.equals(type) || float.class.equals(type)) {
- return TSDataType.FLOAT;
- } else if (Double.class.equals(type) || double.class.equals(type)) {
- return TSDataType.DOUBLE;
- } else if (Boolean.class.equals(type) || boolean.class.equals(type)) {
- return TSDataType.BOOLEAN;
- } else if (String.class.equals(type)) {
- return TSDataType.TEXT;
- } else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
- return TSDataType.TEXT;
- } else if (byte[].class.equals(type)) {
- return TSDataType.TEXT;
- } else if (JsonObject.class.equals(type)) {
- return TSDataType.TEXT;
- } else if (List.class.isAssignableFrom(type)) {
- return TSDataType.TEXT;
- } else {
- throw new IllegalArgumentException("Unsupported data type: " + type.getName());
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBBatchDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBBatchDAO.java
deleted file mode 100644
index fba54c2..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBBatchDAO.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
-import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBBatchDAO implements IBatchDAO {
- private final IoTDBClient client;
-
- @Override
- public void insert(InsertRequest insertRequest) {
- try {
- client.write((IoTDBInsertRequest) insertRequest);
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- }
- }
-
- @Override
- public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
- if (CollectionUtils.isEmpty(prepareRequests)) {
- return CompletableFuture.completedFuture(null);
- }
- if (log.isDebugEnabled()) {
- log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
- }
- List<IoTDBInsertRequest> tempPrepareRequests = new ArrayList<>(prepareRequests.size());
- prepareRequests.forEach(prepareRequest -> tempPrepareRequests.add((IoTDBInsertRequest) prepareRequest));
- try {
- client.write(tempPrepareRequests);
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- }
- return CompletableFuture.completedFuture(null);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBHistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBHistoryDeleteDAO.java
deleted file mode 100644
index 5d71b16..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBHistoryDeleteDAO.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-import org.joda.time.DateTime;
-
-import java.io.IOException;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBHistoryDeleteDAO implements IHistoryDeleteDAO {
- private final IoTDBClient client;
-
- @Override
- public void deleteHistory(Model model, String timeBucketColumnName, int ttl) throws IOException {
- if (log.isDebugEnabled()) {
- log.debug("TTL execution log, model: {}, TTL: {}", model.getName(), ttl);
- }
- long deadline = Long.parseLong(new DateTime().plusDays(-ttl).toString("yyyyMMddHHmm"));
- client.deleteData(model.getName(), TimeBucket.getTimestamp(deadline));
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBInsertRequest.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBInsertRequest.java
deleted file mode 100644
index 659a5be..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBInsertRequest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBTableMetaInfo;
-
-@Getter
-@Setter
-@ToString
-@Slf4j
-public class IoTDBInsertRequest implements InsertRequest, UpdateRequest {
- private String modelName;
- private long time;
- private List<String> indexes;
- private List<String> indexValues;
- private List<String> measurements;
- private List<TSDataType> measurementTypes;
- private List<Object> measurementValues;
-
- public <T extends StorageData> IoTDBInsertRequest(String modelName, long time, T storageData,
- StorageHashMapBuilder<T> storageBuilder) {
- this.modelName = modelName;
- this.time = time;
- indexes = IoTDBTableMetaInfo.get(modelName).getIndexes();
- indexValues = new ArrayList<>(indexes.size());
- Map<String, Object> storageMap = storageBuilder.entity2Storage(storageData);
-
- indexes.forEach(index -> {
- if (index.equals(IoTDBIndexes.ID_IDX)) {
- indexValues.add(storageData.id());
- } else if (storageMap.containsKey(index)) {
- // avoid `service_group` be "null" when inserting
- if (index.equals(IoTDBIndexes.GROUP_IDX) && storageMap.get(index) == null) {
- indexValues.add("");
- } else {
- indexValues.add(String.valueOf(storageMap.get(index)));
- }
- storageMap.remove(index);
- }
- });
-
- // time_bucket has changed to time before calling this method, so remove it from measurements
- storageMap.remove(IoTDBClient.TIME_BUCKET);
- // processing value to make it suitable for storage
- Iterator<Map.Entry<String, Object>> entryIterator = storageMap.entrySet().iterator();
- while (entryIterator.hasNext()) {
- Map.Entry<String, Object> entry = entryIterator.next();
- // IoTDB doesn't allow insert null value.
- if (entry.getValue() == null) {
- entryIterator.remove();
- }
- if (entry.getValue() instanceof StorageDataComplexObject) {
- storageMap.put(entry.getKey(), ((StorageDataComplexObject) entry.getValue()).toStorageData());
- }
- }
-
- measurements = new ArrayList<>(storageMap.keySet());
- Map<String, TSDataType> columnAndTypeMap = IoTDBTableMetaInfo.get(modelName).getColumnAndTypeMap();
- measurementTypes = new ArrayList<>(measurements.size());
- for (String measurement : measurements) {
- measurementTypes.add(columnAndTypeMap.get(measurement));
- }
- measurementValues = new ArrayList<>(storageMap.values());
-
- // IoTDB doesn't allow a measurement named `timestamp` or contains `.`
- for (String key : storageMap.keySet()) {
- if (key.equals(IoTDBClient.TIMESTAMP) || key.contains(".")) {
- int idx = measurements.indexOf(key);
- measurements.set(idx, "\"" + key + "\"");
- }
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBManagementDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBManagementDAO.java
deleted file mode 100644
index a804896..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBManagementDAO.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
-import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-
-import java.io.IOException;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBManagementDAO implements IManagementDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<ManagementData> storageBuilder;
-
- @Override
- public void insert(Model model, ManagementData storageData) throws IOException {
- IoTDBInsertRequest request = new IoTDBInsertRequest(model.getName(), 1L, storageData, storageBuilder);
- client.write(request);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBMetricsDAO.java
deleted file mode 100644
index e93baa3..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBMetricsDAO.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBMetricsDAO implements IMetricsDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<Metrics> storageBuilder;
-
- @Override
- public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- for (Metrics metric : metrics) {
- query.append(", ");
- query = client.addModelPath(query, model.getName());
- query.append(IoTDBClient.DOT).append(client.indexValue2LayerName(metric.id()));
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
- String queryString = query.toString().replaceFirst(", ", "");
- List<? super StorageData> storageDataList = client.filterQuery(model.getName(), queryString, storageBuilder);
- List<Metrics> newMetrics = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> newMetrics.add((Metrics) storageData));
- return newMetrics;
- }
-
- @Override
- public InsertRequest prepareBatchInsert(Model model, Metrics metrics) {
- final long timestamp = TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling());
- return new IoTDBInsertRequest(model.getName(), timestamp, metrics, storageBuilder);
- }
-
- @Override
- public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) {
- return (UpdateRequest) prepareBatchInsert(model, metrics);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBNoneStreamDAO.java
deleted file mode 100644
index 324fa90..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBNoneStreamDAO.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
-
-import java.io.IOException;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
-import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-
-@RequiredArgsConstructor
-public class IoTDBNoneStreamDAO implements INoneStreamDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<NoneStream> storageBuilder;
-
- @Override
- public void insert(Model model, NoneStream noneStream) throws IOException {
- final long timestamp = TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling());
- final IoTDBInsertRequest request = new IoTDBInsertRequest(model.getName(), timestamp, noneStream, storageBuilder);
- client.write(request);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBRecordDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBRecordDAO.java
deleted file mode 100644
index c6ae21a..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBRecordDAO.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
-
-import java.util.List;
-import java.util.Objects;
-import lombok.RequiredArgsConstructor;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-
-@RequiredArgsConstructor
-public class IoTDBRecordDAO implements IRecordDAO {
- private final StorageHashMapBuilder<Record> storageBuilder;
-
- @Override
- public InsertRequest prepareBatchInsert(Model model, Record record) {
- final long timestamp = TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling());
- IoTDBInsertRequest request = new IoTDBInsertRequest(model.getName(), timestamp, record, storageBuilder);
-
- // transform tags of SegmentRecord, LogRecord, AlarmRecord to tag1, tag2, ...
- List<String> measurements = request.getMeasurements();
- List<TSDataType> measurementTypes = request.getMeasurementTypes();
- List<Object> measurementValues = request.getMeasurementValues();
- List<Tag> rawTags = null;
- if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
- rawTags = ((SegmentRecord) record).getTagsRawData();
- measurementTypes.remove(measurements.indexOf(SegmentRecord.TAGS));
- measurementValues.remove(measurements.indexOf(SegmentRecord.TAGS));
- measurements.remove(SegmentRecord.TAGS);
- } else if (LogRecord.INDEX_NAME.equals(model.getName())) {
- rawTags = ((LogRecord) record).getTags();
- measurementTypes.remove(measurements.indexOf(LogRecord.TAGS));
- measurementValues.remove(measurements.indexOf(LogRecord.TAGS));
- measurements.remove(LogRecord.TAGS);
- } else if (AlarmRecord.INDEX_NAME.equals(model.getName())) {
- rawTags = ((AlarmRecord) record).getTags();
- measurementTypes.remove(measurements.indexOf(AlarmRecord.TAGS));
- measurementValues.remove(measurements.indexOf(AlarmRecord.TAGS));
- measurements.remove(AlarmRecord.TAGS);
- }
- if (Objects.nonNull(rawTags)) {
- rawTags.forEach(rawTag -> {
- if (rawTag.getKey().contains(".")) {
- measurements.add("\"" + rawTag.getKey() + "\"");
- } else {
- measurements.add(rawTag.getKey());
- }
- measurementTypes.add(TSDataType.TEXT);
- measurementValues.add(rawTag.getValue());
- });
- }
- request.setMeasurements(measurements);
- request.setMeasurementTypes(measurementTypes);
- request.setMeasurementValues(measurementValues);
- return request;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBStorageDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBStorageDAO.java
deleted file mode 100644
index 936897f..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/base/IoTDBStorageDAO.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.base;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
-import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
-import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
-import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
-import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-
-@RequiredArgsConstructor
-public class IoTDBStorageDAO implements StorageDAO {
- private final IoTDBClient ioTDBClient;
-
- @Override
- public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) {
- return new IoTDBMetricsDAO(ioTDBClient, (StorageHashMapBuilder<Metrics>) storageBuilder);
- }
-
- @Override
- public IRecordDAO newRecordDao(StorageBuilder storageBuilder) {
- return new IoTDBRecordDAO((StorageHashMapBuilder<Record>) storageBuilder);
- }
-
- @Override
- public INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder) {
- return new IoTDBNoneStreamDAO(ioTDBClient, (StorageHashMapBuilder<NoneStream>) storageBuilder);
- }
-
- @Override
- public IManagementDAO newManagementDao(StorageBuilder storageBuilder) {
- return new IoTDBManagementDAO(ioTDBClient, (StorageHashMapBuilder<ManagementData>) storageBuilder);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/cache/IoTDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/cache/IoTDBNetworkAddressAliasDAO.java
deleted file mode 100644
index 47b7999..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/cache/IoTDBNetworkAddressAliasDAO.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.cache;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO {
- private final NetworkAddressAlias.Builder storageBuilder = new NetworkAddressAlias.Builder();
- private final IoTDBClient client;
-
- @Override
- public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, NetworkAddressAlias.INDEX_NAME);
- query = client.addQueryAsterisk(NetworkAddressAlias.INDEX_NAME, query);
- query.append(" where ").append(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET).append(" >= ").append(timeBucket)
- .append(IoTDBClient.ALIGN_BY_DEVICE);
-
- try {
- List<? super StorageData> storageDataList = client.filterQuery(NetworkAddressAlias.INDEX_NAME,
- query.toString(), storageBuilder);
- List<NetworkAddressAlias> networkAddressAliases = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> networkAddressAliases.add((NetworkAddressAlias) storageData));
- return networkAddressAliases;
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- }
- return new ArrayList<>();
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/management/IoTDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/management/IoTDBUITemplateManagementDAO.java
deleted file mode 100644
index 0bf9c20..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/management/IoTDBUITemplateManagementDAO.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.management;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
-import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
-import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.base.IoTDBInsertRequest;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBUITemplateManagementDAO implements UITemplateManagementDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<UITemplate> storageBuilder = new UITemplate.Builder();
- private static final long UI_TEMPLATE_TIMESTAMP = 1L;
-
- @Override
- public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, UITemplate.INDEX_NAME);
- query = client.addQueryAsterisk(UITemplate.INDEX_NAME, query);
- if (!includingDisabled) {
- query.append(" where ").append(UITemplate.DISABLED).append(" = ").append(BooleanUtils.FALSE);
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(UITemplate.INDEX_NAME, query.toString(),
- storageBuilder);
- List<DashboardConfiguration> dashboardConfigurationList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData ->
- dashboardConfigurationList.add(new DashboardConfiguration().fromEntity((UITemplate) storageData)));
- return dashboardConfigurationList;
- }
-
- @Override
- public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
- final UITemplate uiTemplate = setting.toEntity();
-
- IoTDBInsertRequest request = new IoTDBInsertRequest(UITemplate.INDEX_NAME, UI_TEMPLATE_TIMESTAMP,
- uiTemplate, storageBuilder);
- client.write(request);
- return TemplateChangeStatus.builder().status(true).build();
- }
-
- @Override
- public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException {
- final UITemplate uiTemplate = setting.toEntity();
-
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, UITemplate.INDEX_NAME);
- query.append(IoTDBClient.DOT).append(client.indexValue2LayerName(uiTemplate.id()))
- .append(IoTDBClient.ALIGN_BY_DEVICE);
- List<? super StorageData> queryResult = client.filterQuery(UITemplate.INDEX_NAME, query.toString(), storageBuilder);
- if (queryResult.size() == 0) {
- return TemplateChangeStatus.builder().status(false).message("Can't find the template").build();
- } else {
- IoTDBInsertRequest request = new IoTDBInsertRequest(UITemplate.INDEX_NAME, UI_TEMPLATE_TIMESTAMP,
- uiTemplate, storageBuilder);
- client.write(request);
- return TemplateChangeStatus.builder().status(true).build();
- }
- }
-
- @Override
- public TemplateChangeStatus disableTemplate(String name) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, UITemplate.INDEX_NAME);
- query.append(IoTDBClient.DOT).append(client.indexValue2LayerName(name))
- .append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> queryResult = client.filterQuery(UITemplate.INDEX_NAME, query.toString(), storageBuilder);
- if (queryResult.size() == 0) {
- return TemplateChangeStatus.builder().status(false).message("Can't find the template").build();
- } else {
- final UITemplate uiTemplate = (UITemplate) queryResult.get(0);
- uiTemplate.setDisabled(BooleanUtils.TRUE);
- IoTDBInsertRequest request = new IoTDBInsertRequest(UITemplate.INDEX_NAME, UI_TEMPLATE_TIMESTAMP,
- uiTemplate, storageBuilder);
- client.write(request);
- return TemplateChangeStatus.builder().status(true).build();
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskLogQueryDAO.java
deleted file mode 100644
index 4695ed8..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskLogQueryDAO.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.profile;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBProfileTaskLogQueryDAO implements IProfileTaskLogQueryDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<ProfileTaskLogRecord> storageBuilder = new ProfileTaskLogRecord.Builder();
- private final int fetchTaskLogMaxSize;
-
- @Override
- public List<ProfileTaskLog> getTaskLogList() throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, ProfileTaskLogRecord.INDEX_NAME);
- query = client.addQueryAsterisk(ProfileTaskLogRecord.INDEX_NAME, query);
- query.append(" limit ").append(fetchTaskLogMaxSize).append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(ProfileTaskLogRecord.INDEX_NAME, query.toString(), storageBuilder);
- List<ProfileTaskLogRecord> profileTaskLogRecordList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> profileTaskLogRecordList.add((ProfileTaskLogRecord) storageData));
- // resort by self, because of the query result order by time.
- profileTaskLogRecordList.sort((ProfileTaskLogRecord r1, ProfileTaskLogRecord r2) ->
- Long.compare(r2.getOperationTime(), r1.getOperationTime()));
- List<ProfileTaskLog> profileTaskLogList = new ArrayList<>(profileTaskLogRecordList.size());
- profileTaskLogRecordList.forEach(profileTaskLogRecord -> profileTaskLogList.add(parseLog(profileTaskLogRecord)));
- return profileTaskLogList;
- }
-
- private ProfileTaskLog parseLog(ProfileTaskLogRecord record) {
- return ProfileTaskLog.builder()
- .id(record.id())
- .taskId(record.getTaskId())
- .instanceId(record.getInstanceId())
- .operationType(ProfileTaskLogOperationType.parse(record.getOperationType()))
- .operationTime(record.getOperationTime())
- .build();
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskQueryDAO.java
deleted file mode 100644
index 0a2b443..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileTaskQueryDAO.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.profile;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
- private final IoTDBClient client;
- private final ProfileTaskRecord.Builder storageBuilder = new ProfileTaskRecord.Builder();
-
- @Override
- public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket,
- Long endTimeBucket, Integer limit) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, ProfileTaskRecord.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- if (StringUtil.isNotEmpty(serviceId)) {
- indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
- }
- query = client.addQueryIndexValue(ProfileTaskRecord.INDEX_NAME, query, indexAndValueMap);
-
- StringBuilder where = new StringBuilder(" where ");
- if (StringUtil.isNotEmpty(endpointName)) {
- where.append(ProfileTaskRecord.ENDPOINT_NAME).append(" = \"").append(endpointName).append("\"").append(" and ");
- }
- if (Objects.nonNull(startTimeBucket)) {
- where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTimeBucket)).append(" and ");
- }
- if (Objects.nonNull(endTimeBucket)) {
- where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTimeBucket)).append(" and ");
- }
- if (where.length() > 7) {
- int length = where.length();
- where.delete(length - 5, length);
- query.append(where);
- }
- if (Objects.nonNull(limit)) {
- query.append(" limit ").append(limit);
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(ProfileTaskRecord.INDEX_NAME, query.toString(), storageBuilder);
- List<ProfileTask> profileTaskList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> profileTaskList.add(record2ProfileTask((ProfileTaskRecord) storageData)));
- return profileTaskList;
- }
-
- @Override
- public ProfileTask getById(String id) throws IOException {
- if (StringUtil.isEmpty(id)) {
- return null;
- }
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, ProfileTaskRecord.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.ID_IDX, id);
- query = client.addQueryIndexValue(ProfileTaskRecord.INDEX_NAME, query, indexAndValueMap);
- query.append(" limit 1").append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(ProfileTaskRecord.INDEX_NAME, query.toString(), storageBuilder);
- return record2ProfileTask((ProfileTaskRecord) storageDataList.get(0));
- }
-
- private static ProfileTask record2ProfileTask(ProfileTaskRecord record) {
- return ProfileTask.builder()
- .id(record.id())
- .serviceId(record.getServiceId())
- .endpointName(record.getEndpointName())
- .startTime(record.getStartTime())
- .createTime(record.getCreateTime())
- .duration(record.getDuration())
- .minDurationThreshold(record.getMinDurationThreshold())
- .dumpPeriod(record.getDumpPeriod())
- .maxSamplingCount(record.getMaxSamplingCount())
- .build();
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileThreadSnapshotQueryDAO.java
deleted file mode 100644
index 44cfcfb..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/profile/IoTDBProfileThreadSnapshotQueryDAO.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.profile;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-
-@RequiredArgsConstructor
-public class IoTDBProfileThreadSnapshotQueryDAO implements IProfileThreadSnapshotQueryDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<ProfileThreadSnapshotRecord> profileThreadSnapshotRecordBuilder = new ProfileThreadSnapshotRecord.Builder();
- private final StorageHashMapBuilder<SegmentRecord> segmentRecordBuilder = new SegmentRecord.Builder();
-
- @Override
- public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, ProfileThreadSnapshotRecord.INDEX_NAME);
- query = client.addQueryAsterisk(ProfileThreadSnapshotRecord.INDEX_NAME, query);
- query.append(" where ").append(ProfileThreadSnapshotRecord.TASK_ID).append(" = \"").append(taskId).append("\"")
- .append(" and ").append(ProfileThreadSnapshotRecord.SEQUENCE).append(" = 0")
- .append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(ProfileThreadSnapshotRecord.INDEX_NAME,
- query.toString(), profileThreadSnapshotRecordBuilder);
- // We can insure the size of List, so use ArrayList to improve visit speed. (Other storage plugin use LinkedList)
- final List<String> segmentIds = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> segmentIds.add(((ProfileThreadSnapshotRecord) storageData).getSegmentId()));
- if (segmentIds.isEmpty()) {
- return Collections.emptyList();
- }
-
- // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
- // https://github.com/apache/iotdb/discussions/3888
- query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, SegmentRecord.INDEX_NAME);
- query = client.addQueryAsterisk(SegmentRecord.INDEX_NAME, query);
- query.append(" where ").append(SegmentRecord.SEGMENT_ID).append(" in (");
- for (String segmentId : segmentIds) {
- query.append("\"").append(segmentId).append("\"").append(", ");
- }
- query.delete(query.length() - 2, query.length()).append(")").append(IoTDBClient.ALIGN_BY_DEVICE);
-
- storageDataList = client.filterQuery(SegmentRecord.INDEX_NAME, query.toString(), segmentRecordBuilder);
- List<SegmentRecord> segmentRecordList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> segmentRecordList.add((SegmentRecord) storageData));
- // resort by self, because of the select query result order by time.
- segmentRecordList.sort((SegmentRecord r1, SegmentRecord r2) -> Long.compare(r2.getStartTime(), r1.getStartTime()));
-
- List<BasicTrace> result = new ArrayList<>(segmentRecordList.size());
- segmentRecordList.forEach(segmentRecord -> {
- BasicTrace basicTrace = new BasicTrace();
- basicTrace.setSegmentId(segmentRecord.getSegmentId());
- basicTrace.setStart(String.valueOf(segmentRecord.getStartTime()));
- basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(segmentRecord.getEndpointId()).getEndpointName());
- basicTrace.setDuration(segmentRecord.getLatency());
- basicTrace.setError(BooleanUtils.valueToBoolean(segmentRecord.getIsError()));
- basicTrace.getTraceIds().add(segmentRecord.getTraceId());
- result.add(basicTrace);
- });
- return result;
- }
-
- @Override
- public int queryMinSequence(String segmentId, long start, long end) throws IOException {
- return querySequenceWithAgg("min_value", segmentId, start, end);
- }
-
- @Override
- public int queryMaxSequence(String segmentId, long start, long end) throws IOException {
- return querySequenceWithAgg("max_value", segmentId, start, end);
- }
-
- @Override
- public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, ProfileThreadSnapshotRecord.INDEX_NAME);
- query = client.addQueryAsterisk(ProfileThreadSnapshotRecord.INDEX_NAME, query);
- query.append(" where ").append(ProfileThreadSnapshotRecord.SEGMENT_ID).append(" = \"").append(segmentId).append("\"")
- .append(" and ").append(ProfileThreadSnapshotRecord.SEQUENCE).append(" >= ").append(minSequence)
- .append(" and ").append(ProfileThreadSnapshotRecord.SEQUENCE).append(" <= ").append(maxSequence)
- .append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(ProfileThreadSnapshotRecord.INDEX_NAME,
- query.toString(), profileThreadSnapshotRecordBuilder);
- List<ProfileThreadSnapshotRecord> profileThreadSnapshotRecordList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> profileThreadSnapshotRecordList.add((ProfileThreadSnapshotRecord) storageData));
- return profileThreadSnapshotRecordList;
- }
-
- @Override
- public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, SegmentRecord.INDEX_NAME);
- query = client.addQueryAsterisk(SegmentRecord.INDEX_NAME, query);
- query.append(" where ").append(SegmentRecord.SEGMENT_ID).append(" = \"").append(segmentId).append("\"")
- .append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(SegmentRecord.INDEX_NAME,
- query.toString(), segmentRecordBuilder);
- if (storageDataList.isEmpty()) {
- return null;
- }
- return (SegmentRecord) storageDataList.get(0);
- }
-
- private int querySequenceWithAgg(String aggType, String segmentId, long start, long end) throws IOException {
- // This method has poor efficiency. It queries all data which meets a condition without aggregation function
- // See https://github.com/apache/iotdb/discussions/3907
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, ProfileThreadSnapshotRecord.INDEX_NAME);
- query = client.addQueryAsterisk(ProfileThreadSnapshotRecord.INDEX_NAME, query);
- query.append(" where ").append(ProfileThreadSnapshotRecord.SEGMENT_ID).append(" = \"").append(segmentId).append("\"")
- .append(" and ").append(ProfileThreadSnapshotRecord.DUMP_TIME).append(" >= ").append(start)
- .append(" and ").append(ProfileThreadSnapshotRecord.DUMP_TIME).append(" <= ").append(end)
- .append(IoTDBClient.ALIGN_BY_DEVICE);
- List<? super StorageData> storageDataList = client.filterQuery(ProfileThreadSnapshotRecord.INDEX_NAME,
- query.toString(), profileThreadSnapshotRecordBuilder);
-
- if (aggType.equals("min_value")) {
- int minValue = Integer.MAX_VALUE;
- for (Object storageData : storageDataList) {
- ProfileThreadSnapshotRecord profileThreadSnapshotRecord = (ProfileThreadSnapshotRecord) storageData;
- int sequence = profileThreadSnapshotRecord.getSequence();
- minValue = Math.min(minValue, sequence);
- }
- return minValue;
- } else if (aggType.equals("max_value")) {
- int maxValue = Integer.MIN_VALUE;
- for (Object storageData : storageDataList) {
- ProfileThreadSnapshotRecord profileThreadSnapshotRecord = (ProfileThreadSnapshotRecord) storageData;
- int sequence = profileThreadSnapshotRecord.getSequence();
- maxValue = Math.max(maxValue, sequence);
- }
- return maxValue;
- } else {
- throw new IOException("Wrong aggregation function");
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAggregationQueryDAO.java
deleted file mode 100644
index 0572b46..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAggregationQueryDAO.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.pool.SessionDataSetWrapper;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.input.Duration;
-import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
-import org.apache.skywalking.oap.server.core.query.type.KeyValue;
-import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
-import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBAggregationQueryDAO implements IAggregationQueryDAO {
- private final IoTDBClient client;
-
- @Override
- public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueColumnName, Duration duration,
- List<KeyValue> additionalConditions) throws IOException {
- // This method maybe have poor efficiency. It queries all data which meets a condition without aggregation function.
- // https://github.com/apache/iotdb/issues/4006
- StringBuilder query = new StringBuilder();
- query.append(String.format("select %s from ", valueColumnName));
- query = client.addModelPath(query, condition.getName());
-
- Map<String, String> indexAndValueMap = new HashMap<>();
- List<KeyValue> measurementConditions = new ArrayList<>();
- if (additionalConditions != null) {
- for (KeyValue additionalCondition : additionalConditions) {
- String key = additionalCondition.getKey();
- if (IoTDBIndexes.isIndex(key)) {
- indexAndValueMap.put(key, additionalCondition.getValue());
- } else {
- measurementConditions.add(additionalCondition);
- }
- }
- }
- if (!indexAndValueMap.isEmpty()) {
- query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
- } else {
- query = client.addQueryAsterisk(condition.getName(), query);
- }
-
- query.append(" where ").append(IoTDBClient.TIME).append(" >= ").append(duration.getStartTimestamp())
- .append(" and ").append(IoTDBClient.TIME).append(" <= ").append(duration.getEndTimestamp());
- if (!measurementConditions.isEmpty()) {
- for (KeyValue measurementCondition : measurementConditions) {
- query.append(" and ").append(measurementCondition.getKey()).append(" = \"")
- .append(measurementCondition.getValue()).append("\"");
- }
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- SessionPool sessionPool = client.getSessionPool();
- SessionDataSetWrapper wrapper = null;
- List<SelectedRecord> topEntities = new ArrayList<>();
- try {
- wrapper = sessionPool.executeQueryStatement(query.toString());
- if (log.isDebugEnabled()) {
- log.debug("SQL: {}, columnNames: {}", query, wrapper.getColumnNames());
- }
-
- Map<String, Double> entityIdAndSumMap = new HashMap<>();
- Map<String, Integer> entityIdAndCountMap = new HashMap<>();
- while (wrapper.hasNext()) {
- RowRecord rowRecord = wrapper.next();
- List<Field> fields = rowRecord.getFields();
- String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
- String entityId = client.layerName2IndexValue(layerNames[2]);
- double value = Double.parseDouble(fields.get(1).getStringValue());
- entityIdAndSumMap.merge(entityId, value, Double::sum);
- entityIdAndCountMap.merge(entityId, 1, Integer::sum);
- }
-
- entityIdAndSumMap.forEach((String entityId, Double sum) -> {
- double count = entityIdAndCountMap.get(entityId);
- double avg = sum / count;
- SelectedRecord topNEntity = new SelectedRecord();
- topNEntity.setId(entityId);
- topNEntity.setValue(String.valueOf((long) avg));
- topEntities.add(topNEntity);
- });
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- throw new IOException(e);
- } finally {
- if (wrapper != null) {
- sessionPool.closeResultSet(wrapper);
- }
- }
-
- if (condition.getOrder().equals(Order.DES)) {
- topEntities.sort((SelectedRecord t1, SelectedRecord t2) ->
- Double.compare(Double.parseDouble(t2.getValue()), Double.parseDouble(t1.getValue())));
- } else {
- topEntities.sort(Comparator.comparingDouble((SelectedRecord t) -> Double.parseDouble(t.getValue())));
- }
- int limit = condition.getTopN();
- return limit > topEntities.size() ? topEntities : topEntities.subList(0, limit);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAlarmQueryDAO.java
deleted file mode 100644
index 00d7a58..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBAlarmQueryDAO.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
-
-import com.google.common.base.Strings;
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
-import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
-import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
-import org.apache.skywalking.oap.server.core.query.type.Alarms;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
-import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-
-@RequiredArgsConstructor
-public class IoTDBAlarmQueryDAO implements IAlarmQueryDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<AlarmRecord> storageBuilder = new AlarmRecord.Builder();
-
- @Override
- public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException {
- StringBuilder query = new StringBuilder();
- // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
- // https://github.com/apache/iotdb/discussions/3888
- query.append("select * from ");
- query = client.addModelPath(query, AlarmRecord.INDEX_NAME);
- query = client.addQueryAsterisk(AlarmRecord.INDEX_NAME, query);
-
- StringBuilder where = new StringBuilder(" where ");
- if (Objects.nonNull(scopeId)) {
- where.append(AlarmRecord.SCOPE).append(" = ").append(scopeId).append(" and ");
- }
- if (startTB != 0 && endTB != 0) {
- where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTB)).append(" and ");
- where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTB)).append(" and ");
- }
- if (!Strings.isNullOrEmpty(keyword)) {
- where.append(AlarmRecord.ALARM_MESSAGE).append(" like '%").append(keyword).append("%'").append(" and ");
- }
- if (CollectionUtils.isNotEmpty(tags)) {
- for (final Tag tag : tags) {
- where.append(tag.getKey()).append(" = \"").append(tag.getValue()).append("\"").append(" and ");
- }
- }
- if (where.length() > 7) {
- int length = where.length();
- where.delete(length - 5, length);
- query.append(where);
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- Alarms alarms = new Alarms();
- List<? super StorageData> storageDataList = client.filterQuery(AlarmRecord.INDEX_NAME, query.toString(), storageBuilder);
- int limitCount = 0;
- for (int i = from; i < storageDataList.size(); i++) {
- if (limitCount < limit) {
- limitCount++;
- AlarmRecord alarmRecord = (AlarmRecord) storageDataList.get(i);
- alarms.getMsgs().add(parseMessage(alarmRecord));
- }
- }
- alarms.setTotal(storageDataList.size());
- // resort by self, because of the select query result order by time.
- alarms.getMsgs().sort((AlarmMessage m1, AlarmMessage m2) -> Long.compare(m2.getStartTime(), m1.getStartTime()));
- return alarms;
- }
-
- private AlarmMessage parseMessage(AlarmRecord alarmRecord) {
- AlarmMessage message = new AlarmMessage();
- message.setId(alarmRecord.getId0());
- message.setId1(alarmRecord.getId1());
- message.setMessage(alarmRecord.getAlarmMessage());
- message.setStartTime(alarmRecord.getStartTime());
- message.setScope(Scope.Finder.valueOf(alarmRecord.getScope()));
- message.setScopeId(alarmRecord.getScope());
- if (!CollectionUtils.isEmpty(alarmRecord.getTagsRawData())) {
- parserDataBinary(alarmRecord.getTagsRawData(), message.getTags());
- }
- return message;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBBrowserLogQueryDAO.java
deleted file mode 100644
index c1f6a83..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBBrowserLogQueryDAO.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
-import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory;
-import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
-import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
-import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
-import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBBrowserLogQueryDAO implements IBrowserLogQueryDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<BrowserErrorLogRecord> storageBuilder = new BrowserErrorLogRecord.Builder();
-
- @Override
- public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId,
- BrowserErrorCategory category, long startSecondTB,
- long endSecondTB, int limit, int from) throws IOException {
- StringBuilder query = new StringBuilder();
- // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
- // https://github.com/apache/iotdb/discussions/3888
- query.append("select * from ");
- query = client.addModelPath(query, BrowserErrorLogRecord.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- if (StringUtil.isNotEmpty(serviceId)) {
- indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
- }
- query = client.addQueryIndexValue(BrowserErrorLogRecord.INDEX_NAME, query, indexAndValueMap);
-
- StringBuilder where = new StringBuilder(" where ");
- if (startSecondTB != 0 && endSecondTB != 0) {
- where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startSecondTB)).append(" and ");
- where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endSecondTB)).append(" and ");
- }
- if (StringUtil.isNotEmpty(serviceVersionId)) {
- where.append(BrowserErrorLogRecord.SERVICE_VERSION_ID).append(" = \"").append(serviceVersionId).append("\"").append(" and ");
- }
- if (StringUtil.isNotEmpty(pagePathId)) {
- where.append(BrowserErrorLogRecord.PAGE_PATH_ID).append(" = \"").append(pagePathId).append("\"").append(" and ");
- }
- if (Objects.nonNull(category)) {
- where.append(BrowserErrorLogRecord.ERROR_CATEGORY).append(" = ").append(category.getValue()).append(" and ");
- }
- if (where.length() > 7) {
- int length = where.length();
- where.delete(length - 5, length);
- query.append(where);
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(BrowserErrorLogRecord.INDEX_NAME, query.toString(), storageBuilder);
- List<BrowserErrorLogRecord> browserErrorLogRecordList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> browserErrorLogRecordList.add((BrowserErrorLogRecord) storageData));
- // resort by self, because of the select query result order by time.
- browserErrorLogRecordList.sort((BrowserErrorLogRecord b1, BrowserErrorLogRecord b2) -> Long.compare(b2.getTimestamp(), b1.getTimestamp()));
- BrowserErrorLogs logs = new BrowserErrorLogs();
- int limitCount = 0;
- for (int i = from; i < browserErrorLogRecordList.size(); i++) {
- if (limitCount < limit) {
- limitCount++;
- BrowserErrorLogRecord record = browserErrorLogRecordList.get(i);
- if (CollectionUtils.isNotEmpty(record.getDataBinary())) {
- BrowserErrorLog log = iotdbParserDataBinary(record.getDataBinary());
- logs.getLogs().add(log);
- }
- }
- }
- logs.setTotal(storageDataList.size());
- return logs;
- }
-
- private BrowserErrorLog iotdbParserDataBinary(byte[] dataBinaryBase64) {
- try {
- BrowserErrorLog log = new BrowserErrorLog();
- org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
- .parseFrom(dataBinaryBase64);
- log.setService(browserErrorLog.getService());
- log.setServiceVersion(browserErrorLog.getServiceVersion());
- log.setTime(browserErrorLog.getTime());
- log.setPagePath(browserErrorLog.getPagePath());
- log.setCategory(ErrorCategory.valueOf(browserErrorLog.getCategory().name().toUpperCase()));
- log.setGrade(browserErrorLog.getGrade());
- log.setMessage(browserErrorLog.getMessage());
- log.setLine(browserErrorLog.getLine());
- log.setCol(browserErrorLog.getCol());
- log.setStack(browserErrorLog.getStack());
- log.setErrorUrl(browserErrorLog.getErrorUrl());
- log.setFirstReportedError(browserErrorLog.getFirstReportedError());
-
- return log;
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEventQueryDAO.java
deleted file mode 100644
index 061dfca..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBEventQueryDAO.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
-
-import com.google.common.base.Strings;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Objects;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.query.PaginationUtils;
-import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.input.Duration;
-import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
-import org.apache.skywalking.oap.server.core.query.type.event.EventType;
-import org.apache.skywalking.oap.server.core.query.type.event.Events;
-import org.apache.skywalking.oap.server.core.query.type.event.Source;
-import org.apache.skywalking.oap.server.core.source.Event;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBEventQueryDAO implements IEventQueryDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<Event> storageBuilder = new Event.Builder();
-
- @Override
- public Events queryEvents(EventQueryCondition condition) throws Exception {
- // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
- // https://github.com/apache/iotdb/discussions/3888
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, Event.INDEX_NAME);
- query = client.addQueryAsterisk(Event.INDEX_NAME, query);
- StringBuilder where = whereSQL(condition);
- if (where.length() > 0) {
- query.append(" where ").append(where);
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(Event.INDEX_NAME, query.toString(), storageBuilder);
- final Events events = new Events();
- int limitCount = 0;
- PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging());
- for (int i = page.getFrom(); i < storageDataList.size(); i++) {
- if (limitCount < page.getLimit()) {
- limitCount++;
- Event event = (Event) storageDataList.get(i);
- events.getEvents().add(parseEvent(event));
- }
- }
- events.setTotal(storageDataList.size());
- // resort by self, because of the select query result order by time.
- final Order order = Objects.isNull(condition.getOrder()) ? Order.DES : condition.getOrder();
- if (Order.DES.equals(order)) {
- events.getEvents().sort(
- (org.apache.skywalking.oap.server.core.query.type.event.Event e1,
- org.apache.skywalking.oap.server.core.query.type.event.Event e2)
- -> Long.compare(e2.getStartTime(), e1.getStartTime()));
- } else {
- events.getEvents().sort(
- Comparator.comparingLong(org.apache.skywalking.oap.server.core.query.type.event.Event::getStartTime));
- }
- return events;
- }
-
- @Override
- public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
- // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
- // https://github.com/apache/iotdb/discussions/3888
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, Event.INDEX_NAME);
- query = client.addQueryAsterisk(Event.INDEX_NAME, query);
- StringBuilder where = whereSQL(conditionList);
- if (where.length() > 0) {
- query.append(" where ").append(where);
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(Event.INDEX_NAME, query.toString(), storageBuilder);
- final Events events = new Events();
- EventQueryCondition condition = conditionList.get(0);
- int limitCount = 0;
- PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging());
- for (int i = page.getFrom(); i < storageDataList.size(); i++) {
- if (limitCount < page.getLimit()) {
- limitCount++;
- Event event = (Event) storageDataList.get(i);
- events.getEvents().add(parseEvent(event));
- }
- }
- events.setTotal(storageDataList.size());
- // resort by self, because of the select query result order by time.
- final Order order = Objects.isNull(condition.getOrder()) ? Order.DES : condition.getOrder();
- if (Order.DES.equals(order)) {
- events.getEvents().sort(
- (org.apache.skywalking.oap.server.core.query.type.event.Event e1,
- org.apache.skywalking.oap.server.core.query.type.event.Event e2)
- -> Long.compare(e2.getStartTime(), e1.getStartTime()));
- } else {
- events.getEvents().sort(
- Comparator.comparingLong(org.apache.skywalking.oap.server.core.query.type.event.Event::getStartTime));
- }
- return events;
- }
-
- private StringBuilder whereSQL(final EventQueryCondition condition) {
- StringBuilder where = new StringBuilder();
- if (!Strings.isNullOrEmpty(condition.getUuid())) {
- where.append(Event.UUID).append(" = \"").append(condition.getUuid()).append("\"").append(" and ");
- }
- final Source source = condition.getSource();
- if (source != null) {
- if (!Strings.isNullOrEmpty(source.getService())) {
- where.append(Event.SERVICE).append(" = \"").append(source.getService()).append("\"").append(" and ");
- }
- if (!Strings.isNullOrEmpty(source.getServiceInstance())) {
- where.append(Event.SERVICE_INSTANCE).append(" = \"").append(source.getServiceInstance()).append("\"").append(" and ");
- }
- if (!Strings.isNullOrEmpty(source.getEndpoint())) {
- where.append(Event.ENDPOINT).append(" = \"").append(source.getEndpoint()).append("\"").append(" and ");
- }
- }
- if (!Strings.isNullOrEmpty(condition.getName())) {
- where.append(Event.NAME).append(" = \"").append(condition.getName()).append("\"").append(" and ");
- }
- if (condition.getType() != null) {
- where.append(Event.TYPE).append(" = \"").append(condition.getType().name()).append("\"").append(" and ");
- }
- final Duration time = condition.getTime();
- if (time != null) {
- if (time.getStartTimestamp() > 0) {
- where.append(Event.START_TIME).append(" > ").append(time.getStartTimestamp()).append(" and ");
- }
- if (time.getEndTimestamp() > 0) {
- where.append(Event.END_TIME).append(" < ").append(time.getEndTimestamp()).append(" and ");
- }
- }
- if (where.length() > 0) {
- int length = where.length();
- where.delete(length - 5, length);
- return where;
- }
- return new StringBuilder();
- }
-
- private StringBuilder whereSQL(final List<EventQueryCondition> conditions) {
- StringBuilder where = new StringBuilder();
- boolean isFirstCondition = true;
- for (EventQueryCondition condition : conditions) {
- StringBuilder subWhere = whereSQL(condition);
- if (subWhere.length() > 0) {
- if (isFirstCondition) {
- where.append("(").append(subWhere).append(")");
- isFirstCondition = false;
- } else {
- where.append(" or (").append(subWhere).append(")");
-
- }
- }
- }
- return where;
- }
-
- private org.apache.skywalking.oap.server.core.query.type.event.Event parseEvent(final Event event) {
- final org.apache.skywalking.oap.server.core.query.type.event.Event resultEvent = new org.apache.skywalking.oap.server.core.query.type.event.Event();
- resultEvent.setUuid(event.getUuid());
- resultEvent.setSource(new Source(event.getService(), event.getServiceInstance(), event.getEndpoint()));
- resultEvent.setName(event.getName());
- resultEvent.setType(EventType.parse(event.getType()));
- resultEvent.setMessage(event.getMessage());
- resultEvent.setParameters(event.getParameters());
- resultEvent.setStartTime(event.getStartTime());
- resultEvent.setEndTime(event.getEndTime());
- return resultEvent;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBLogQueryDAO.java
deleted file mode 100644
index 56f0f9f..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBLogQueryDAO.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.apm.network.logging.v3.LogTags;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
-import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
-import org.apache.skywalking.oap.server.core.query.type.ContentType;
-import org.apache.skywalking.oap.server.core.query.type.KeyValue;
-import org.apache.skywalking.oap.server.core.query.type.Log;
-import org.apache.skywalking.oap.server.core.query.type.Logs;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
-import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBLogQueryDAO implements ILogQueryDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<LogRecord> storageBuilder = new LogRecord.Builder();
-
- @Override
- public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId,
- TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit, long startTB,
- long endTB, List<Tag> tags, List<String> keywordsOfContent,
- List<String> excludingKeywordsOfContent) throws IOException {
- StringBuilder query = new StringBuilder();
- // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
- // https://github.com/apache/iotdb/discussions/3888
- query.append("select * from ");
- query = client.addModelPath(query, LogRecord.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- if (StringUtil.isNotEmpty(serviceId)) {
- indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
- }
- if (Objects.nonNull(relatedTrace) && StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
- indexAndValueMap.put(IoTDBIndexes.TRACE_ID_IDX, relatedTrace.getTraceId());
- }
- query = client.addQueryIndexValue(LogRecord.INDEX_NAME, query, indexAndValueMap);
-
- StringBuilder where = new StringBuilder(" where ");
- if (startTB != 0 && endTB != 0) {
- where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTB)).append(" and ");
- where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTB)).append(" and ");
- }
- if (StringUtil.isNotEmpty(serviceInstanceId)) {
- where.append(AbstractLogRecord.SERVICE_INSTANCE_ID).append(" = \"").append(serviceInstanceId).append("\"").append(" and ");
- }
- if (StringUtil.isNotEmpty(endpointId)) {
- where.append(AbstractLogRecord.ENDPOINT_ID).append(" = \"").append(endpointId).append("\"").append(" and ");
- }
- if (Objects.nonNull(relatedTrace)) {
- if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
- where.append(AbstractLogRecord.TRACE_SEGMENT_ID).append(" = \"").append(relatedTrace.getSegmentId()).append("\"").append(" and ");
- }
- if (Objects.nonNull(relatedTrace.getSpanId())) {
- where.append(AbstractLogRecord.SPAN_ID).append(" = ").append(relatedTrace.getSpanId()).append(" and ");
- }
- }
- if (CollectionUtils.isNotEmpty(tags)) {
- for (final Tag tag : tags) {
- where.append(tag.getKey()).append(" = \"").append(tag.getValue()).append("\"").append(" and ");
- }
- }
- if (where.length() > 7) {
- int length = where.length();
- where.delete(length - 5, length);
- query.append(where);
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- Logs logs = new Logs();
- List<? super StorageData> storageDataList = client.filterQuery(LogRecord.INDEX_NAME, query.toString(), storageBuilder);
- int limitCount = 0;
- for (int i = from; i < storageDataList.size(); i++) {
- if (limitCount < limit) {
- limitCount++;
- LogRecord logRecord = (LogRecord) storageDataList.get(i);
- Log log = new Log();
- log.setServiceId(logRecord.getServiceId());
- log.setServiceInstanceId(logRecord.getServiceInstanceId());
- log.setEndpointId(logRecord.getEndpointId());
- log.setTraceId(logRecord.getTraceId());
- log.setTimestamp(logRecord.getTimestamp());
- log.setContentType(ContentType.instanceOf(logRecord.getContentType()));
- log.setContent(logRecord.getContent());
- if (CollectionUtils.isNotEmpty(logRecord.getTagsRawData())) {
- iotdbParserDataBinary(logRecord.getTagsRawData(), log.getTags());
- }
- logs.getLogs().add(log);
- }
- }
- logs.setTotal(storageDataList.size());
- // resort by self, because of the select query result order by time.
- if (Order.DES.equals(queryOrder)) {
- logs.getLogs().sort((Log l1, Log l2) -> Long.compare(l2.getTimestamp(), l1.getTimestamp()));
- } else {
- logs.getLogs().sort(Comparator.comparingLong(Log::getTimestamp));
- }
- return logs;
- }
-
- private void iotdbParserDataBinary(byte[] tagsRawData, List<KeyValue> tags) {
- try {
- LogTags logTags = LogTags.parseFrom(tagsRawData);
- logTags.getDataList().forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetadataQueryDAO.java
deleted file mode 100644
index 46a40f6..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetadataQueryDAO.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
-
-import com.google.common.base.Strings;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.analysis.NodeType;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-import org.apache.skywalking.oap.server.core.query.enumeration.Language;
-import org.apache.skywalking.oap.server.core.query.type.Attribute;
-import org.apache.skywalking.oap.server.core.query.type.Database;
-import org.apache.skywalking.oap.server.core.query.type.Endpoint;
-import org.apache.skywalking.oap.server.core.query.type.Service;
-import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBMetadataQueryDAO implements IMetadataQueryDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<ServiceTraffic> serviceBuilder = new ServiceTraffic.Builder();
- private final StorageHashMapBuilder<EndpointTraffic> endpointBuilder = new EndpointTraffic.Builder();
- private final StorageHashMapBuilder<InstanceTraffic> instanceBuilder = new InstanceTraffic.Builder();
-
- @Override
- public List<Service> getAllServices(String group) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, ServiceTraffic.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.NODE_TYPE_IDX, String.valueOf(NodeType.Normal.value()));
- if (StringUtil.isNotEmpty(group)) {
- indexAndValueMap.put(IoTDBIndexes.GROUP_IDX, group);
- }
- query = client.addQueryIndexValue(ServiceTraffic.INDEX_NAME, query, indexAndValueMap);
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(ServiceTraffic.INDEX_NAME, query.toString(), serviceBuilder);
- List<Service> serviceList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> serviceList.add(buildService((ServiceTraffic) storageData)));
- return serviceList;
- }
-
- @Override
- public List<Service> getAllBrowserServices() throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, ServiceTraffic.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.NODE_TYPE_IDX, String.valueOf(NodeType.Browser.value()));
- query = client.addQueryIndexValue(ServiceTraffic.INDEX_NAME, query, indexAndValueMap);
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(ServiceTraffic.INDEX_NAME, query.toString(), serviceBuilder);
- List<Service> serviceList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> serviceList.add(buildService((ServiceTraffic) storageData)));
- return serviceList;
- }
-
- @Override
- public List<Database> getAllDatabases() throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, ServiceTraffic.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.NODE_TYPE_IDX, String.valueOf(NodeType.Database.value()));
- query = client.addQueryIndexValue(ServiceTraffic.INDEX_NAME, query, indexAndValueMap);
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(ServiceTraffic.INDEX_NAME, query.toString(), serviceBuilder);
- List<Database> databaseList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> {
- ServiceTraffic serviceTraffic = (ServiceTraffic) storageData;
- Database database = new Database();
- database.setId(serviceTraffic.id());
- database.setName(serviceTraffic.getName());
- databaseList.add(database);
- });
- return databaseList;
- }
-
- @Override
- public List<Service> searchServices(final NodeType nodeType, final String keyword) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, ServiceTraffic.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.NODE_TYPE_IDX, String.valueOf(nodeType.value()));
- query = client.addQueryIndexValue(ServiceTraffic.INDEX_NAME, query, indexAndValueMap);
- if (!Strings.isNullOrEmpty(keyword)) {
- query.append(" where ").append(ServiceTraffic.NAME).append(" like '%").append(keyword).append("%'");
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(ServiceTraffic.INDEX_NAME, query.toString(), serviceBuilder);
- List<Service> serviceList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> serviceList.add(buildService((ServiceTraffic) storageData)));
- return serviceList;
- }
-
- @Override
- public Service searchService(final NodeType nodeType, final String serviceCode) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, ServiceTraffic.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.NODE_TYPE_IDX, String.valueOf(nodeType.value()));
- query = client.addQueryIndexValue(ServiceTraffic.INDEX_NAME, query, indexAndValueMap);
- query.append(" where ").append(ServiceTraffic.NAME).append(" = \"").append(serviceCode).append("\"")
- .append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(ServiceTraffic.INDEX_NAME, query.toString(), serviceBuilder);
- if (storageDataList.isEmpty()) {
- return null;
- }
- return buildService((ServiceTraffic) storageDataList.get(0));
- }
-
- @Override
- public List<Endpoint> searchEndpoint(String keyword, String serviceId, int limit) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, EndpointTraffic.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
- query = client.addQueryIndexValue(EndpointTraffic.INDEX_NAME, query, indexAndValueMap);
- if (!Strings.isNullOrEmpty(keyword)) {
- query.append(" where ").append(EndpointTraffic.NAME).append(" like '%").append(keyword).append("%'");
- }
- query.append(" limit ").append(limit).append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(EndpointTraffic.INDEX_NAME, query.toString(), endpointBuilder);
- List<Endpoint> endpointList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> {
- EndpointTraffic endpointTraffic = (EndpointTraffic) storageData;
- Endpoint endpoint = new Endpoint();
- endpoint.setId(endpointTraffic.id());
- endpoint.setName(endpointTraffic.getName());
- endpointList.add(endpoint);
- });
- return endpointList;
- }
-
- @Override
- public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException {
- final long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, InstanceTraffic.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
- query = client.addQueryIndexValue(InstanceTraffic.INDEX_NAME, query, indexAndValueMap);
- query.append(" where ").append(InstanceTraffic.LAST_PING_TIME_BUCKET).append(" >= ").append(minuteTimeBucket)
- .append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(InstanceTraffic.INDEX_NAME, query.toString(), instanceBuilder);
- List<ServiceInstance> serviceInstanceList = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> {
- InstanceTraffic instanceTraffic = (InstanceTraffic) storageData;
- if (instanceTraffic.getName() == null) {
- instanceTraffic.setName("");
- }
- ServiceInstance serviceInstance = new ServiceInstance();
- serviceInstance.setId(instanceTraffic.id());
- serviceInstance.setName(instanceTraffic.getName());
- serviceInstance.setInstanceUUID(serviceInstance.getId());
-
- JsonObject properties = instanceTraffic.getProperties();
- if (properties != null) {
- for (Map.Entry<String, JsonElement> property : properties.entrySet()) {
- String key = property.getKey();
- String value = property.getValue().getAsString();
- if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
- serviceInstance.setLanguage(Language.value(value));
- } else {
- serviceInstance.getAttributes().add(new Attribute(key, value));
- }
- }
- } else {
- serviceInstance.setLanguage(Language.UNKNOWN);
- }
- serviceInstanceList.add(serviceInstance);
- });
- return serviceInstanceList;
- }
-
- private Service buildService(ServiceTraffic serviceTraffic) {
- Service service = new Service();
- service.setId(serviceTraffic.id());
- service.setName(serviceTraffic.getName());
- service.setGroup(serviceTraffic.getGroup());
- return service;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetricsQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetricsQueryDAO.java
deleted file mode 100644
index eff62d1..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBMetricsQueryDAO.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.pool.SessionDataSetWrapper;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
-import org.apache.skywalking.oap.server.core.query.PointOfTime;
-import org.apache.skywalking.oap.server.core.query.input.Duration;
-import org.apache.skywalking.oap.server.core.query.input.MetricsCondition;
-import org.apache.skywalking.oap.server.core.query.sql.Function;
-import org.apache.skywalking.oap.server.core.query.type.HeatMap;
-import org.apache.skywalking.oap.server.core.query.type.IntValues;
-import org.apache.skywalking.oap.server.core.query.type.KVInt;
-import org.apache.skywalking.oap.server.core.query.type.MetricsValues;
-import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
-import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBMetricsQueryDAO implements IMetricsQueryDAO {
- private final IoTDBClient client;
-
- @Override
- public long readMetricsValue(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException {
- final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
- final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
- if (function == Function.Latest) {
- return readMetricsValues(condition, valueColumnName, duration).getValues().latestValue(defaultValue);
- }
-
- StringBuilder query = new StringBuilder();
- String op;
- if (function == Function.Avg) {
- op = "avg";
- } else {
- op = "sum";
- }
- query.append(String.format("select %s(%s) from ", op, valueColumnName));
- query = client.addModelPath(query, condition.getName());
- final String entityId = condition.getEntity().buildId();
- if (entityId != null) {
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.ENTITY_ID_IDX, entityId);
- query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
- } else {
- query = client.addQueryAsterisk(condition.getName(), query);
- }
- query.append(" where ").append(String.format("%s >= %s and %s <= %s",
- IoTDBClient.TIME, duration.getStartTimestamp(), IoTDBClient.TIME, duration.getEndTimestamp()))
- .append(" group by level = 3");
-
- List<Double> results = client.queryWithAgg(query.toString());
- if (results.size() > 0) {
- double result = results.get(0);
- return (long) result;
- } else {
- return defaultValue;
- }
- }
-
- @Override
- public MetricsValues readMetricsValues(final MetricsCondition condition,
- final String valueColumnName,
- final Duration duration) throws IOException {
- final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
- final List<String> ids = new ArrayList<>(pointOfTimes.size());
- pointOfTimes.forEach(pointOfTime -> ids.add(pointOfTime.id(condition.getEntity().buildId())));
-
- StringBuilder query = new StringBuilder();
- query.append("select ").append(valueColumnName).append(" from ");
- for (String id : ids) {
- query = client.addModelPath(query, condition.getName());
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.ID_IDX, id);
- query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
- query.append(", ");
- }
- String queryString = query.toString();
- if (ids.size() > 0) {
- queryString = queryString.substring(0, queryString.lastIndexOf(","));
- }
- queryString += IoTDBClient.ALIGN_BY_DEVICE;
-
- SessionPool sessionPool = client.getSessionPool();
- SessionDataSetWrapper wrapper = null;
- MetricsValues metricsValues = new MetricsValues();
- // Label is null, because in readMetricsValues, no label parameter.
- final IntValues intValues = metricsValues.getValues();
- try {
- wrapper = sessionPool.executeQueryStatement(queryString);
- if (log.isDebugEnabled()) {
- log.debug("SQL: {}, columnNames: {}", queryString, wrapper.getColumnNames());
- }
-
- while (wrapper.hasNext()) {
- RowRecord rowRecord = wrapper.next();
- List<Field> fields = rowRecord.getFields();
- String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
- String id = client.layerName2IndexValue(layerNames[1]);
-
- Field valueField = fields.get(1);
- TSDataType valueType = valueField.getDataType();
- long value = 0;
- if (TSDataType.INT32.equals(valueType)) {
- value = valueField.getIntV();
- } else if (TSDataType.INT64.equals(valueType)) {
- value = valueField.getLongV();
- }
-
- KVInt kv = new KVInt();
- kv.setId(id);
- kv.setValue(value);
- intValues.addKVInt(kv);
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- throw new IOException(e);
- } finally {
- if (wrapper != null) {
- sessionPool.closeResultSet(wrapper);
- }
- }
- metricsValues.setValues(Util.sortValues(intValues, ids, ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName())));
- return metricsValues;
- }
-
- @Override
- public List<MetricsValues> readLabeledMetricsValues(MetricsCondition condition, String valueColumnName, List<String> labels, Duration duration) throws IOException {
- final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
- List<String> ids = new ArrayList<>(pointOfTimes.size());
- pointOfTimes.forEach(pointOfTime -> ids.add(pointOfTime.id(condition.getEntity().buildId())));
-
- StringBuilder query = new StringBuilder();
- query.append("select ").append(valueColumnName).append(" from ");
- for (String id : ids) {
- query = client.addModelPath(query, condition.getName());
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.ID_IDX, id);
- query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
- query.append(", ");
- }
- String queryString = query.toString();
- if (ids.size() > 0) {
- queryString = queryString.substring(0, queryString.lastIndexOf(","));
- }
- queryString += IoTDBClient.ALIGN_BY_DEVICE;
-
- SessionPool sessionPool = client.getSessionPool();
- SessionDataSetWrapper wrapper = null;
- Map<String, DataTable> idMap = new HashMap<>();
- try {
- wrapper = sessionPool.executeQueryStatement(queryString);
- if (log.isDebugEnabled()) {
- log.debug("SQL: {}, columnNames: {}", queryString, wrapper.getColumnNames());
- }
-
- while (wrapper.hasNext()) {
- RowRecord rowRecord = wrapper.next();
- List<Field> fields = rowRecord.getFields();
- String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
- String id = client.layerName2IndexValue(layerNames[1]);
-
- DataTable multipleValues = new DataTable(5);
- multipleValues.toObject(fields.get(1).getStringValue());
- idMap.put(id, multipleValues);
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- throw new IOException(e);
- } finally {
- if (wrapper != null) {
- sessionPool.closeResultSet(wrapper);
- }
- }
- return Util.composeLabelValue(condition, labels, ids, idMap);
- }
-
- @Override
- public HeatMap readHeatMap(MetricsCondition condition, String valueColumnName, Duration duration) throws IOException {
- final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
- List<String> ids = new ArrayList<>(pointOfTimes.size());
- pointOfTimes.forEach(pointOfTime -> ids.add(pointOfTime.id(condition.getEntity().buildId())));
-
- StringBuilder query = new StringBuilder();
- query.append("select ").append(valueColumnName).append(" from ");
- for (String id : ids) {
- query = client.addModelPath(query, condition.getName());
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.ID_IDX, id);
- query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
- query.append(", ");
- }
- String queryString = query.toString();
- if (ids.size() > 0) {
- queryString = queryString.substring(0, queryString.lastIndexOf(","));
- }
- queryString += IoTDBClient.ALIGN_BY_DEVICE;
-
- SessionPool sessionPool = client.getSessionPool();
- SessionDataSetWrapper wrapper = null;
- HeatMap heatMap = new HeatMap();
- final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
- try {
- wrapper = sessionPool.executeQueryStatement(queryString);
- if (log.isDebugEnabled()) {
- log.debug("SQL: {}, columnNames: {}", queryString, wrapper.getColumnNames());
- }
-
- while (wrapper.hasNext()) {
- RowRecord rowRecord = wrapper.next();
- List<Field> fields = rowRecord.getFields();
- String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
- String id = client.layerName2IndexValue(layerNames[1]);
-
- heatMap.buildColumn(id, fields.get(1).getStringValue(), defaultValue);
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- throw new IOException(e);
- } finally {
- if (wrapper != null) {
- sessionPool.closeResultSet(wrapper);
- }
- }
- heatMap.fixMissingColumns(ids, defaultValue);
- return heatMap;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopNRecordsQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopNRecordsQueryDAO.java
deleted file mode 100644
index da2f6b3..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopNRecordsQueryDAO.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.pool.SessionDataSetWrapper;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
-import org.apache.skywalking.oap.server.core.query.enumeration.Order;
-import org.apache.skywalking.oap.server.core.query.input.Duration;
-import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
-import org.apache.skywalking.oap.server.core.query.type.SelectedRecord;
-import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBTableMetaInfo;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBTopNRecordsQueryDAO implements ITopNRecordsQueryDAO {
- private final IoTDBClient client;
-
- @Override
- public List<SelectedRecord> readSampledRecords(TopNCondition condition, String valueColumnName, Duration duration) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select ").append(TopN.STATEMENT).append(", ").append(valueColumnName)
- .append(" from ");
- query = client.addModelPath(query, condition.getName());
- Map<String, String> indexAndValueMap = new HashMap<>();
- if (StringUtil.isNotEmpty(condition.getParentService())) {
- final String serviceId = IDManager.ServiceID.buildId(condition.getParentService(), condition.isNormal());
- indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
- }
- query = client.addQueryIndexValue(condition.getName(), query, indexAndValueMap);
-
- StringBuilder where = new StringBuilder(" where ");
- if (Objects.nonNull(duration)) {
- where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(duration.getStartTimeBucketInSec())).append(" and ");
- where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(duration.getEndTimeBucketInSec()));
- }
- if (where.length() > 7) {
- query.append(where);
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- SessionPool sessionPool = client.getSessionPool();
- SessionDataSetWrapper wrapper = null;
- List<SelectedRecord> records = new ArrayList<>();
- try {
- wrapper = sessionPool.executeQueryStatement(query.toString());
- if (log.isDebugEnabled()) {
- log.debug("SQL: {}, columnNames: {}", query, wrapper.getColumnNames());
- }
-
- List<String> indexes = IoTDBTableMetaInfo.get(condition.getName()).getIndexes();
- int traceIdIdx = indexes.indexOf(IoTDBIndexes.TRACE_ID_IDX);
-
- while (wrapper.hasNext()) {
- SelectedRecord record = new SelectedRecord();
- RowRecord rowRecord = wrapper.next();
- List<Field> fields = rowRecord.getFields();
- record.setName(fields.get(1).getStringValue());
-
- String traceId = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"")[traceIdIdx + 1];
- traceId = client.layerName2IndexValue(traceId);
- record.setRefId(traceId);
-
- record.setId(record.getRefId());
- record.setValue(String.valueOf(fields.get(2).getObjectValue(fields.get(2).getDataType())));
- records.add(record);
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- throw new IOException(e);
- } finally {
- if (wrapper != null) {
- sessionPool.closeResultSet(wrapper);
- }
- }
-
- // resort by self, because of the select query result order by time.
- if (Order.DES.equals(condition.getOrder())) {
- records.sort((SelectedRecord s1, SelectedRecord s2) ->
- Long.compare(Long.parseLong(s2.getValue()), Long.parseLong(s1.getValue())));
- } else {
- records.sort(Comparator.comparingLong((SelectedRecord s) -> Long.parseLong(s.getValue())));
- }
- return records;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopologyQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopologyQueryDAO.java
deleted file mode 100644
index dd343b8..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTopologyQueryDAO.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.pool.SessionDataSetWrapper;
-import org.apache.iotdb.session.pool.SessionPool;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.query.type.Call;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-
-@Slf4j
-@RequiredArgsConstructor
-public class IoTDBTopologyQueryDAO implements ITopologyQueryDAO {
- private final IoTDBClient client;
-
- @Override
- public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB,
- List<String> serviceIds) throws IOException {
- return loadServiceCalls(
- ServiceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID,
- ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
- serviceIds, DetectPoint.SERVER);
- }
-
- @Override
- public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB,
- List<String> serviceIds) throws IOException {
- return loadServiceCalls(
- ServiceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
- ServiceRelationClientSideMetrics.DEST_SERVICE_ID,
- serviceIds, DetectPoint.CLIENT);
- }
-
- @Override
- public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB) throws IOException {
- return loadServiceCalls(
- ServiceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID,
- ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
- new ArrayList<>(0), DetectPoint.SERVER);
- }
-
- @Override
- public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB) throws IOException {
- return loadServiceCalls(
- ServiceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
- ServiceRelationClientSideMetrics.DEST_SERVICE_ID,
- new ArrayList<>(0), DetectPoint.CLIENT);
- }
-
- @Override
- public List<Call.CallDetail> loadInstanceRelationDetectedAtServerSide(String clientServiceId,
- String serverServiceId,
- long startTB, long endTB) throws IOException {
- return loadServiceInstanceCalls(
- ServiceInstanceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
- ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
- clientServiceId, serverServiceId, DetectPoint.SERVER);
- }
-
- @Override
- public List<Call.CallDetail> loadInstanceRelationDetectedAtClientSide(String clientServiceId,
- String serverServiceId,
- long startTB, long endTB) throws IOException {
- return loadServiceInstanceCalls(
- ServiceInstanceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
- ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID,
- clientServiceId, serverServiceId, DetectPoint.CLIENT);
- }
-
- @Override
- public List<Call.CallDetail> loadEndpointRelation(long startTB, long endTB, String destEndpointId) throws IOException {
- List<Call.CallDetail> calls = loadEndpointFromSide(
- EndpointRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
- EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
- EndpointRelationServerSideMetrics.DEST_ENDPOINT,
- destEndpointId, false);
- calls.addAll(loadEndpointFromSide(
- EndpointRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
- EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
- EndpointRelationServerSideMetrics.DEST_ENDPOINT,
- destEndpointId, true));
- return calls;
- }
-
- private List<Call.CallDetail> loadServiceCalls(String tableName, long startTB, long endTB,
- String sourceCName, String destCName,
- List<String> serviceIds, DetectPoint detectPoint) throws IOException {
- // This method don't use "group by" like other storage plugin.
- StringBuilder query = new StringBuilder();
- query.append("select ").append(ServiceRelationServerSideMetrics.COMPONENT_ID).append(" from ");
- query = client.addModelPath(query, tableName);
- query = client.addQueryAsterisk(tableName, query);
- query.append(" where ").append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTB))
- .append(" and ").append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTB));
- if (serviceIds.size() > 0) {
- query.append(" and (");
- for (int i = 0; i < serviceIds.size(); i++) {
- query.append(sourceCName).append(" = \"").append(serviceIds.get(i))
- .append("\" or ")
- .append(destCName).append(" = \"").append(serviceIds.get(i)).append("\"");
- if (i != serviceIds.size() - 1) {
- query.append(" or ");
- }
- }
- query.append(")");
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- SessionPool sessionPool = client.getSessionPool();
- SessionDataSetWrapper wrapper = null;
- List<Call.CallDetail> calls = new ArrayList<>();
- try {
- wrapper = sessionPool.executeQueryStatement(query.toString());
- if (log.isDebugEnabled()) {
- log.debug("SQL: {}, columnNames: {}", query, wrapper.getColumnNames());
- }
-
- while (wrapper.hasNext()) {
- RowRecord rowRecord = wrapper.next();
- List<Field> fields = rowRecord.getFields();
- Call.CallDetail call = new Call.CallDetail();
- String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
- String entityId = client.layerName2IndexValue(layerNames[2]);
- final int componentId = fields.get(1).getIntV();
- call.buildFromServiceRelation(entityId, componentId, detectPoint);
- calls.add(call);
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- throw new IOException(e);
- } finally {
- if (wrapper != null) {
- sessionPool.closeResultSet(wrapper);
- }
- }
- return calls;
- }
-
- private List<Call.CallDetail> loadServiceInstanceCalls(String tableName, long startTB, long endTB,
- String sourceCName, String descCName,
- String sourceServiceId, String destServiceId,
- DetectPoint detectPoint) throws IOException {
- // This method don't use "group by" like other storage plugin.
- StringBuilder query = new StringBuilder();
- query.append("select ").append(ServiceInstanceRelationServerSideMetrics.COMPONENT_ID).append(" from ");
- query = client.addModelPath(query, tableName);
- query = client.addQueryAsterisk(tableName, query);
- query.append(" where ").append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTB))
- .append(" and ").append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTB));
- query.append(" and ((").append(sourceCName).append(" = \"").append(sourceServiceId).append("\"")
- .append(" and ").append(descCName).append(" = \"").append(destServiceId).append("\"")
- .append(") or (").append(sourceCName).append(" = \"").append(destServiceId).append("\"")
- .append(" and ").append(descCName).append(" = \"").append(sourceServiceId).append(" \"))")
- .append(IoTDBClient.ALIGN_BY_DEVICE);
-
- SessionPool sessionPool = client.getSessionPool();
- SessionDataSetWrapper wrapper = null;
- List<Call.CallDetail> calls = new ArrayList<>();
- try {
- wrapper = sessionPool.executeQueryStatement(query.toString());
- if (log.isDebugEnabled()) {
- log.debug("SQL: {}, columnNames: {}", query, wrapper.getColumnNames());
- }
- while (wrapper.hasNext()) {
- RowRecord rowRecord = wrapper.next();
- List<Field> fields = rowRecord.getFields();
- Call.CallDetail call = new Call.CallDetail();
- String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
- String entityId = client.layerName2IndexValue(layerNames[2]);
- final int componentId = fields.get(1).getIntV();
- call.buildFromInstanceRelation(entityId, componentId, detectPoint);
- calls.add(call);
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- throw new IOException(e);
- } finally {
- if (wrapper != null) {
- sessionPool.closeResultSet(wrapper);
- }
- }
- return calls;
- }
-
- private List<Call.CallDetail> loadEndpointFromSide(String tableName, long startTB, long endTB,
- String sourceCName, String destCName,
- String id, boolean isSourceId) throws IOException {
- // This method don't use "group by" like other storage plugin.
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, tableName);
- query = client.addQueryAsterisk(tableName, query);
- query.append(" where ").append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startTB))
- .append(" and ").append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endTB));
- query.append(" and ").append(isSourceId ? sourceCName : destCName).append(" = \"").append(id).append("\"")
- .append(IoTDBClient.ALIGN_BY_DEVICE);
-
- SessionPool sessionPool = client.getSessionPool();
- SessionDataSetWrapper wrapper = null;
- List<Call.CallDetail> calls = new ArrayList<>();
- try {
- wrapper = sessionPool.executeQueryStatement(query.toString());
- if (log.isDebugEnabled()) {
- log.debug("SQL: {}, columnNames: {}", query, wrapper.getColumnNames());
- }
- while (wrapper.hasNext()) {
- RowRecord rowRecord = wrapper.next();
- List<Field> fields = rowRecord.getFields();
- Call.CallDetail call = new Call.CallDetail();
- String[] layerNames = fields.get(0).getStringValue().split("\\" + IoTDBClient.DOT + "\"");
- String entityId = client.layerName2IndexValue(layerNames[2]);
- call.buildFromEndpointRelation(entityId, DetectPoint.SERVER);
- calls.add(call);
- }
- } catch (IoTDBConnectionException | StatementExecutionException e) {
- throw new IOException(e);
- } finally {
- if (wrapper != null) {
- sessionPool.closeResultSet(wrapper);
- }
- }
- return calls;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTraceQueryDAO.java
deleted file mode 100644
index d88bdf7..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/iotdb/query/IoTDBTraceQueryDAO.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.iotdb.query;
-
-import com.google.common.base.Strings;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
-import org.apache.skywalking.oap.server.core.query.type.Span;
-import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
-import org.apache.skywalking.oap.server.core.query.type.TraceState;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
-import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBClient;
-import org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBIndexes;
-
-@RequiredArgsConstructor
-public class IoTDBTraceQueryDAO implements ITraceQueryDAO {
- private final IoTDBClient client;
- private final StorageHashMapBuilder<SegmentRecord> storageBuilder = new SegmentRecord.Builder();
-
- @Override
- public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration,
- String serviceId, String serviceInstanceId,
- String endpointId, String traceId, int limit, int from,
- TraceState traceState, QueryOrder queryOrder, List<Tag> tags)
- throws IOException {
- StringBuilder query = new StringBuilder();
- // This method maybe have poor efficiency. It queries all data which meets a condition without select function.
- // https://github.com/apache/iotdb/discussions/3888
- query.append("select * from ");
- query = client.addModelPath(query, SegmentRecord.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- if (StringUtil.isNotEmpty(serviceId)) {
- indexAndValueMap.put(IoTDBIndexes.SERVICE_ID_IDX, serviceId);
- }
- if (!Strings.isNullOrEmpty(traceId)) {
- indexAndValueMap.put(IoTDBIndexes.TRACE_ID_IDX, traceId);
- }
- query = client.addQueryIndexValue(SegmentRecord.INDEX_NAME, query, indexAndValueMap);
-
- StringBuilder where = new StringBuilder(" where ");
- if (startSecondTB != 0 && endSecondTB != 0) {
- where.append(IoTDBClient.TIME).append(" >= ").append(TimeBucket.getTimestamp(startSecondTB)).append(" and ");
- where.append(IoTDBClient.TIME).append(" <= ").append(TimeBucket.getTimestamp(endSecondTB)).append(" and ");
- }
- if (minDuration != 0) {
- where.append(SegmentRecord.LATENCY).append(" >= ").append(minDuration).append(" and ");
- }
- if (maxDuration != 0) {
- where.append(SegmentRecord.LATENCY).append(" <= ").append(maxDuration).append(" and ");
- }
- if (StringUtil.isNotEmpty(serviceInstanceId)) {
- where.append(SegmentRecord.SERVICE_INSTANCE_ID).append(" = \"").append(serviceInstanceId).append("\"").append(" and ");
- }
- if (!Strings.isNullOrEmpty(endpointId)) {
- where.append(SegmentRecord.ENDPOINT_ID).append(" = \"").append(endpointId).append("\"").append(" and ");
- }
- if (CollectionUtils.isNotEmpty(tags)) {
- for (final Tag tag : tags) {
- where.append(tag.getKey()).append(" = \"").append(tag.getValue()).append("\"").append(" and ");
- }
- }
- switch (traceState) {
- case ERROR:
- where.append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.TRUE).append(" and ");
- break;
- case SUCCESS:
- where.append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.FALSE).append(" and ");
- break;
- }
- if (where.length() > 7) {
- int length = where.length();
- where.delete(length - 5, length);
- query.append(where);
- }
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- TraceBrief traceBrief = new TraceBrief();
- List<? super StorageData> storageDataList = client.filterQuery(SegmentRecord.INDEX_NAME, query.toString(), storageBuilder);
- int limitCount = 0;
- for (int i = from; i < storageDataList.size(); i++) {
- if (limitCount < limit) {
- limitCount++;
- SegmentRecord segmentRecord = (SegmentRecord) storageDataList.get(i);
- BasicTrace basicTrace = new BasicTrace();
- basicTrace.setSegmentId(segmentRecord.getSegmentId());
- basicTrace.setStart(String.valueOf(segmentRecord.getStartTime()));
- basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(segmentRecord.getEndpointId()).getEndpointName());
- basicTrace.setDuration(segmentRecord.getLatency());
- basicTrace.setError(BooleanUtils.valueToBoolean(segmentRecord.getIsError()));
- basicTrace.getTraceIds().add(segmentRecord.getTraceId());
- traceBrief.getTraces().add(basicTrace);
- }
- }
- traceBrief.setTotal(storageDataList.size());
- // resort by self, because of the select query result order by time.
- switch (queryOrder) {
- case BY_START_TIME:
- traceBrief.getTraces().sort((BasicTrace b1, BasicTrace b2) ->
- Long.compare(Long.parseLong(b2.getStart()), Long.parseLong(b1.getStart())));
- break;
- case BY_DURATION:
- traceBrief.getTraces().sort((BasicTrace b1, BasicTrace b2) ->
- Integer.compare(b2.getDuration(), b1.getDuration()));
- break;
- }
- return traceBrief;
- }
-
- @Override
- public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
- StringBuilder query = new StringBuilder();
- query.append("select * from ");
- query = client.addModelPath(query, SegmentRecord.INDEX_NAME);
- Map<String, String> indexAndValueMap = new HashMap<>();
- indexAndValueMap.put(IoTDBIndexes.TRACE_ID_IDX, traceId);
- query = client.addQueryIndexValue(SegmentRecord.INDEX_NAME, query, indexAndValueMap);
- query.append(IoTDBClient.ALIGN_BY_DEVICE);
-
- List<? super StorageData> storageDataList = client.filterQuery(SegmentRecord.INDEX_NAME, query.toString(), storageBuilder);
- List<SegmentRecord> segmentRecords = new ArrayList<>(storageDataList.size());
- storageDataList.forEach(storageData -> segmentRecords.add((SegmentRecord) storageData));
- return segmentRecords;
- }
-
- @Override
- public List<Span> doFlexibleTraceQuery(String traceId) {
- return Collections.emptyList();
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
deleted file mode 100644
index 3cdccc3..0000000
--- a/oap-server/server-storage-plugin/storage-iotdb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-
-org.apache.skywalking.oap.server.storage.plugin.iotdb.IoTDBStorageProvider
\ No newline at end of file
diff --git a/test/e2e-v2/cases/alarm/iotdb/docker-compose.yml b/test/e2e-v2/cases/alarm/iotdb/docker-compose.yml
deleted file mode 100644
index be0a45d..0000000
--- a/test/e2e-v2/cases/alarm/iotdb/docker-compose.yml
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: '2.1'
-
-services:
- iotdb:
- image: apache/iotdb:0.12.3-node
- expose:
- - 6667
- networks:
- - e2e
- healthcheck:
- test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
- interval: 5s
- timeout: 60s
- retries: 120
-
- oap:
- extends:
- file: ../../../script/docker-compose/base-compose.yml
- service: oap
- environment:
- SW_STORAGE: iotdb
- SW_SEARCHABLE_ALARM_TAG_KEYS: level,receivers
- ports:
- - 12800
- depends_on:
- iotdb:
- condition: service_healthy
- volumes:
- - ../alarm-settings.yml:/skywalking/config/alarm-settings.yml
-
- provider:
- extends:
- file: ../../../script/docker-compose/base-compose.yml
- service: provider
- ports:
- - 9090
- depends_on:
- oap:
- condition: service_healthy
-
-networks:
- e2e:
diff --git a/test/e2e-v2/cases/alarm/iotdb/e2e.yaml b/test/e2e-v2/cases/alarm/iotdb/e2e.yaml
deleted file mode 100644
index a44a76b..0000000
--- a/test/e2e-v2/cases/alarm/iotdb/e2e.yaml
+++ /dev/null
@@ -1,45 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This file is used to show how to write configuration files and can be used to test.
-
-setup:
- env: compose
- file: docker-compose.yml
- timeout: 20m
- init-system-environment: ../../../script/env
- steps:
- - name: install yq
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- - name: install swctl
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
-
-trigger:
- action: http
- interval: 3s
- times: 10
- url: http://${provider_host}:${provider_9090}/users
- method: POST
- body: '{"id":"123","name":"skywalking"}'
- headers:
- "Content-Type": "application/json"
-
-verify:
- retry:
- count: 20
- interval: 3s
- cases:
- - includes:
- - ../alarm-cases.yaml
diff --git a/test/e2e-v2/cases/cluster/zk/iotdb/docker-compose.yml b/test/e2e-v2/cases/cluster/zk/iotdb/docker-compose.yml
deleted file mode 100644
index 0abf7ee..0000000
--- a/test/e2e-v2/cases/cluster/zk/iotdb/docker-compose.yml
+++ /dev/null
@@ -1,111 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: '2.1'
-
-services:
- zk:
- image: zookeeper:3.5
- expose:
- - 2181
- networks:
- - e2e
- healthcheck:
- test: [ "CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/2181" ]
- interval: 5s
- timeout: 60s
- retries: 120
-
- iotdb:
- image: apache/iotdb:0.12.3-node
- expose:
- - 6667
- networks:
- - e2e
- healthcheck:
- test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
- interval: 5s
- timeout: 60s
- retries: 120
-
- oap1:
- extends:
- file: ../../../../script/docker-compose/base-compose.yml
- service: oap
- environment:
- SW_CLUSTER: zookeeper
- SW_STORAGE: iotdb
- depends_on:
- zk:
- condition: service_healthy
- iotdb:
- condition: service_healthy
-
- oap2:
- extends:
- file: ../../../../script/docker-compose/base-compose.yml
- service: oap
- environment:
- SW_CLUSTER: zookeeper
- SW_STORAGE: iotdb
- depends_on:
- zk:
- condition: service_healthy
- iotdb:
- condition: service_healthy
- oap1:
- condition: service_healthy
-
- ui:
- extends:
- file: ../../../../script/docker-compose/base-compose.yml
- service: ui
- environment:
- - SW_OAP_ADDRESS=http://oap1:12800,http://oap2:12800
- depends_on:
- oap1:
- condition: service_healthy
- oap2:
- condition: service_healthy
- ports:
- - 8080
-
- provider1:
- extends:
- file: ../../../../script/docker-compose/base-compose.yml
- service: provider
- environment:
- SW_AGENT_COLLECTOR_BACKEND_SERVICES: oap2:11800
- SW_AGENT_INSTANCE_NAME: provider1
- depends_on:
- oap2:
- condition: service_healthy
-
- consumer:
- extends:
- file: ../../../../script/docker-compose/base-compose.yml
- service: consumer
- environment:
- SW_AGENT_COLLECTOR_BACKEND_SERVICES: oap1:11800
- PROVIDER_URL: http://provider1:9090
- depends_on:
- oap1:
- condition: service_healthy
- provider1:
- condition: service_healthy
- ports:
- - 9092
-networks:
- e2e:
diff --git a/test/e2e-v2/cases/cluster/zk/iotdb/e2e.yaml b/test/e2e-v2/cases/cluster/zk/iotdb/e2e.yaml
deleted file mode 100644
index f7b4fee..0000000
--- a/test/e2e-v2/cases/cluster/zk/iotdb/e2e.yaml
+++ /dev/null
@@ -1,48 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This file is used to show how to write configuration files and can be used to test.
-
-setup:
- env: compose
- file: docker-compose.yml
- timeout: 20m
- init-system-environment: ../../../../script/env
- steps:
- - name: install yq
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- - name: install swctl
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
-
-trigger:
- action: http
- interval: 3s
- times: 10
- url: http://${consumer_host}:${consumer_9092}/users
- method: POST
- body: '{"id":"123","name":"skywalking"}'
- headers:
- "Content-Type": "application/json"
-
-verify:
- # verify with retry strategy
- retry:
- # max retry count
- count: 20
- # the interval between two retries, in millisecond.
- interval: 3s
- cases:
- - includes:
- - ../../cluster-cases.yaml
diff --git a/test/e2e-v2/cases/event/iotdb/docker-compose.yml b/test/e2e-v2/cases/event/iotdb/docker-compose.yml
deleted file mode 100644
index d5639a5..0000000
--- a/test/e2e-v2/cases/event/iotdb/docker-compose.yml
+++ /dev/null
@@ -1,45 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: '3.8'
-
-services:
- iotdb:
- image: apache/iotdb:0.12.3-node
- expose:
- - 6667
- networks:
- - e2e
- healthcheck:
- test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
- interval: 5s
- timeout: 60s
- retries: 120
-
- oap:
- extends:
- file: ../../../script/docker-compose/base-compose.yml
- service: oap
- environment:
- SW_STORAGE: iotdb
- depends_on:
- iotdb:
- condition: service_healthy
- ports:
- - 11800
- - 12800
-
-networks:
- e2e:
diff --git a/test/e2e-v2/cases/event/iotdb/e2e.yaml b/test/e2e-v2/cases/event/iotdb/e2e.yaml
deleted file mode 100644
index fe22de0..0000000
--- a/test/e2e-v2/cases/event/iotdb/e2e.yaml
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This file is used to show how to write configuration files and can be used to test.
-
-setup:
- env: compose
- file: docker-compose.yml
- timeout: 20m
- init-system-environment: ../../../script/env
- steps:
- - name: install yq
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- - name: install swctl
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
-
-verify:
- retry:
- count: 20
- interval: 3s
- cases:
- - includes:
- - ../event-cases.yaml
diff --git a/test/e2e-v2/cases/log/iotdb/docker-compose.yml b/test/e2e-v2/cases/log/iotdb/docker-compose.yml
deleted file mode 100644
index ac0c160..0000000
--- a/test/e2e-v2/cases/log/iotdb/docker-compose.yml
+++ /dev/null
@@ -1,59 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: '2.1'
-
-services:
- iotdb:
- image: apache/iotdb:0.12.3-node
- expose:
- - 6667
- networks:
- - e2e
- healthcheck:
- test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
- interval: 5s
- timeout: 60s
- retries: 120
-
- oap:
- extends:
- file: ../log-base-compose.yml
- service: oap
- environment:
- SW_STORAGE: iotdb
- SW_STORAGE_IOTDB_HOST: iotdb
- ports:
- - 12800
- networks:
- - e2e
- depends_on:
- iotdb:
- condition: service_healthy
-
- provider:
- extends:
- file: ../../../script/docker-compose/base-compose.yml
- service: provider
- ports:
- - 9090
- networks:
- - e2e
- depends_on:
- oap:
- condition: service_healthy
-
-networks:
- e2e:
diff --git a/test/e2e-v2/cases/log/iotdb/e2e.yaml b/test/e2e-v2/cases/log/iotdb/e2e.yaml
deleted file mode 100644
index 1edeacf..0000000
--- a/test/e2e-v2/cases/log/iotdb/e2e.yaml
+++ /dev/null
@@ -1,48 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This file is used to show how to write configuration files and can be used to test.
-
-setup:
- env: compose
- file: docker-compose.yml
- timeout: 20m
- init-system-environment: ../../../script/env
- steps:
- - name: install yq
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- - name: install swctl
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
-
-trigger:
- action: http
- interval: 3s
- times: 10
- url: http://${provider_host}:${provider_9090}/users
- method: POST
- body: '{"id":"123","name":"skywalking"}'
- headers:
- "Content-Type": "application/json"
-
-verify:
- # verify with retry strategy
- retry:
- # max retry count
- count: 20
- # the interval between two retries, in millisecond.
- interval: 10s
- cases:
- - includes:
- - ../log-cases.yaml
diff --git a/test/e2e-v2/cases/profile/iotdb/docker-compose.yml b/test/e2e-v2/cases/profile/iotdb/docker-compose.yml
deleted file mode 100644
index 1292925..0000000
--- a/test/e2e-v2/cases/profile/iotdb/docker-compose.yml
+++ /dev/null
@@ -1,54 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: '2.1'
-
-services:
- iotdb:
- image: apache/iotdb:0.12.3-node
- expose:
- - 6667
- networks:
- - e2e
- healthcheck:
- test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
- interval: 5s
- timeout: 60s
- retries: 120
-
- provider:
- extends:
- file: ../../../script/docker-compose/base-compose.yml
- service: provider
- depends_on:
- oap:
- condition: service_healthy
- ports:
- - 9090
-
- oap:
- extends:
- file: ../../../script/docker-compose/base-compose.yml
- service: oap
- environment:
- SW_STORAGE: iotdb
- depends_on:
- iotdb:
- condition: service_healthy
- ports:
- - 12800
-
-networks:
- e2e:
diff --git a/test/e2e-v2/cases/profile/iotdb/e2e.yaml b/test/e2e-v2/cases/profile/iotdb/e2e.yaml
deleted file mode 100644
index 107ef28..0000000
--- a/test/e2e-v2/cases/profile/iotdb/e2e.yaml
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This file is used to show how to write configuration files and can be used to test.
-
-setup:
- env: compose
- file: docker-compose.yml
- timeout: 20m
- init-system-environment: ../../../script/env
- steps:
- - name: install yq
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- - name: install swctl
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
-
-verify:
- retry:
- count: 20
- interval: 3s
- cases:
- - includes:
- - ../profile-cases.yaml
diff --git a/test/e2e-v2/cases/storage/iotdb/docker-compose.yml b/test/e2e-v2/cases/storage/iotdb/docker-compose.yml
deleted file mode 100644
index 7626b2f..0000000
--- a/test/e2e-v2/cases/storage/iotdb/docker-compose.yml
+++ /dev/null
@@ -1,70 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: '2.1'
-
-services:
- iotdb:
- image: apache/iotdb:0.12.3-node
- expose:
- - 6667
- networks:
- - e2e
- healthcheck:
- test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
- interval: 5s
- timeout: 60s
- retries: 120
-
- oap:
- extends:
- file: ../../../script/docker-compose/base-compose.yml
- service: oap
- environment:
- SW_STORAGE: iotdb
- ports:
- - 12800
- networks:
- - e2e
- depends_on:
- iotdb:
- condition: service_healthy
-
- provider:
- extends:
- file: ../../../script/docker-compose/base-compose.yml
- service: provider
- ports:
- - 9090
- networks:
- - e2e
- depends_on:
- oap:
- condition: service_healthy
-
- consumer:
- extends:
- file: ../../../script/docker-compose/base-compose.yml
- service: consumer
- ports:
- - 9092
- depends_on:
- oap:
- condition: service_healthy
- provider:
- condition: service_healthy
-
-networks:
- e2e:
diff --git a/test/e2e-v2/cases/storage/iotdb/e2e.yaml b/test/e2e-v2/cases/storage/iotdb/e2e.yaml
deleted file mode 100644
index 73f746c..0000000
--- a/test/e2e-v2/cases/storage/iotdb/e2e.yaml
+++ /dev/null
@@ -1,129 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This file is used to show how to write configuration files and can be used to test.
-
-setup:
- env: compose
- file: docker-compose.yml
- timeout: 20m
- init-system-environment: ../../../script/env
- steps:
- - name: install yq
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- - name: install swctl
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
-
-trigger:
- action: http
- interval: 3s
- times: 10
- url: http://${consumer_host}:${consumer_9092}/info
- method: POST
-
-verify:
- # verify with retry strategy
- retry:
- # max retry count
- count: 20
- # the interval between two retries, in millisecond.
- interval: 10s
- cases:
- # service list
- - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls
- expected: ../expected/service.yml
- # service metrics
- - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name service_sla --service-name=e2e-service-provider | yq e 'to_entries' -
- expected: ../expected/metrics-has-value.yml
- # service endpoint
- - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql endpoint list --keyword=info --service-name=e2e-service-provider
- expected: ../expected/service-endpoint.yml
- # service endpoint metrics
- - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name endpoint_cpm --endpoint-name=POST:/info --service-name=e2e-service-provider |yq e 'to_entries' -
- expected: ../expected/metrics-has-value.yml
- # dependency service
- - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql dependency service --service-name="e2e-service-provider"
- expected: ../expected/dependency-services.yml
- # service instance list
- - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider
- expected: ../expected/service-instance.yml
- # service instance jvm metrics
- - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider | yq e 'to_entries' -
- expected: ../expected/metrics-has-value.yml
-
- # trace segment list
- - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls
- expected: ../expected/traces-list.yml
- # native tracing: trace detail
- - query: |
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $( \
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls \
- | yq e '.traces | select(.[].endpointnames[0]=="POST:/info") | .[0].traceids[0]' - \
- )
- expected: ../expected/trace-info-detail.yml
-
- # native event: event list
- - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql event list
- expected: ../expected/event-list.yml
-
- # native profile: create task
- - query: |
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql \
- profile create --service-name=e2e-service-provider \
- --endpoint-name=POST:/info \
- --start-time=$((($(date +%s)+5)*1000)) \
- --duration=1 --min-duration-threshold=0 \
- --dump-period=10 --max-sampling-count=9
- expected: ../expected/profile-create.yml
- # native profile: sleep to wait agent notices and query profile list
- - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile list -service-name=e2e-service-provider --endpoint-name=POST:/info
- expected: ../expected/profile-list.yml
-
- # native profile: sleep to wait segment report and query profiled segment list
- - query: |
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile segment-list --task-id=$( \
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile list --service-name=e2e-service-provider --endpoint-name=POST:/info | yq e '.[0].id' - \
- )
- expected: ../expected/profile-segment-list.yml
-
- # native profile: query profiled segment
- - query: |
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile profiled-segment --segment-id=$( \
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile segment-list --task-id=$( \
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile list --service-name=e2e-service-provider --endpoint-name=POST:/info | yq e '.[0].id' - \
- ) | yq e '.[0].segmentid' - \
- )
- expected: ../expected/profile-segment-detail.yml
-
- # native profile: query profiled segment analyze
- - query: |
- segmentid=$( \
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile segment-list --task-id=$( \
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile list --service-name=e2e-service-provider --endpoint-name=POST:/info | yq e '.[0].id' - \
- ) | yq e '.[0].segmentid' - \
- );
- start=$(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile profiled-segment --segment-id=$segmentid|yq e '.spans.[0].starttime' -);
- end=$(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile profiled-segment --segment-id=$segmentid|yq e '.spans.[0].endtime' -);
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profile profiled-analyze --segment-id=$segmentid --time-ranges=$(echo $start"-"$end)
- expected: ../expected/profile-segment-analyze.yml
-
- - query: |
- curl -s -XPOST http://${provider_host}:${provider_9090}/users -d '{"id":"123","name":"SinglesBar"}' -H "Content-Type: application/json" > /dev/null;
- sleep 5;
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $( \
- swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls --order startTime --service-name "e2e-service-provider" --endpoint-name "POST:/users" \
- | yq e '.traces[0].traceids[0]' - \
- )
- expected: ../expected/trace-users-detail.yml
diff --git a/test/e2e-v2/cases/ttl/iotdb/docker-compose.yml b/test/e2e-v2/cases/ttl/iotdb/docker-compose.yml
deleted file mode 100644
index 2a5c238..0000000
--- a/test/e2e-v2/cases/ttl/iotdb/docker-compose.yml
+++ /dev/null
@@ -1,66 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-version: '2.1'
-
-services:
- iotdb:
- image: apache/iotdb:0.12.3-node
- expose:
- - 6667
- networks:
- - e2e
- healthcheck:
- test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/6667"]
- interval: 5s
- timeout: 60s
- retries: 120
-
- oap:
- extends:
- file: ../../../script/docker-compose/base-compose.yml
- service: oap
- environment:
- SW_STORAGE: iotdb
- SW_CORE_DATA_KEEPER_EXECUTE_PERIOD: 1
- SW_CORE_METRICS_DATA_TTL: 7
- depends_on:
- iotdb:
- condition: service_healthy
- ports:
- - 12800
-
- sender:
- image: "adoptopenjdk/openjdk8:alpine-jre"
- volumes:
- - ./../../../java-test-service/e2e-mock-sender/target/e2e-mock-sender-2.0.0.jar:/e2e-mock-sender-2.0.0.jar
- command: [ "java", "-jar", "/e2e-mock-sender-2.0.0.jar" ]
- environment:
- OAP_HOST: oap
- OAP_GRPC_PORT: 11800
- networks:
- - e2e
- ports:
- - 9093
- healthcheck:
- test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 9093"]
- interval: 5s
- timeout: 60s
- retries: 120
- depends_on:
- oap:
- condition: service_healthy
-networks:
- e2e:
diff --git a/test/e2e-v2/cases/ttl/iotdb/e2e.yaml b/test/e2e-v2/cases/ttl/iotdb/e2e.yaml
deleted file mode 100644
index 3793093..0000000
--- a/test/e2e-v2/cases/ttl/iotdb/e2e.yaml
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This file is used to show how to write configuration files and can be used to test.
-
-setup:
- env: compose
- file: docker-compose.yml
- timeout: 20m
- init-system-environment: ../../../script/env
- steps:
- - name: install yq
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- - name: install swctl
- command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
-
-verify:
- retry:
- count: 30
- interval: 3s
- cases:
- - includes:
- - ../ttl-cases.yaml
diff --git a/test/e2e-v2/script/docker-compose/base-compose.yml b/test/e2e-v2/script/docker-compose/base-compose.yml
index 07d51d1..26e6ccc 100644
--- a/test/e2e-v2/script/docker-compose/base-compose.yml
+++ b/test/e2e-v2/script/docker-compose/base-compose.yml
@@ -33,7 +33,6 @@ services:
SW_STORAGE_ES_CLUSTER_NODES: es:9200
SW_JDBC_URL: jdbc:mysql://mysql:3306/swtest
SW_STORAGE_INFLUXDB_URL: http://influxdb:8086
- SW_STORAGE_IOTDB_HOST: iotdb
SW_CONFIG_ETCD_PERIOD: 1
SW_CONFIG_ETCD_ENDPOINTS: http://etcd:2379
SW_CLUSTER_ETCD_ENDPOINTS: http://etcd:2379
diff --git a/tools/dependencies/known-oap-backend-dependencies.txt b/tools/dependencies/known-oap-backend-dependencies.txt
index 0c1ad0b..ad972f8 100755
--- a/tools/dependencies/known-oap-backend-dependencies.txt
+++ b/tools/dependencies/known-oap-backend-dependencies.txt
@@ -61,8 +61,6 @@ httpclient-4.5.13.jar
httpcore-4.4.13.jar
httpcore-nio-4.4.13.jar
influxdb-java-2.15.jar
-iotdb-session-0.12.3.jar
-iotdb-thrift-0.12.3.jar
jackson-annotations-2.12.2.jar
jackson-core-2.12.2.jar
jackson-databind-2.12.2.jar
@@ -91,14 +89,11 @@ jsr305-3.0.2.jar
kafka-clients-2.4.1.jar
kotlin-reflect-1.1.1.jar
kotlin-stdlib-1.1.60.jar
-libthrift-0.14.1.jar
listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
log4j-api-2.14.1.jar
log4j-core-2.14.1.jar
log4j-over-slf4j-1.7.30.jar
log4j-slf4j-impl-2.14.1.jar
-logback-classic-1.2.3.jar
-logback-core-1.2.3.jar
logging-interceptor-3.13.1.jar
lz4-java-1.6.0.jar
micrometer-core-1.7.4.jar
@@ -137,7 +132,6 @@ protobuf-java-util-3.17.3.jar
reactive-streams-1.0.2.jar
reflectasm-1.11.7.jar
retrofit-2.5.0.jar
-service-rpc-0.12.3.jar
simpleclient-0.6.0.jar
simpleclient_common-0.6.0.jar
simpleclient_hotspot-0.6.0.jar
@@ -146,7 +140,6 @@ slf4j-api-1.7.30.jar
snakeyaml-1.28.jar
snappy-java-1.1.7.3.jar
swagger-annotations-1.6.3.jar
-tsfile-0.12.3.jar
vavr-0.10.3.jar
vavr-match-0.10.3.jar
zipkin-2.9.1.jar