You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/19 12:11:15 UTC
[skywalking] branch master updated: Upgrade the InfluxDB
storage-plugin to protocol V3 (#4641)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 28530cd Upgrade the InfluxDB storage-plugin to protocol V3 (#4641)
28530cd is described below
commit 28530cd79d6d91443c0584fade1cbb79e137a8ed
Author: Daming <zt...@foxmail.com>
AuthorDate: Sun Apr 19 20:10:59 2020 +0800
Upgrade the InfluxDB storage-plugin to protocol V3 (#4641)
Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
Co-authored-by: kezhenxu94 <ke...@163.com>
Co-authored-by: kezhenxu94 <ke...@apache.org>
---
.github/workflows/e2e.cluster.yaml | 2 +-
.github/workflows/e2e.profiling.yaml | 2 +-
.github/workflows/e2e.storages.yaml | 2 +-
.github/workflows/e2e.ttl.yaml | 2 +-
.github/workflows/plugins-test.1.yaml | 1 -
.github/workflows/plugins-test.3.yaml | 1 +
.../storage/plugin/influxdb/InfluxClient.java | 20 +-
...luxModelConstants.java => InfluxConstants.java} | 32 ++-
.../plugin/influxdb/InfluxStorageProvider.java | 48 ++--
...AddressAlias.java => InfluxTableInstaller.java} | 28 +-
.../storage/plugin/influxdb/TableMetaInfo.java | 68 ++++-
.../plugin/influxdb/base/InfluxInsertRequest.java | 14 +-
.../storage/plugin/influxdb/base/MetricsDAO.java | 44 ++--
.../plugin/influxdb/base/NoneStreamDAO.java | 21 +-
.../storage/plugin/influxdb/base/RecordDAO.java | 16 +-
.../plugin/influxdb/query/AggregationQuery.java | 18 +-
.../influxdb/query/InfluxMetadataQueryDAO.java | 146 -----------
.../storage/plugin/influxdb/query/LogQuery.java | 16 +-
.../plugin/influxdb/query/MetadataQuery.java | 290 +++++++++++++++++++++
.../plugin/influxdb/query/MetricsQuery.java | 40 +--
.../influxdb/query/NetworkAddressAliasDAO.java | 86 ++++++
.../plugin/influxdb/query/ProfileTaskLogQuery.java | 27 +-
.../plugin/influxdb/query/ProfileTaskQuery.java | 51 ++--
.../influxdb/query/ProfileThreadSnapshotQuery.java | 28 +-
.../plugin/influxdb/query/TopNRecordsQuery.java | 8 +-
.../plugin/influxdb/query/TopologyQuery.java | 148 ++++++-----
.../storage/plugin/influxdb/query/TraceQuery.java | 10 +-
.../docker/profile/docker-compose.influxdb.yml | 4 -
test/plugin/run.sh | 5 +-
.../scenarios/mysql-scenario/support-version.list | 11 +-
.../scenarios/solrj-7.x-scenario/configuration.yml | 1 +
31 files changed, 782 insertions(+), 408 deletions(-)
diff --git a/.github/workflows/e2e.cluster.yaml b/.github/workflows/e2e.cluster.yaml
index b1eb89f..d2f82f2 100644
--- a/.github/workflows/e2e.cluster.yaml
+++ b/.github/workflows/e2e.cluster.yaml
@@ -37,7 +37,7 @@ jobs:
strategy:
matrix:
coordinator: ['zk']
- storage: ['mysql', 'es6', 'es7'] #TODO: 'influxdb'
+ storage: ['mysql', 'es6', 'es7', 'influxdb']
env:
SW_COORDINATOR: ${{ matrix.coordinator }}
SW_STORAGE: ${{ matrix.storage }}
diff --git a/.github/workflows/e2e.profiling.yaml b/.github/workflows/e2e.profiling.yaml
index 04de176..229468d 100644
--- a/.github/workflows/e2e.profiling.yaml
+++ b/.github/workflows/e2e.profiling.yaml
@@ -36,7 +36,7 @@ jobs:
timeout-minutes: 90
strategy:
matrix:
- storage: ['h2', 'mysql', 'es6', 'es7'] #TODO: 'influxdb'
+ storage: ['h2', 'mysql', 'es6', 'es7', 'influxdb']
env:
SW_STORAGE: ${{ matrix.storage }}
steps:
diff --git a/.github/workflows/e2e.storages.yaml b/.github/workflows/e2e.storages.yaml
index c346f4d..c5c380b 100644
--- a/.github/workflows/e2e.storages.yaml
+++ b/.github/workflows/e2e.storages.yaml
@@ -36,7 +36,7 @@ jobs:
timeout-minutes: 90
strategy:
matrix:
- storage: ['mysql', 'es6', 'es7'] #TODO: 'influxdb'
+ storage: ['mysql', 'es6', 'es7', 'influxdb']
env:
SW_STORAGE: ${{ matrix.storage }}
steps:
diff --git a/.github/workflows/e2e.ttl.yaml b/.github/workflows/e2e.ttl.yaml
index 8bce3fa..c1dce82 100644
--- a/.github/workflows/e2e.ttl.yaml
+++ b/.github/workflows/e2e.ttl.yaml
@@ -36,7 +36,7 @@ jobs:
timeout-minutes: 90
strategy:
matrix:
- storage: ['es6', 'es7'] #TODO: 'influxdb'
+ storage: ['es6', 'es7', 'influxdb']
env:
SW_STORAGE: ${{ matrix.storage }}
steps:
diff --git a/.github/workflows/plugins-test.1.yaml b/.github/workflows/plugins-test.1.yaml
index b9fc6c2..a9a46b7 100644
--- a/.github/workflows/plugins-test.1.yaml
+++ b/.github/workflows/plugins-test.1.yaml
@@ -46,7 +46,6 @@ jobs:
- { name: 'kotlin-coroutine-scenario', title: 'Kotlin Coroutine 1.0.1-1.3.3 (4)' }
- { name: 'lettuce-scenario', title: 'Lettuce 5.x (17)' }
- { name: 'mongodb-3.x-scenario', title: 'Mongodb 3.4.0-3.11.1 (22)' }
- - { name: 'mysql-scenario', title: 'MySQL 5.1.2-8.0.15 (53)' }
- { name: 'netty-socketio-scenario', title: 'Netty-SocketIO 1.x (4)' }
steps:
- uses: actions/checkout@v2
diff --git a/.github/workflows/plugins-test.3.yaml b/.github/workflows/plugins-test.3.yaml
index 4870180..80af4c7 100644
--- a/.github/workflows/plugins-test.3.yaml
+++ b/.github/workflows/plugins-test.3.yaml
@@ -33,6 +33,7 @@ jobs:
fail-fast: true
matrix:
case:
+ - { name: 'mysql-scenario', title: 'MySQL 5.1.2-8.0.15 (30)' }
- { name: 'undertow-scenario', title: 'Undertow 1.3.0-2.0.27 (23)' }
- { name: 'webflux-scenario', title: 'Spring-WebFlux 2.x (7)' }
- { name: 'zookeeper-scenario', title: 'Zookeeper 3.4.x (14)' }
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java
index cc4c06e..fe96dd4 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxClient.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
@@ -73,6 +74,7 @@ public class InfluxClient implements Client {
InfluxDB.ResponseFormat.MSGPACK
);
influx.query(new Query("CREATE DATABASE " + database));
+ influx.enableGzip();
influx.enableBatch(config.getActions(), config.getDuration(), TimeUnit.MILLISECONDS);
influx.setDatabase(database);
@@ -99,7 +101,7 @@ public class InfluxClient implements Client {
}
try {
- QueryResult result = getInflux().query(query);
+ QueryResult result = getInflux().query(new Query(query.getCommand()));
if (result.hasError()) {
throw new IOException(result.getError());
}
@@ -137,6 +139,22 @@ public class InfluxClient implements Client {
}
/**
+ * Execute a query against InfluxDB with a `select count(*)` statement and return the count only.
+ *
+ * @throws IOException if there is an error on the InfluxDB server or communication error
+ */
+ public int getCounter(Query query) throws IOException {
+ QueryResult.Series series = queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result: {}", query.getCommand(), series);
+ }
+ if (Objects.isNull(series)) {
+ return 0;
+ }
+ return ((Number) series.getValues().get(0).get(1)).intValue();
+ }
+
+ /**
* Data management, to drop a time-series by measurement and time-series name specified. If an exception isn't
* thrown, it means execution success. Notice, drop series don't support to drop series by range
*
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxModelConstants.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java
similarity index 62%
rename from oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxModelConstants.java
rename to oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java
index 18dd47d..33bc7db 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxModelConstants.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxConstants.java
@@ -18,9 +18,31 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb;
-public interface InfluxModelConstants {
- /**
- * Override column because the 'duration' is the identifier of InfluxDB.
- */
- String DURATION = "dur";
+public interface InfluxConstants {
+ String ID_COLUMN = "id";
+
+ String NAME = "\"name\"";
+
+ String ALL_FIELDS = "*::field";
+
+ String SORT_DES = "top";
+
+ String SORT_ASC = "bottom";
+
+ String DURATION = "\"" + "duration" + "\"";
+
+ interface TagName {
+
+ String ID_COLUMN = "_id";
+
+ String NAME = "_name";
+
+ String ENTITY_ID = "_entity_id";
+
+ String TIME_BUCKET = "_time_bucket";
+
+ String NODE_TYPE = "_node_type";
+
+ String SERVICE_ID = "_service_id";
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
index e501b4f..ae67748 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxStorageProvider.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
@@ -46,10 +47,10 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.HistoryDele
import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.InfluxStorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.AggregationQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.AlarmQuery;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.InfluxMetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.InfluxNetworkAddressAlias;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.LogQuery;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetadataQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.MetricsQuery;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.NetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskLogQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileTaskQuery;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.ProfileThreadSnapshotQuery;
@@ -60,7 +61,7 @@ import org.apache.skywalking.oap.server.storage.plugin.influxdb.query.TraceQuery
@Slf4j
public class InfluxStorageProvider extends ModuleProvider {
private InfluxStorageConfig config;
- private InfluxClient influxClient;
+ private InfluxClient client;
public InfluxStorageProvider() {
config = new InfluxStorageConfig();
@@ -83,35 +84,42 @@ public class InfluxStorageProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException {
- influxClient = new InfluxClient(config);
+ client = new InfluxClient(config);
- this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(influxClient));
- this.registerServiceImplementation(StorageDAO.class, new InfluxStorageDAO(influxClient));
+ this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(client));
+ this.registerServiceImplementation(StorageDAO.class, new InfluxStorageDAO(client));
- this.registerServiceImplementation(INetworkAddressAliasDAO.class, new InfluxNetworkAddressAlias(influxClient));
- this.registerServiceImplementation(IMetadataQueryDAO.class, new InfluxMetadataQueryDAO(influxClient));
+ this.registerServiceImplementation(INetworkAddressAliasDAO.class, new NetworkAddressAliasDAO(client));
+ this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQuery(client));
- this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQuery(influxClient));
- this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQuery(influxClient));
- this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQuery(influxClient));
- this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQuery(influxClient));
- this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQuery(influxClient));
- this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQuery(influxClient));
- this.registerServiceImplementation(ILogQueryDAO.class, new LogQuery(influxClient));
+ this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQuery(client));
+ this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQuery(client));
+ this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQuery(client));
+ this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQuery(client));
+ this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQuery(client));
+ this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQuery(client));
+ this.registerServiceImplementation(ILogQueryDAO.class, new LogQuery(client));
- this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQuery(influxClient));
+ this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQuery(client));
this.registerServiceImplementation(
- IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQuery(influxClient));
+ IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQuery(client));
this.registerServiceImplementation(
- IProfileTaskLogQueryDAO.class, new ProfileTaskLogQuery(influxClient, config.getFetchTaskLogMaxSize()));
+ IProfileTaskLogQueryDAO.class, new ProfileTaskLogQuery(client, config.getFetchTaskLogMaxSize()));
this.registerServiceImplementation(
- IHistoryDeleteDAO.class, new HistoryDeleteDAO(influxClient));
+ IHistoryDeleteDAO.class, new HistoryDeleteDAO(client));
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
- influxClient.connect();
+ client.connect();
+
+ InfluxTableInstaller installer = new InfluxTableInstaller(getManager());
+ try {
+ installer.install(client);
+ } catch (StorageException e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxNetworkAddressAlias.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java
similarity index 52%
rename from oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxNetworkAddressAlias.java
rename to oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java
index 90b47e2..585df52 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxNetworkAddressAlias.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/InfluxTableInstaller.java
@@ -16,22 +16,28 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
+package org.apache.skywalking.oap.server.storage.plugin.influxdb;
-import java.util.List;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
-public class InfluxNetworkAddressAlias implements INetworkAddressAliasDAO {
- private InfluxClient client;
+public class InfluxTableInstaller extends ModelInstaller {
- public InfluxNetworkAddressAlias(final InfluxClient client) {
- this.client = client;
+ public InfluxTableInstaller(ModuleManager moduleManager) {
+ super(moduleManager);
}
@Override
- public List<NetworkAddressAlias> loadLastUpdate(final long timeBucket) {
- return null;
+ protected boolean isExists(final Client client, final Model model) throws StorageException {
+ TableMetaInfo.addModel(model);
+ return true;
+ }
+
+ @Override
+ protected void createTable(final Client client, final Model model) throws StorageException {
+ // Automatically create table
}
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
index 4cc9e80..3d73663 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/TableMetaInfo.java
@@ -18,18 +18,80 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb;
+import com.google.common.collect.Maps;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
+@Getter
+@Builder
+@AllArgsConstructor
public class TableMetaInfo {
- private static Map<String, Model> TABLES = new HashMap<>();
+ private static final Map<String, TableMetaInfo> TABLES = new HashMap<>();
+
+ private Map<String, String> storageAndColumnMap;
+ private Map<String, String> storageAndTagMap;
+ private Model model;
public static void addModel(Model model) {
- TABLES.put(model.getName(), model);
+ final List<ModelColumn> columns = model.getColumns();
+ final Map<String, String> storageAndTagMap = Maps.newHashMap();
+ final Map<String, String> storageAndColumnMap = Maps.newHashMap();
+ columns.forEach(column -> {
+ ColumnName columnName = column.getColumnName();
+ storageAndColumnMap.put(columnName.getStorageName(), columnName.getName());
+ });
+
+ if (model.getName().endsWith("_traffic")) {
+ // instance_traffic name, service_id
+ // endpoint_traffic name, service_id
+ storageAndTagMap.put(InstanceTraffic.NAME, InfluxConstants.TagName.NAME);
+ if (InstanceTraffic.INDEX_NAME.equals(model.getName())
+ || EndpointTraffic.INDEX_NAME.equals(model.getName())) {
+ storageAndTagMap.put(EndpointTraffic.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
+ } else {
+ // service_traffic name, node_type
+ storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE);
+ }
+ } else {
+
+ // Specifies ENTITY_ID, TIME_BUCKET, NODE_TYPE, SERVICE_ID as tag
+ if (storageAndColumnMap.containsKey(Metrics.ENTITY_ID)) {
+ storageAndTagMap.put(Metrics.ENTITY_ID, InfluxConstants.TagName.ENTITY_ID);
+ }
+ if (storageAndColumnMap.containsKey(Record.TIME_BUCKET)) {
+ storageAndTagMap.put(Record.TIME_BUCKET, InfluxConstants.TagName.TIME_BUCKET);
+ }
+ if (storageAndColumnMap.containsKey(ServiceTraffic.NODE_TYPE)) {
+ storageAndTagMap.put(ServiceTraffic.NODE_TYPE, InfluxConstants.TagName.NODE_TYPE);
+ }
+ if (storageAndColumnMap.containsKey(SegmentRecord.SERVICE_ID)) {
+ storageAndTagMap.put(SegmentRecord.SERVICE_ID, InfluxConstants.TagName.SERVICE_ID);
+ }
+ }
+
+ TableMetaInfo info = TableMetaInfo.builder()
+ .model(model)
+ .storageAndTagMap(storageAndTagMap)
+ .storageAndColumnMap(storageAndColumnMap)
+ .build();
+ TABLES.put(model.getName(), info);
}
- public static Model get(String moduleName) {
+ public static TableMetaInfo get(String moduleName) {
return TABLES.get(moduleName);
}
+
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java
index eaa0f4d..f984372 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/InfluxInsertRequest.java
@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.storage.plugin.influxdb.base;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
@@ -29,15 +28,13 @@ import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.Point;
/**
* InfluxDB Point wrapper.
*/
public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
- public static final String ID = "id";
-
private Point.Builder builder;
private Map<String, Object> fields = Maps.newHashMap();
@@ -57,9 +54,8 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
}
}
builder = Point.measurement(model.getName())
- .addField(ID, storageData.id())
- .fields(fields)
- .tag(InfluxClient.TAG_TIME_BUCKET, String.valueOf(fields.get(Metrics.TIME_BUCKET)));
+ .addField(InfluxConstants.ID_COLUMN, storageData.id())
+ .fields(fields);
}
public InfluxInsertRequest time(long time, TimeUnit unit) {
@@ -68,9 +64,7 @@ public class InfluxInsertRequest implements InsertRequest, UpdateRequest {
}
public InfluxInsertRequest addFieldAsTag(String fieldName, String tagName) {
- if (fields.containsKey(fieldName)) {
- builder.tag(tagName, String.valueOf(fields.get(fieldName)));
- }
+ builder.tag(tagName, String.valueOf(fields.get(fieldName)));
return this;
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java
index eb3008f..86f5c8c 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/MetricsDAO.java
@@ -26,30 +26,27 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ALL_FIELDS;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
+@Slf4j
public class MetricsDAO implements IMetricsDAO {
- public static final String TAG_ENTITY_ID = "_entity_id";
- public static final String TAG_ENDPOINT_OWNER_SERVICE = "_service_id";
- public static final String TAG_ENDPOINT_NAME = "_endpoint_name";
private final StorageBuilder<Metrics> storageBuilder;
private final InfluxClient client;
@@ -62,10 +59,13 @@ public class MetricsDAO implements IMetricsDAO {
@Override
public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select()
- .regex("*::field")
+ .raw(ALL_FIELDS)
.from(client.getDatabase(), model.getName())
.where(contains("id", Joiner.on("|").join(ids)));
QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result: {}", query.getCommand(), series);
+ }
if (series == null) {
return Collections.emptyList();
@@ -73,10 +73,9 @@ public class MetricsDAO implements IMetricsDAO {
final List<Metrics> metrics = Lists.newArrayList();
List<String> columns = series.getColumns();
- Map<String, String> storageAndColumnNames = Maps.newHashMap();
- for (ModelColumn column : model.getColumns()) {
- storageAndColumnNames.put(column.getColumnName().getStorageName(), column.getColumnName().getName());
- }
+
+ TableMetaInfo metaInfo = TableMetaInfo.get(model.getName());
+ Map<String, String> storageAndColumnMap = metaInfo.getStorageAndColumnMap();
series.getValues().forEach(values -> {
Map<String, Object> data = Maps.newHashMap();
@@ -87,7 +86,7 @@ public class MetricsDAO implements IMetricsDAO {
value = ((StorageDataComplexObject) value).toStorageData();
}
- data.put(storageAndColumnNames.get(columns.get(i)), value);
+ data.put(storageAndColumnMap.get(columns.get(i)), value);
}
metrics.add(storageBuilder.map2Data(data));
@@ -99,16 +98,15 @@ public class MetricsDAO implements IMetricsDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
final long timestamp = TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling());
- if (metrics instanceof EndpointTraffic || metrics instanceof ServiceTraffic || metrics instanceof InstanceTraffic) {
- return new InfluxInsertRequest(model, metrics, storageBuilder)
- .time(timestamp, TimeUnit.MILLISECONDS)
- .addFieldAsTag(EndpointTraffic.SERVICE_ID, TAG_ENDPOINT_OWNER_SERVICE)
- .addFieldAsTag(EndpointTraffic.NAME, TAG_ENDPOINT_NAME);
- } else {
- return new InfluxInsertRequest(model, metrics, storageBuilder)
- .time(timestamp, TimeUnit.MILLISECONDS)
- .addFieldAsTag(Metrics.ENTITY_ID, TAG_ENTITY_ID);
- }
+ TableMetaInfo tableMetaInfo = TableMetaInfo.get(model.getName());
+
+ final InfluxInsertRequest request = new InfluxInsertRequest(model, metrics, storageBuilder)
+ .time(timestamp, TimeUnit.MILLISECONDS);
+
+ tableMetaInfo.getStorageAndTagMap().forEach((field, tag) -> {
+ request.addFieldAsTag(field, tag);
+ });
+ return request;
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java
index ae00288..3d89e41 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/NoneStreamDAO.java
@@ -23,15 +23,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.influxdb.dto.Point;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
public class NoneStreamDAO implements INoneStreamDAO {
- public static final String TAG_SERVICE_ID = "_service_id";
private static final int PADDING_SIZE = 1_000_000;
private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE);
@@ -45,13 +43,14 @@ public class NoneStreamDAO implements INoneStreamDAO {
@Override
public void insert(final Model model, final NoneStream noneStream) throws IOException {
- final long timestamp = TimeBucket.getTimestamp(
- noneStream.getTimeBucket(), model.getDownsampling()) * PADDING_SIZE + SUFFIX.getAndIncrement();
-
- Point point = new InfluxInsertRequest(model, noneStream, storageBuilder)
- .time(timestamp, TimeUnit.NANOSECONDS)
- .addFieldAsTag(ProfileTaskRecord.SERVICE_ID, TAG_SERVICE_ID).getPoint();
-
- client.write(point);
+ final long timestamp = TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling())
+ * PADDING_SIZE + SUFFIX.getAndIncrement();
+
+ final InfluxInsertRequest request = new InfluxInsertRequest(model, noneStream, storageBuilder)
+ .time(timestamp, TimeUnit.NANOSECONDS);
+ TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> {
+ request.addFieldAsTag(field, tag);
+ });
+ client.write(request.getPoint());
}
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java
index fae2e41..63739c1 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/base/RecordDAO.java
@@ -22,16 +22,15 @@ import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
public class RecordDAO implements IRecordDAO {
- public static final String TAG_SERVICE_ID = "_service_id";
private static final int PADDING_SIZE = 1_000_000;
private static final AtomicRangeInteger SUFFIX = new AtomicRangeInteger(0, PADDING_SIZE);
@@ -45,11 +44,14 @@ public class RecordDAO implements IRecordDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
- final long timestamp = TimeBucket.getTimestamp(
- record.getTimeBucket(), model.getDownsampling()) * PADDING_SIZE + SUFFIX.getAndIncrement();
+ final long timestamp = TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling())
+ * PADDING_SIZE + SUFFIX.getAndIncrement();
- return new InfluxInsertRequest(model, record, storageBuilder)
- .time(timestamp, TimeUnit.NANOSECONDS)
- .addFieldAsTag(SegmentRecord.SERVICE_ID, TAG_SERVICE_ID);
+ final InfluxInsertRequest request = new InfluxInsertRequest(model, record, storageBuilder)
+ .time(timestamp, TimeUnit.NANOSECONDS);
+ TableMetaInfo.get(model.getName()).getStorageAndTagMap().forEach((field, tag) -> {
+ request.addFieldAsTag(field, tag);
+ });
+ return request;
}
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java
index 2ab6b21..48cd1ef 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/AggregationQuery.java
@@ -25,13 +25,11 @@ import java.util.Comparator;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.query.entity.Order;
import org.apache.skywalking.oap.server.core.query.entity.TopNEntity;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.SelectSubQueryImpl;
@@ -68,7 +66,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
long startTB, long endTB, Order order) throws IOException {
return getTopNEntity(
downsampling, indName,
- subQuery(InstanceTraffic.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
+ subQuery(InfluxConstants.TagName.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
);
}
@@ -84,7 +82,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
long startTB, long endTB, Order order) throws IOException {
return getTopNEntity(
downsampling, indName,
- subQuery(EndpointTraffic.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
+ subQuery(InfluxConstants.TagName.SERVICE_ID, serviceId, indName, valueCName, startTB, endTB), order, topN
);
}
@@ -95,14 +93,14 @@ public class AggregationQuery implements IAggregationQueryDAO {
int topN) throws IOException {
// Have to re-sort here. Because the function, top()/bottom(), get the result ordered by the `time`.
Comparator<TopNEntity> comparator = DESCENDING;
- String functionName = "top";
+ String functionName = InfluxConstants.SORT_DES;
if (order == Order.ASC) {
- functionName = "bottom";
+ functionName = InfluxConstants.SORT_ASC;
comparator = ASCENDING;
}
SelectQueryImpl query = select().function(functionName, "mean", topN).as("value")
- .column(MetricsDAO.TAG_ENTITY_ID)
+ .column(InfluxConstants.TagName.ENTITY_ID)
.from(client.getDatabase(), measurement);
query.setSubQuery(subQuery);
@@ -135,7 +133,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
.and(eq(serviceColumnName, serviceId))
.and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
.and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)))
- .groupBy(MetricsDAO.TAG_ENTITY_ID);
+ .groupBy(InfluxConstants.TagName.ENTITY_ID);
}
private SelectSubQueryImpl<SelectQueryImpl> subQuery(String name, String columnName, long startTB, long endTB) {
@@ -143,7 +141,7 @@ public class AggregationQuery implements IAggregationQueryDAO {
.where()
.and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
.and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)))
- .groupBy(MetricsDAO.TAG_ENTITY_ID);
+ .groupBy(InfluxConstants.TagName.ENTITY_ID);
}
private static final Comparator<TopNEntity> ASCENDING = Comparator.comparingLong(TopNEntity::getValue);
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxMetadataQueryDAO.java
deleted file mode 100644
index b300f85..0000000
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/InfluxMetadataQueryDAO.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
-
-import com.google.common.base.Strings;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.query.entity.Database;
-import org.apache.skywalking.oap.server.core.query.entity.Endpoint;
-import org.apache.skywalking.oap.server.core.query.entity.Service;
-import org.apache.skywalking.oap.server.core.query.entity.ServiceInstance;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO;
-import org.influxdb.dto.Query;
-import org.influxdb.dto.QueryResult;
-import org.influxdb.querybuilder.SelectQueryImpl;
-import org.influxdb.querybuilder.WhereQueryImpl;
-
-import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
-import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
-import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
-
-public class InfluxMetadataQueryDAO implements IMetadataQueryDAO {
- private InfluxClient client;
- // 'name' is InfluxDB keyword, so escapes it
- private static final String ENDPOINT_NAME = '\"' + EndpointTraffic.NAME + '\"';
-
- public InfluxMetadataQueryDAO(final InfluxClient client) {
- this.client = client;
- }
-
- @Override
- public int numOfService(final long startTimestamp, final long endTimestamp) throws IOException {
- return 0;
- }
-
- @Override
- public int numOfEndpoint() throws IOException {
- final SelectQueryImpl selectQuery = select()
- .count(EndpointTraffic.ENTITY_ID)
- .from(client.getDatabase(), EndpointTraffic.INDEX_NAME);
-
- Query query = new Query(selectQuery.getCommand());
-
- final QueryResult.Series series = client.queryForSingleSeries(query);
- if (series == null) {
- return 0;
- }
-
- return ((Number) series.getValues().get(0).get(1)).intValue();
- }
-
- @Override
- public int numOfConjectural(final int nodeTypeValue) throws IOException {
- return 0;
- }
-
- @Override
- public List<Service> getAllServices(final long startTimestamp, final long endTimestamp) throws IOException {
- return null;
- }
-
- @Override
- public List<Service> getAllBrowserServices(final long startTimestamp, final long endTimestamp) throws IOException {
- return null;
- }
-
- @Override
- public List<Database> getAllDatabases() throws IOException {
- return null;
- }
-
- @Override
- public List<Service> searchServices(final long startTimestamp,
- final long endTimestamp,
- final String keyword) throws IOException {
- return null;
- }
-
- @Override
- public Service searchService(final String serviceCode) throws IOException {
- return null;
- }
-
- @Override
- public List<Endpoint> searchEndpoint(final String keyword,
- final String serviceId,
- final int limit) throws IOException {
- WhereQueryImpl<SelectQueryImpl> endpointQuery = select()
- .column(EndpointTraffic.SERVICE_ID)
- .column(ENDPOINT_NAME)
- .from(client.getDatabase(), EndpointTraffic.INDEX_NAME)
- .where();
- endpointQuery.where(eq(MetricsDAO.TAG_ENDPOINT_OWNER_SERVICE, String.valueOf(serviceId)));
- if (!Strings.isNullOrEmpty(keyword)) {
- endpointQuery.where(contains(MetricsDAO.TAG_ENDPOINT_NAME, keyword.replaceAll("/", "\\\\/")));
- }
- endpointQuery.limit(limit);
-
- Query query = new Query(endpointQuery.getCommand());
-
- final QueryResult.Series series = client.queryForSingleSeries(query);
-
- List<Endpoint> list = new ArrayList<>(limit);
- if (series != null) {
- series.getValues().forEach(values -> {
- EndpointTraffic endpointTraffic = new EndpointTraffic();
- endpointTraffic.setServiceId((String) values.get(1));
- endpointTraffic.setName((String) values.get(2));
-
- Endpoint endpoint = new Endpoint();
- endpoint.setId(IDManager.EndpointID.buildId(endpointTraffic.getServiceId(), endpointTraffic.getName()));
- endpoint.setName(endpointTraffic.getName());
- list.add(endpoint);
- });
- }
- return list;
- }
-
- @Override
- public List<ServiceInstance> getServiceInstances(final long startTimestamp,
- final long endTimestamp,
- final String serviceId) throws IOException {
- return null;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java
index fe69ac0..6b6019d 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/LogQuery.java
@@ -32,8 +32,9 @@ import org.apache.skywalking.oap.server.core.query.entity.LogState;
import org.apache.skywalking.oap.server.core.query.entity.Logs;
import org.apache.skywalking.oap.server.core.query.entity.Pagination;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.elasticsearch.common.Strings;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
@@ -51,6 +52,7 @@ import static org.apache.skywalking.oap.server.core.analysis.manual.log.Abstract
import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.STATUS_CODE;
import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.TIMESTAMP;
import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.TRACE_ID;
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ALL_FIELDS;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte;
@@ -68,11 +70,11 @@ public class LogQuery implements ILogQueryDAO {
public Logs queryLogs(String metricName, int serviceId, int serviceInstanceId, String endpointId, String traceId,
LogState state, String stateCode, Pagination paging, int from, int limit,
long startTB, long endTB) throws IOException {
- WhereQueryImpl<SelectQueryImpl> recallQuery = select().regex("*::field")
+ WhereQueryImpl<SelectQueryImpl> recallQuery = select().raw(ALL_FIELDS)
.from(client.getDatabase(), metricName)
.where();
if (serviceId != Const.NONE) {
- recallQuery.and(eq(RecordDAO.TAG_SERVICE_ID, String.valueOf(serviceId)));
+ recallQuery.and(eq(InfluxConstants.TagName.SERVICE_ID, String.valueOf(serviceId)));
}
if (serviceInstanceId != Const.NONE) {
recallQuery.and(eq(SERVICE_INSTANCE_ID, serviceInstanceId));
@@ -131,8 +133,12 @@ public class LogQuery implements ILogQueryDAO {
Map<String, Object> data = Maps.newHashMap();
Log log = new Log();
- for (int i = 0; i < columns.size(); i++) {
- data.put(columns.get(i), values.get(i));
+ for (int i = 1; i < columns.size(); i++) {
+ Object value = values.get(i);
+ if (value instanceof StorageDataComplexObject) {
+ value = ((StorageDataComplexObject) value).toStorageData();
+ }
+ data.put(columns.get(i), value);
}
log.setContent((String) data.get(CONTENT));
log.setContentType(ContentType.instanceOf((int) data.get(CONTENT_TYPE)));
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java
new file mode 100644
index 0000000..bd04197
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetadataQuery.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.NodeType;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.query.entity.Attribute;
+import org.apache.skywalking.oap.server.core.query.entity.Database;
+import org.apache.skywalking.oap.server.core.query.entity.Endpoint;
+import org.apache.skywalking.oap.server.core.query.entity.Language;
+import org.apache.skywalking.oap.server.core.query.entity.LanguageTrans;
+import org.apache.skywalking.oap.server.core.query.entity.Service;
+import org.apache.skywalking.oap.server.core.query.entity.ServiceInstance;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import org.influxdb.querybuilder.SelectQueryImpl;
+import org.influxdb.querybuilder.SelectSubQueryImpl;
+import org.influxdb.querybuilder.WhereQueryImpl;
+import org.influxdb.querybuilder.WhereSubQueryImpl;
+
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ID_COLUMN;
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.NAME;
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.TagName;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
+
+@Slf4j
+public class MetadataQuery implements IMetadataQueryDAO {
+ private static final Gson GSON = new Gson();
+ private final InfluxClient client;
+
+ public MetadataQuery(final InfluxClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public int numOfService(final long startTimestamp, final long endTimestamp) throws IOException {
+ WhereQueryImpl query = select().raw("count(distinct " + ID_COLUMN + ")")
+ .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
+ .where()
+ .and(
+ eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())
+ ));
+ return client.getCounter(query);
+ }
+
+ @Override
+ public int numOfEndpoint() throws IOException {
+ SelectQueryImpl query = select()
+ .raw("count(distinct " + ID_COLUMN + ")")
+ .from(client.getDatabase(), EndpointTraffic.INDEX_NAME);
+ return client.getCounter(query);
+ }
+
+ @Override
+ public int numOfConjectural(final int nodeTypeValue) throws IOException {
+ WhereQueryImpl<SelectQueryImpl> query = select().raw("count(distinct " + ID_COLUMN + ")")
+ .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
+ .where(eq(
+ InfluxConstants.TagName.NODE_TYPE,
+ String.valueOf(nodeTypeValue)
+ ));
+ return client.getCounter(query);
+ }
+
+ @Override
+ public List<Service> getAllServices(final long startTimestamp, final long endTimestamp) throws IOException {
+ SelectSubQueryImpl<SelectQueryImpl> subQuery = select()
+ .fromSubQuery(client.getDatabase())
+ .column(ID_COLUMN).column(NAME)
+ .from(ServiceTraffic.INDEX_NAME)
+ .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())))
+ .groupBy(TagName.NAME, TagName.NODE_TYPE);
+ SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase());
+ query.setSubQuery(subQuery);
+ return buildServices(query);
+ }
+
+ @Override
+ public List<Service> getAllBrowserServices(long startTimestamp, long endTimestamp) throws IOException {
+ WhereQueryImpl<SelectQueryImpl> query = select(ID_COLUMN, NAME)
+ .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
+ .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
+ return buildServices(query);
+ }
+
+ @Override
+ public List<Database> getAllDatabases() throws IOException {
+ SelectSubQueryImpl<SelectQueryImpl> subQuery = select()
+ .fromSubQuery(client.getDatabase())
+ .column(ID_COLUMN).column(NAME)
+ .from(ServiceTraffic.INDEX_NAME)
+ .where(eq(InfluxConstants.TagName.NODE_TYPE, NodeType.Database.value()))
+ .groupBy(TagName.NAME, TagName.NODE_TYPE);
+ SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase());
+ query.setSubQuery(subQuery);
+ QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result: {}", query.getCommand(), series);
+ }
+
+ List<Database> databases = Lists.newArrayList();
+ if (Objects.nonNull(series)) {
+ for (List<Object> values : series.getValues()) {
+ Database database = new Database();
+ database.setId((String) values.get(1));
+ database.setName((String) values.get(2));
+ databases.add(database);
+ }
+ }
+ return databases;
+ }
+
+ @Override
+ public List<Service> searchServices(long startTimestamp, long endTimestamp, String keyword) throws IOException {
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
+ .fromSubQuery(client.getDatabase())
+ .column(ID_COLUMN)
+ .column(NAME)
+ .from(ServiceTraffic.INDEX_NAME)
+ .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
+ if (!Strings.isNullOrEmpty(keyword)) {
+ subQuery.and(contains(ServiceTraffic.NAME, keyword));
+ }
+ subQuery.groupBy(TagName.NAME, TagName.NODE_TYPE);
+
+ SelectQueryImpl query = select(ID_COLUMN, NAME).from(client.getDatabase());
+ query.setSubQuery(subQuery);
+ return buildServices(query);
+ }
+
+ @Override
+ public Service searchService(String serviceCode) throws IOException {
+ WhereQueryImpl<SelectQueryImpl> query = select(ID_COLUMN, NAME)
+ .from(client.getDatabase(), ServiceTraffic.INDEX_NAME)
+ .where(eq(InfluxConstants.TagName.NODE_TYPE, String.valueOf(NodeType.Normal.value())));
+ query.and(eq(ServiceTraffic.NODE_TYPE, serviceCode));
+ return buildServices(query).get(0);
+ }
+
+ @Override
+ public List<Endpoint> searchEndpoint(final String keyword,
+ final String serviceId,
+ final int limit) throws IOException {
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
+ .fromSubQuery(client.getDatabase())
+ .column(ID_COLUMN)
+ .column(NAME)
+ .from(EndpointTraffic.INDEX_NAME)
+ .where(eq(InfluxConstants.TagName.SERVICE_ID, String.valueOf(serviceId)));
+ if (!Strings.isNullOrEmpty(keyword)) {
+ subQuery.where(contains(EndpointTraffic.NAME, keyword.replaceAll("/", "\\\\/")));
+ }
+ subQuery.groupBy(TagName.NAME, TagName.SERVICE_ID);
+ SelectQueryImpl query = select(ID_COLUMN, NAME)
+ .from(client.getDatabase());
+ query.setSubQuery(subQuery);
+ query.limit(limit);
+
+ final QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result: {}", query.getCommand(), series);
+ }
+
+ List<Endpoint> list = new ArrayList<>(limit);
+ if (series != null) {
+ series.getValues().forEach(values -> {
+ Endpoint endpoint = new Endpoint();
+ endpoint.setId((String) values.get(1));
+ endpoint.setName((String) values.get(2));
+ list.add(endpoint);
+ });
+ }
+ return list;
+ }
+
+ @Override
+ public List<ServiceInstance> getServiceInstances(final long startTimestamp,
+ final long endTimestamp,
+ final String serviceId) throws IOException {
+ final long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
+
+ SelectSubQueryImpl<SelectQueryImpl> subQuery = select()
+ .fromSubQuery(client.getDatabase())
+ .column(ID_COLUMN).column(NAME).column(InstanceTraffic.PROPERTIES)
+ .from(InstanceTraffic.INDEX_NAME)
+ .where()
+ .and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket))
+ .and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId))
+ .groupBy(TagName.NAME, TagName.SERVICE_ID);
+
+ SelectQueryImpl query = select().column(ID_COLUMN)
+ .column(NAME)
+ .column(InstanceTraffic.PROPERTIES)
+ .from(client.getDatabase(), InstanceTraffic.INDEX_NAME);
+ query.setSubQuery(subQuery);
+
+ QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result: {}", query.getCommand(), series);
+ }
+
+ if (Objects.isNull(series)) {
+ return Collections.EMPTY_LIST;
+ }
+
+ List<List<Object>> result = series.getValues();
+ List<ServiceInstance> instances = Lists.newArrayList();
+ for (List<Object> values : result) {
+ ServiceInstance serviceInstance = new ServiceInstance();
+
+ serviceInstance.setId((String) values.get(1));
+ serviceInstance.setName((String) values.get(2));
+ serviceInstance.setInstanceUUID(serviceInstance.getId());
+
+ String propertiesString = (String) values.get(3);
+ if (!Strings.isNullOrEmpty(propertiesString)) {
+ JsonObject properties = GSON.fromJson(propertiesString, JsonObject.class);
+ for (Map.Entry<String, JsonElement> property : properties.entrySet()) {
+ String key = property.getKey();
+ String value = property.getValue().getAsString();
+ if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
+ serviceInstance.setLanguage(LanguageTrans.INSTANCE.value(value));
+ } else {
+ serviceInstance.getAttributes().add(new Attribute(key, value));
+ }
+
+ }
+ } else {
+ serviceInstance.setLanguage(Language.UNKNOWN);
+ }
+ instances.add(serviceInstance);
+ }
+ return instances;
+ }
+
+ private List<Service> buildServices(Query query) throws IOException {
+ QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result: {}", query.getCommand(), series);
+ }
+
+ ArrayList<Service> services = Lists.newArrayList();
+ if (Objects.nonNull(series)) {
+ for (List<Object> values : series.getValues()) {
+ Service service = new Service();
+ service.setId((String) values.get(1));
+ service.setName((String) values.get(2));
+ services.add(service);
+ }
+ }
+ return services;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
index 09b2107..a28f1c6 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/MetricsQuery.java
@@ -38,14 +38,16 @@ import org.apache.skywalking.oap.server.core.query.sql.KeyValues;
import org.apache.skywalking.oap.server.core.query.sql.Where;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.MetricsDAO;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.SelectionQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
+import static org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants.ID_COLUMN;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.contains;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
@@ -74,7 +76,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
WhereQueryImpl<SelectQueryImpl> queryWhereQuery = query.from(client.getDatabase(), measurement).where();
Map<String, Class<?>> columnTypes = Maps.newHashMap();
- for (ModelColumn column : TableMetaInfo.get(measurement).getColumns()) {
+ for (ModelColumn column : TableMetaInfo.get(measurement).getModel().getColumns()) {
columnTypes.put(column.getColumnName().getStorageName(), column.getType());
}
@@ -84,6 +86,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
StringBuilder clauseBuilder = new StringBuilder();
for (KeyValues kv : whereKeyValues) {
final List<String> values = kv.getValues();
+ ids.addAll(values);
Class<?> type = columnTypes.get(kv.getKey());
if (values.size() == 1) {
@@ -93,16 +96,15 @@ public class MetricsQuery implements IMetricsQueryDAO {
}
clauseBuilder.append(kv.getKey()).append("=").append(value).append(" OR ");
} else {
- ids.addAll(values);
if (type == String.class) {
clauseBuilder.append(kv.getKey())
.append(" =~ /")
.append(Joiner.on("|").join(values))
.append("/ OR ");
- continue;
- }
- for (String value : values) {
- clauseBuilder.append(kv.getKey()).append(" = '").append(value).append("' OR ");
+ } else {
+ for (String value : values) {
+ clauseBuilder.append(kv.getKey()).append(" = '").append(value).append("' OR ");
+ }
}
}
}
@@ -111,17 +113,17 @@ public class MetricsQuery implements IMetricsQueryDAO {
queryWhereQuery
.and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB, downsampling)))
.and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB, downsampling)))
- .groupBy(MetricsDAO.TAG_ENTITY_ID);
+ .groupBy(InfluxConstants.TagName.ENTITY_ID);
IntValues intValues = new IntValues();
List<QueryResult.Series> seriesList = client.queryForSeries(queryWhereQuery);
if (log.isDebugEnabled()) {
log.debug("SQL: {} result set: {}", queryWhereQuery.getCommand(), seriesList);
}
- if (!(seriesList == null || seriesList.isEmpty())) {
+ if (CollectionUtils.isNotEmpty(seriesList)) {
for (QueryResult.Series series : seriesList) {
KVInt kv = new KVInt();
- kv.setId(series.getTags().get(MetricsDAO.TAG_ENTITY_ID));
+ kv.setId(series.getTags().get(InfluxConstants.TagName.ENTITY_ID));
Number value = (Number) series.getValues().get(0).get(1);
kv.setValue(value.longValue());
@@ -139,16 +141,16 @@ public class MetricsQuery implements IMetricsQueryDAO {
String valueCName)
throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select()
- .column("id")
+ .column(ID_COLUMN)
.column(valueCName)
.from(client.getDatabase(), measurement)
.where();
- if (ids != null && !ids.isEmpty()) {
+ if (CollectionUtils.isNotEmpty(ids)) {
if (ids.size() == 1) {
- query.where(eq("id", ids.get(0)));
+ query.where(eq(ID_COLUMN, ids.get(0)));
} else {
- query.where(contains("id", Joiner.on("|").join(ids)));
+ query.where(contains(ID_COLUMN, Joiner.on("|").join(ids)));
}
}
List<QueryResult.Series> seriesList = client.queryForSeries(query);
@@ -157,7 +159,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
}
IntValues intValues = new IntValues();
- if (!(seriesList == null || seriesList.isEmpty())) {
+ if (CollectionUtils.isNotEmpty(seriesList)) {
seriesList.get(0).getValues().forEach(values -> {
KVInt kv = new KVInt();
kv.setValue(((Number) values.get(2)).longValue());
@@ -197,7 +199,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
.from(client.getDatabase(), measurement)
.where();
- if (ids != null && !ids.isEmpty()) {
+ if (CollectionUtils.isNotEmpty(ids)) {
if (ids.size() == 1) {
query.where(eq("id", ids.get(0)));
} else {
@@ -212,7 +214,7 @@ public class MetricsQuery implements IMetricsQueryDAO {
for (int i = 0; i < intValues.length; i++) {
intValues[i] = new IntValues();
}
- if (series == null || series.isEmpty()) {
+ if (CollectionUtils.isEmpty(series)) {
return intValues;
}
series.get(0).getValues().forEach(values -> {
@@ -253,9 +255,9 @@ public class MetricsQuery implements IMetricsQueryDAO {
.column(ThermodynamicMetrics.STEP)
.column(ThermodynamicMetrics.NUM_OF_STEPS)
.column(ThermodynamicMetrics.DETAIL_GROUP)
- .column("id")
+ .column(ID_COLUMN)
.from(client.getDatabase(), measurement)
- .where(contains("id", Joiner.on("|").join(ids)));
+ .where(contains(ID_COLUMN, Joiner.on("|").join(ids)));
Map<String, List<Long>> thermodynamicValueMatrix = new HashMap<>();
QueryResult.Series series = client.queryForSingleSeries(query);
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/NetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/NetworkAddressAliasDAO.java
new file mode 100644
index 0000000..ef60af8
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/NetworkAddressAliasDAO.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.TableMetaInfo;
+import org.influxdb.dto.QueryResult;
+import org.influxdb.querybuilder.SelectQueryImpl;
+import org.influxdb.querybuilder.WhereQueryImpl;
+
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
+
+@Slf4j
+public class NetworkAddressAliasDAO implements INetworkAddressAliasDAO {
+ private final NetworkAddressAlias.Builder builder = new NetworkAddressAlias.Builder();
+ private InfluxClient client;
+
+ public NetworkAddressAliasDAO(final InfluxClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public List<NetworkAddressAlias> loadLastUpdate(final long timeBucket) {
+ List<NetworkAddressAlias> networkAddressAliases = new ArrayList<>();
+
+ WhereQueryImpl<SelectQueryImpl> query = select().raw(InfluxConstants.ALL_FIELDS)
+ .from(client.getDatabase(), NetworkAddressAlias.INDEX_NAME)
+ .where(gte(
+ NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET,
+ timeBucket
+ ));
+ try {
+ QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result: {}", query.getCommand(), series);
+ }
+ if (Objects.isNull(series)) {
+ return networkAddressAliases;
+ }
+
+ List<List<Object>> result = series.getValues();
+ List<String> columns = series.getColumns();
+
+ Map<String, String> columnAndFieldMap = TableMetaInfo.get(NetworkAddressAlias.INDEX_NAME)
+ .getStorageAndColumnMap();
+ for (List<Object> values : result) {
+ Map<String, Object> map = Maps.newHashMap();
+ for (int i = 1; i < columns.size(); i++) {
+ map.put(columnAndFieldMap.get(columns.get(i)), values.get(i));
+ }
+ networkAddressAliases.add(builder.map2Data(map));
+ }
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ }
+
+ return networkAddressAliases;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java
index cfac9db..59982d8 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskLogQuery.java
@@ -19,17 +19,16 @@
package org.apache.skywalking.oap.server.storage.plugin.influxdb.query;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLog;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
@@ -49,8 +48,8 @@ public class ProfileTaskLogQuery implements IProfileTaskLogQueryDAO {
@Override
public List<ProfileTaskLog> getTaskLogList() throws IOException {
WhereQueryImpl<SelectQueryImpl> query = select()
- .function("top", ProfileTaskLogRecord.OPERATION_TIME, fetchTaskLogMaxSize)
- .column("id")
+ .function(InfluxConstants.SORT_DES, ProfileTaskLogRecord.OPERATION_TIME, fetchTaskLogMaxSize)
+ .column(InfluxConstants.ID_COLUMN)
.column(ProfileTaskLogRecord.TASK_ID)
.column(ProfileTaskLogRecord.INSTANCE_ID)
.column(ProfileTaskLogRecord.OPERATION_TIME)
@@ -65,26 +64,18 @@ public class ProfileTaskLogQuery implements IProfileTaskLogQueryDAO {
if (series == null) {
return Collections.emptyList();
}
- List<String> columns = series.getColumns();
- Map<String, Integer> columnsMap = Maps.newHashMap();
- for (int i = 0; i < columns.size(); i++) {
- columnsMap.put(columns.get(i), i);
- }
-
- List<ProfileTaskLog> taskLogs = Lists.newArrayList();
+ final List<ProfileTaskLog> taskLogs = Lists.newArrayList();
series.getValues().stream()
// re-sort by self, because of the result order by time.
.sorted((a, b) -> Long.compare(((Number) b.get(1)).longValue(), ((Number) a.get(1)).longValue()))
.forEach(values -> {
taskLogs.add(ProfileTaskLog.builder()
- .id((String) values.get(columnsMap.get("id")))
- .taskId((String) values.get(columnsMap.get(ProfileTaskLogRecord.TASK_ID)))
- .instanceId(
- (String) values.get(columnsMap.get(ProfileTaskLogRecord.INSTANCE_ID)))
- .operationTime(
- (Long) values.get(columnsMap.get(ProfileTaskLogRecord.OPERATION_TIME)))
+ .id((String) values.get(2))
+ .taskId((String) values.get(3))
+ .instanceId((String) values.get(4))
+ .operationTime(((Number) values.get(5)).longValue())
.operationType(ProfileTaskLogOperationType.parse(
- (int) values.get(columnsMap.get(ProfileTaskLogRecord.OPERATION_TYPE))))
+ ((Number) values.get(6)).intValue()))
.build());
});
return taskLogs;
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java
index db2b2f2..43099ef 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileTaskQuery.java
@@ -22,13 +22,13 @@ import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxModelConstants;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.NoneStreamDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.SelectQueryImpl;
import org.influxdb.querybuilder.WhereQueryImpl;
@@ -38,8 +38,9 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
+@Slf4j
public class ProfileTaskQuery implements IProfileTaskQueryDAO {
- private InfluxClient client;
+ private final InfluxClient client;
public ProfileTaskQuery(InfluxClient client) {
this.client = client;
@@ -52,19 +53,22 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
final Long endTimeBucket,
final Integer limit) throws IOException {
WhereQueryImpl<SelectQueryImpl> query =
- select("id", ProfileTaskRecord.SERVICE_ID,
- ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME,
- ProfileTaskRecord.CREATE_TIME,
- InfluxModelConstants.DURATION,
- ProfileTaskRecord.MIN_DURATION_THRESHOLD,
- ProfileTaskRecord.DUMP_PERIOD,
- ProfileTaskRecord.MAX_SAMPLING_COUNT
+ select(
+ InfluxConstants.ID_COLUMN,
+ ProfileTaskRecord.SERVICE_ID,
+ ProfileTaskRecord.ENDPOINT_NAME,
+ ProfileTaskRecord.START_TIME,
+ ProfileTaskRecord.CREATE_TIME,
+ InfluxConstants.DURATION,
+ ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+ ProfileTaskRecord.DUMP_PERIOD,
+ ProfileTaskRecord.MAX_SAMPLING_COUNT
)
.from(client.getDatabase(), ProfileTaskRecord.INDEX_NAME)
.where();
if (StringUtil.isNotEmpty(serviceId)) {
- query.and(eq(NoneStreamDAO.TAG_SERVICE_ID, serviceId));
+ query.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(endpointName)) {
query.and(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName));
@@ -81,6 +85,9 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
List<ProfileTask> tasks = Lists.newArrayList();
QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result: {}", query.getCommand(), series);
+ }
if (series != null) {
series.getValues().forEach(values -> {
tasks.add(profileTaskBuilder(values));
@@ -94,20 +101,26 @@ public class ProfileTaskQuery implements IProfileTaskQueryDAO {
if (StringUtil.isEmpty(id)) {
return null;
}
- SelectQueryImpl query = select("id", ProfileTaskRecord.SERVICE_ID,
- ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME,
- ProfileTaskRecord.CREATE_TIME,
- InfluxModelConstants.DURATION,
- ProfileTaskRecord.MIN_DURATION_THRESHOLD,
- ProfileTaskRecord.DUMP_PERIOD,
- ProfileTaskRecord.MAX_SAMPLING_COUNT
+ SelectQueryImpl query = select(
+ InfluxConstants.ID_COLUMN,
+ ProfileTaskRecord.SERVICE_ID,
+ ProfileTaskRecord.ENDPOINT_NAME,
+ ProfileTaskRecord.START_TIME,
+ ProfileTaskRecord.CREATE_TIME,
+ InfluxConstants.DURATION,
+ ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+ ProfileTaskRecord.DUMP_PERIOD,
+ ProfileTaskRecord.MAX_SAMPLING_COUNT
)
.from(client.getDatabase(), ProfileTaskRecord.INDEX_NAME)
.where()
- .and(eq("id", id))
+ .and(eq(InfluxConstants.ID_COLUMN, id))
.limit(1);
QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result: {}", query.getCommand(), series);
+ }
if (Objects.nonNull(series)) {
return profileTaskBuilder(series.getValues().get(0));
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java
index 36a9937..25dfc0e 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/ProfileThreadSnapshotQuery.java
@@ -26,6 +26,8 @@ import java.util.Base64;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
@@ -33,6 +35,7 @@ import org.apache.skywalking.oap.server.core.query.entity.BasicTrace;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.elasticsearch.common.Strings;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.WhereQueryImpl;
@@ -43,6 +46,7 @@ import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.lte;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
+@Slf4j
public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDAO {
private final InfluxClient client;
@@ -60,7 +64,7 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
final LinkedList<String> segments = new LinkedList<>();
QueryResult.Series series = client.queryForSingleSeries(query);
- if (series == null) {
+ if (Objects.isNull(series)) {
return Collections.emptyList();
}
series.getValues().forEach(values -> {
@@ -72,7 +76,7 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
}
query = select()
- .function("bottom", SegmentRecord.START_TIME, segments.size())
+ .function(InfluxConstants.SORT_ASC, SegmentRecord.START_TIME, segments.size())
.column(SegmentRecord.SEGMENT_ID)
.column(SegmentRecord.START_TIME)
.column(SegmentRecord.ENDPOINT_NAME)
@@ -130,8 +134,15 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
.and(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence))
.and(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence));
+ QueryResult.Series series = client.queryForSingleSeries(query);
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result: {}", query.getCommand(), series);
+ }
+ if (Objects.isNull(series)) {
+ return Collections.EMPTY_LIST;
+ }
ArrayList<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence);
- client.queryForSingleSeries(query).getValues().forEach(values -> {
+ series.getValues().forEach(values -> {
ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord();
record.setTaskId((String) values.get(1));
@@ -165,7 +176,10 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
.where()
.and(eq(SegmentRecord.SEGMENT_ID, segmentId));
List<QueryResult.Series> series = client.queryForSeries(query);
- if (series == null || series.isEmpty()) {
+ if (log.isDebugEnabled()) {
+ log.debug("SQL: {} result set: {}", query.getCommand(), series);
+ }
+ if (Objects.isNull(series) || series.isEmpty()) {
return null;
}
@@ -198,10 +212,6 @@ public class ProfileThreadSnapshotQuery implements IProfileThreadSnapshotQueryDA
.and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
.and(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start))
.and(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end));
- QueryResult.Series series = client.queryForSingleSeries(query);
- if (series == null) {
- return -1;
- }
- return ((Number) series.getValues().get(0).get(1)).intValue();
+ return client.getCounter(query);
}
}
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java
index c247dd6..1dda92c 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopNRecordsQuery.java
@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.core.query.entity.Order;
import org.apache.skywalking.oap.server.core.query.entity.TopNRecord;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.influxdb.dto.QueryResult;
import org.influxdb.querybuilder.WhereQueryImpl;
@@ -50,11 +50,11 @@ public class TopNRecordsQuery implements ITopNRecordsQueryDAO {
@Override
public List<TopNRecord> getTopNRecords(long startSecondTB, long endSecondTB, String metricName,
String serviceId, int topN, Order order) throws IOException {
- String function = "bottom";
+ String function = InfluxConstants.SORT_ASC;
// Have to re-sort here. Because the function, top()/bottom(), get the result ordered by the `time`.
Comparator<TopNRecord> comparator = Comparator.comparingLong(TopNRecord::getLatency);
if (order.equals(Order.DES)) {
- function = "top";
+ function = InfluxConstants.SORT_DES;
comparator = (a, b) -> Long.compare(b.getLatency(), a.getLatency());
}
@@ -68,7 +68,7 @@ public class TopNRecordsQuery implements ITopNRecordsQueryDAO {
.and(lte(TopN.TIME_BUCKET, endSecondTB));
if (StringUtil.isNotEmpty(serviceId)) {
- query.and(eq(RecordDAO.TAG_SERVICE_ID, serviceId));
+ query.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId));
}
QueryResult.Series series = client.queryForSingleSeries(query);
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java
index 804a785..67b6f4a 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TopologyQuery.java
@@ -29,14 +29,17 @@ import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.S
import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.entity.Call;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
+import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
+import org.influxdb.querybuilder.SelectQueryImpl;
+import org.influxdb.querybuilder.SelectSubQueryImpl;
import org.influxdb.querybuilder.WhereNested;
-import org.influxdb.querybuilder.WhereQueryImpl;
+import org.influxdb.querybuilder.WhereSubQueryImpl;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.eq;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.gte;
@@ -56,7 +59,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
long endTB,
List<String> serviceIds) throws IOException {
String measurement = ServiceRelationServerSideMetrics.INDEX_NAME;
- WhereQueryImpl query = buildServiceCallsQuery(
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
measurement,
startTB,
endTB,
@@ -64,7 +67,8 @@ public class TopologyQuery implements ITopologyQueryDAO {
ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
serviceIds
);
- return buildServiceCalls(query, DetectPoint.SERVER);
+
+ return buildServiceCalls(buildQuery(subQuery), DetectPoint.SERVER);
}
@Override
@@ -72,7 +76,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
long endTB,
List<String> serviceIds) throws IOException {
String measurement = ServiceRelationClientSideMetrics.INDEX_NAME;
- WhereQueryImpl query = buildServiceCallsQuery(
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
measurement,
startTB,
endTB,
@@ -80,14 +84,14 @@ public class TopologyQuery implements ITopologyQueryDAO {
ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
serviceIds
);
- return buildServiceCalls(query, DetectPoint.CLIENT);
+ return buildServiceCalls(buildQuery(subQuery), DetectPoint.CLIENT);
}
@Override
public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(DownSampling downsampling, long startTB,
long endTB) throws IOException {
String measurement = ServiceRelationServerSideMetrics.INDEX_NAME;
- WhereQueryImpl query = buildServiceCallsQuery(
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
measurement,
startTB,
endTB,
@@ -95,14 +99,14 @@ public class TopologyQuery implements ITopologyQueryDAO {
ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
new ArrayList<>(0)
);
- return buildServiceCalls(query, DetectPoint.SERVER);
+ return buildServiceCalls(buildQuery(subQuery), DetectPoint.SERVER);
}
@Override
public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(DownSampling downsampling, long startTB,
long endTB) throws IOException {
String tableName = ServiceRelationClientSideMetrics.INDEX_NAME;
- WhereQueryImpl query = buildServiceCallsQuery(
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
tableName,
startTB,
endTB,
@@ -110,7 +114,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
ServiceRelationServerSideMetrics.DEST_SERVICE_ID,
new ArrayList<>(0)
);
- return buildServiceCalls(query, DetectPoint.CLIENT);
+ return buildServiceCalls(buildQuery(subQuery), DetectPoint.CLIENT);
}
@Override
@@ -120,14 +124,15 @@ public class TopologyQuery implements ITopologyQueryDAO {
long startTB,
long endTB) throws IOException {
String measurement = ServiceInstanceRelationServerSideMetrics.INDEX_NAME;
- WhereQueryImpl query = buildServiceInstanceCallsQuery(measurement,
- startTB,
- endTB,
- ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
- ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
- clientServiceId, serverServiceId
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery(
+ measurement,
+ startTB,
+ endTB,
+ ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
+ ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
+ clientServiceId, serverServiceId
);
- return buildInstanceCalls(query, DetectPoint.SERVER);
+ return buildInstanceCalls(buildQuery(subQuery), DetectPoint.SERVER);
}
@Override
@@ -137,14 +142,15 @@ public class TopologyQuery implements ITopologyQueryDAO {
long startTB,
long endTB) throws IOException {
String measurement = ServiceInstanceRelationClientSideMetrics.INDEX_NAME;
- WhereQueryImpl query = buildServiceInstanceCallsQuery(measurement,
- startTB,
- endTB,
- ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
- ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID,
- clientServiceId, serverServiceId
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceInstanceCallsQuery(
+ measurement,
+ startTB,
+ endTB,
+ ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
+ ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID,
+ clientServiceId, serverServiceId
);
- return buildInstanceCalls(query, DetectPoint.CLIENT);
+ return buildInstanceCalls(buildQuery(subQuery), DetectPoint.CLIENT);
}
@Override
@@ -154,7 +160,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
String destEndpointId) throws IOException {
String measurement = EndpointRelationServerSideMetrics.INDEX_NAME;
- WhereQueryImpl query = buildServiceCallsQuery(
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = buildServiceCallsQuery(
measurement,
startTB,
endTB,
@@ -162,9 +168,9 @@ public class TopologyQuery implements ITopologyQueryDAO {
EndpointRelationServerSideMetrics.DEST_ENDPOINT,
Collections.emptyList()
);
- query.and(eq(EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId));
+ subQuery.and(eq(EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId));
- WhereQueryImpl query2 = buildServiceCallsQuery(
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery2 = buildServiceCallsQuery(
measurement,
startTB,
endTB,
@@ -172,61 +178,73 @@ public class TopologyQuery implements ITopologyQueryDAO {
EndpointRelationServerSideMetrics.DEST_ENDPOINT,
Collections.emptyList()
);
- query2.and(eq(EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, destEndpointId));
+ subQuery2.and(eq(EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, destEndpointId));
- List<Call.CallDetail> calls = buildEndpointCalls(query, DetectPoint.SERVER);
- calls.addAll(buildEndpointCalls(query2, DetectPoint.CLIENT));
+ List<Call.CallDetail> calls = buildEndpointCalls(buildQuery(subQuery), DetectPoint.SERVER);
+ calls.addAll(buildEndpointCalls(buildQuery(subQuery), DetectPoint.CLIENT));
return calls;
}
- private WhereQueryImpl buildServiceCallsQuery(String measurement, long startTB, long endTB, String sourceCName,
- String destCName, List<String> serviceIds) {
- WhereQueryImpl query = select()
- .function("distinct", Metrics.ENTITY_ID, ServiceRelationServerSideMetrics.COMPONENT_ID)
- .from(client.getDatabase(), measurement)
+ private WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> buildServiceCallsQuery(
+ String measurement,
+ long startTB,
+ long endTB,
+ String sourceCName,
+ String destCName,
+ List<String> serviceIds) {
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
+ .fromSubQuery(client.getDatabase())
+ .function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID)
+ .as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID)
+ .from(measurement)
.where()
.and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
.and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)));
if (!serviceIds.isEmpty()) {
- WhereNested whereNested = query.andNested();
+ WhereNested whereNested = subQuery.andNested();
for (String id : serviceIds) {
whereNested.or(eq(sourceCName, id))
.or(eq(destCName, id));
}
whereNested.close();
}
- return query;
+ return subQuery;
}
- private WhereQueryImpl buildServiceInstanceCallsQuery(String measurement,
- long startTB,
- long endTB,
- String sourceCName,
- String destCName,
- String sourceServiceId,
- String destServiceId) {
- WhereQueryImpl query = select()
- .function("distinct", Metrics.ENTITY_ID, ServiceInstanceRelationServerSideMetrics.COMPONENT_ID)
- .from(client.getDatabase(), measurement)
+ private WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> buildServiceInstanceCallsQuery(
+ String measurement,
+ long startTB,
+ long endTB,
+ String sourceCName,
+ String destCName,
+ String sourceServiceId,
+ String destServiceId) {
+
+ WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery = select()
+ .fromSubQuery(client.getDatabase())
+ .function("distinct", ServiceInstanceRelationServerSideMetrics.COMPONENT_ID)
+ .as(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID)
+ .from(measurement)
.where()
.and(gte(InfluxClient.TIME, InfluxClient.timeInterval(startTB)))
.and(lte(InfluxClient.TIME, InfluxClient.timeInterval(endTB)));
StringBuilder builder = new StringBuilder("((");
- builder.append(sourceCName).append("=").append(sourceServiceId)
- .append(" and ")
- .append(destCName).append("=").append(destServiceId)
- .append(") or (")
- .append(sourceCName).append("=").append(destServiceId)
- .append(") and (")
- .append(destCName).append("=").append(sourceServiceId)
- .append("))");
- query.where(builder.toString());
- return query;
+ builder.append(sourceCName).append("='").append(sourceServiceId)
+ .append("' and ")
+ .append(destCName).append("='").append(destServiceId)
+ .append("') or (")
+ .append(sourceCName).append("='").append(destServiceId)
+ .append("') and (")
+ .append(destCName).append("='").append(sourceServiceId)
+ .append("'))");
+ subQuery.where(builder.toString());
+ subQuery.groupBy(InfluxConstants.TagName.ENTITY_ID);
+ return subQuery;
}
- private List<Call.CallDetail> buildServiceCalls(WhereQueryImpl query,
+ private List<Call.CallDetail> buildServiceCalls(Query query,
DetectPoint detectPoint) throws IOException {
QueryResult.Series series = client.queryForSingleSeries(query);
@@ -240,7 +258,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
List<Call.CallDetail> calls = new ArrayList<>();
series.getValues().forEach(values -> {
Call.CallDetail call = new Call.CallDetail();
- String entityId = (String) values.get(1);
+ String entityId = String.valueOf(values.get(1));
int componentId = (int) values.get(2);
call.buildFromServiceRelation(entityId, componentId, detectPoint);
calls.add(call);
@@ -248,7 +266,15 @@ public class TopologyQuery implements ITopologyQueryDAO {
return calls;
}
- private List<Call.CallDetail> buildInstanceCalls(WhereQueryImpl query,
+ private Query buildQuery(WhereSubQueryImpl<SelectSubQueryImpl<SelectQueryImpl>, SelectQueryImpl> subQuery) {
+ SelectQueryImpl query = select().column(InfluxConstants.TagName.ENTITY_ID)
+ .column(ServiceInstanceRelationClientSideMetrics.COMPONENT_ID)
+ .from(client.getDatabase());
+ query.setSubQuery(subQuery.groupBy(InfluxConstants.TagName.ENTITY_ID));
+ return query;
+ }
+
+ private List<Call.CallDetail> buildInstanceCalls(Query query,
DetectPoint detectPoint) throws IOException {
QueryResult.Series series = client.queryForSingleSeries(query);
@@ -270,7 +296,7 @@ public class TopologyQuery implements ITopologyQueryDAO {
return calls;
}
- private List<Call.CallDetail> buildEndpointCalls(WhereQueryImpl query,
+ private List<Call.CallDetail> buildEndpointCalls(Query query,
DetectPoint detectPoint) throws IOException {
QueryResult.Series series = client.queryForSingleSeries(query);
diff --git a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
index eac9706..9e577fa 100644
--- a/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
+++ b/oap-server/server-storage-plugin/storage-influxdb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/influxdb/query/TraceQuery.java
@@ -34,7 +34,7 @@ import org.apache.skywalking.oap.server.core.query.entity.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxClient;
-import org.apache.skywalking.oap.server.storage.plugin.influxdb.base.RecordDAO;
+import org.apache.skywalking.oap.server.storage.plugin.influxdb.InfluxConstants;
import org.elasticsearch.common.Strings;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
@@ -78,7 +78,7 @@ public class TraceQuery implements ITraceQueryDAO {
}
WhereQueryImpl<SelectQueryImpl> recallQuery = select()
- .function("top", orderBy, limit + from)
+ .function(InfluxConstants.SORT_DES, orderBy, limit + from)
.column(SegmentRecord.SEGMENT_ID)
.column(SegmentRecord.START_TIME)
.column(SegmentRecord.ENDPOINT_NAME)
@@ -102,7 +102,7 @@ public class TraceQuery implements ITraceQueryDAO {
recallQuery.and(contains(SegmentRecord.ENDPOINT_NAME, endpointName.replaceAll("/", "\\\\/")));
}
if (StringUtil.isNotEmpty(serviceId)) {
- recallQuery.and(eq(RecordDAO.TAG_SERVICE_ID, String.valueOf(serviceId)));
+ recallQuery.and(eq(InfluxConstants.TagName.SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(serviceInstanceId)) {
recallQuery.and(eq(SegmentRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
@@ -201,9 +201,9 @@ public class TraceQuery implements ITraceQueryDAO {
segmentRecord.setEndTime((long) values.get(7));
segmentRecord.setLatency((int) values.get(8));
segmentRecord.setIsError((int) values.get(9));
- segmentRecord.setVersion((int) values.get(10));
+ segmentRecord.setVersion((int) values.get(11));
- String base64 = (String) values.get(9);
+ String base64 = (String) values.get(10);
if (!Strings.isNullOrEmpty(base64)) {
segmentRecord.setDataBinary(Base64.getDecoder().decode(base64));
}
diff --git a/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml b/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml
index 4f0dc47..377ea79 100644
--- a/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml
+++ b/test/e2e/e2e-test/docker/profile/docker-compose.influxdb.yml
@@ -22,8 +22,6 @@ services:
- 8086
networks:
- e2e
- depends_on:
- - h2db
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/8086"]
interval: 5s
@@ -37,8 +35,6 @@ services:
environment:
SW_STORAGE: influxdb
depends_on:
- h2db:
- condition: service_healthy
influxdb:
condition: service_healthy
diff --git a/test/plugin/run.sh b/test/plugin/run.sh
index 56b6c62..d81ecff 100755
--- a/test/plugin/run.sh
+++ b/test/plugin/run.sh
@@ -187,7 +187,6 @@ fi
supported_versions=`grep -v -E "^$|^#" ${supported_version_file}`
for version in ${supported_versions}
do
- waitForAvailable
testcase_name="${scenario_name}-${version}"
# testcase working directory, there are logs, data and packages.
@@ -218,8 +217,10 @@ do
[[ $? -ne 0 ]] && exitWithMessage "${testcase_name}, generate script failure!"
echo "start container of testcase.name=${testcase_name}"
- bash ${case_work_base}/scenario.sh ${task_state_house} 1>${case_work_logs_dir}/${testcase_name}.log &
+ bash ${case_work_base}/scenario.sh ${task_state_house} 1>${case_work_logs_dir}/${testcase_name}.log
sleep 3
+ waitForAvailable
+ rm -rf ${case_work_base}
done
echo -e "\033[33m${scenario_name} has already sumbitted\033[0m"
diff --git a/test/plugin/scenarios/mysql-scenario/support-version.list b/test/plugin/scenarios/mysql-scenario/support-version.list
index 4768268..bf4ddf6 100644
--- a/test/plugin/scenarios/mysql-scenario/support-version.list
+++ b/test/plugin/scenarios/mysql-scenario/support-version.list
@@ -36,13 +36,4 @@
5.1.26
5.1.24
5.1.22
-5.1.20
-5.1.18
-5.1.16
-5.1.14
-5.1.12
-5.1.10
-5.1.8
-5.1.6
-5.1.4
-5.1.2
\ No newline at end of file
+5.1.20
\ No newline at end of file
diff --git a/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml b/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml
index aea4ef8..be366df 100644
--- a/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml
+++ b/test/plugin/scenarios/solrj-7.x-scenario/configuration.yml
@@ -25,6 +25,7 @@ dependencies:
solr-server:
image: solr:${CASE_SERVER_IMAGE_VERSION}
hostname: solr-server
+ removeOnExit: true
entrypoint:
- docker-entrypoint.sh
- solr-precreate